You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/03 22:42:52 UTC
[flink] branch master updated: [FLINK-21204][coordination] Remove
LogicalSlot#getSlotSharingGroupId
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 21a36f8 [FLINK-21204][coordination] Remove LogicalSlot#getSlotSharingGroupId
21a36f8 is described below
commit 21a36f8dc29853233ed1aac6f4d09db5f9e03401
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jan 29 14:17:31 2021 +0100
[FLINK-21204][coordination] Remove LogicalSlot#getSlotSharingGroupId
---
.../org/apache/flink/runtime/jobmaster/LogicalSlot.java | 9 ---------
.../runtime/jobmaster/slotpool/SingleLogicalSlot.java | 14 +-------------
.../org/apache/flink/runtime/scheduler/SharedSlot.java | 1 -
.../executiongraph/ExecutionVertexLocalityTest.java | 6 +-----
.../apache/flink/runtime/jobmaster/TestingLogicalSlot.java | 11 -----------
.../flink/runtime/jobmaster/TestingLogicalSlotBuilder.java | 8 --------
.../runtime/jobmaster/slotpool/SingleLogicalSlotTest.java | 2 +-
.../org/apache/flink/runtime/scheduler/SharedSlotTest.java | 2 --
8 files changed, 3 insertions(+), 50 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java
index 027c0df..5ea1999 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -135,14 +134,6 @@ public interface LogicalSlot {
*/
SlotRequestId getSlotRequestId();
- /**
- * Gets the slot sharing group id to which this slot belongs.
- *
- * @return slot sharing group id of this slot or null, if none.
- */
- @Nullable
- SlotSharingGroupId getSlotSharingGroupId();
-
/** Payload for a logical slot. */
interface Payload {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
index f8e767b..ccf0bca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
@@ -50,7 +49,6 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
private final SlotContext slotContext;
// null if the logical slot does not belong to a slot sharing group, otherwise non-null
- @Nullable private final SlotSharingGroupId slotSharingGroupId;
// locality of this slot wrt the requested preferred locations
private final Locality locality;
@@ -72,23 +70,20 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
public SingleLogicalSlot(
SlotRequestId slotRequestId,
SlotContext slotContext,
- @Nullable SlotSharingGroupId slotSharingGroupId,
Locality locality,
SlotOwner slotOwner) {
- this(slotRequestId, slotContext, slotSharingGroupId, locality, slotOwner, true);
+ this(slotRequestId, slotContext, locality, slotOwner, true);
}
public SingleLogicalSlot(
SlotRequestId slotRequestId,
SlotContext slotContext,
- @Nullable SlotSharingGroupId slotSharingGroupId,
Locality locality,
SlotOwner slotOwner,
boolean willBeOccupiedIndefinitely) {
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
this.slotContext = Preconditions.checkNotNull(slotContext);
- this.slotSharingGroupId = slotSharingGroupId;
this.locality = Preconditions.checkNotNull(locality);
this.slotOwner = Preconditions.checkNotNull(slotOwner);
this.willBeOccupiedIndefinitely = willBeOccupiedIndefinitely;
@@ -154,12 +149,6 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
return slotRequestId;
}
- @Nullable
- @Override
- public SlotSharingGroupId getSlotSharingGroupId() {
- return slotSharingGroupId;
- }
-
public static SingleLogicalSlot allocateFromPhysicalSlot(
final SlotRequestId slotRequestId,
final PhysicalSlot physicalSlot,
@@ -171,7 +160,6 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
new SingleLogicalSlot(
slotRequestId,
physicalSlot,
- null,
locality,
slotOwner,
slotWillBeOccupiedIndefinitely);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
index 4d18035..24f0444 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
@@ -183,7 +183,6 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
return new SingleLogicalSlot(
logicalSlotRequestId,
physicalSlot,
- null,
Locality.UNKNOWN,
this,
slotWillBeOccupiedIndefinitely);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index a2e6f23..bc5d84a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -237,11 +237,7 @@ public class ExecutionVertexLocalityTest extends TestLogger {
LogicalSlot slot =
new SingleLogicalSlot(
- new SlotRequestId(),
- slotContext,
- null,
- Locality.LOCAL,
- mock(SlotOwner.class));
+ new SlotRequestId(), slotContext, Locality.LOCAL, mock(SlotOwner.class));
if (!vertex.getCurrentExecutionAttempt().tryAssignResource(slot)) {
throw new FlinkException("Could not assign resource.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
index 2245c74..32e8359 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -51,8 +50,6 @@ public class TestingLogicalSlot implements LogicalSlot {
private final SlotRequestId slotRequestId;
- @Nullable private final SlotSharingGroupId slotSharingGroupId;
-
private boolean released;
TestingLogicalSlot(
@@ -61,7 +58,6 @@ public class TestingLogicalSlot implements LogicalSlot {
int slotNumber,
AllocationID allocationId,
SlotRequestId slotRequestId,
- @Nullable SlotSharingGroupId slotSharingGroupId,
boolean automaticallyCompleteReleaseFuture,
SlotOwner slotOwner) {
@@ -71,7 +67,6 @@ public class TestingLogicalSlot implements LogicalSlot {
this.slotNumber = slotNumber;
this.allocationId = Preconditions.checkNotNull(allocationId);
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
- this.slotSharingGroupId = slotSharingGroupId;
this.releaseFuture = new CompletableFuture<>();
this.automaticallyCompleteReleaseFuture = automaticallyCompleteReleaseFuture;
this.slotOwner = Preconditions.checkNotNull(slotOwner);
@@ -141,12 +136,6 @@ public class TestingLogicalSlot implements LogicalSlot {
return slotRequestId;
}
- @Nullable
- @Override
- public SlotSharingGroupId getSlotSharingGroupId() {
- return slotSharingGroupId;
- }
-
public CompletableFuture<?> getReleaseFuture() {
return releaseFuture;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java
index f5c0443..ac3366f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
@@ -33,7 +32,6 @@ public class TestingLogicalSlotBuilder {
private int slotNumber = 0;
private AllocationID allocationId = new AllocationID();
private SlotRequestId slotRequestId = new SlotRequestId();
- private SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
private SlotOwner slotOwner = new DummySlotOwner();
private boolean automaticallyCompleteReleaseFuture = true;
@@ -63,11 +61,6 @@ public class TestingLogicalSlotBuilder {
return this;
}
- public TestingLogicalSlotBuilder setSlotSharingGroupId(SlotSharingGroupId slotSharingGroupId) {
- this.slotSharingGroupId = slotSharingGroupId;
- return this;
- }
-
public TestingLogicalSlotBuilder setAutomaticallyCompleteReleaseFuture(
boolean automaticallyCompleteReleaseFuture) {
this.automaticallyCompleteReleaseFuture = automaticallyCompleteReleaseFuture;
@@ -86,7 +79,6 @@ public class TestingLogicalSlotBuilder {
slotNumber,
allocationId,
slotRequestId,
- slotSharingGroupId,
automaticallyCompleteReleaseFuture,
slotOwner);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
index 99f75c2..ce981da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
@@ -71,7 +71,7 @@ public class SingleLogicalSlotTest extends TestLogger {
private SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner) {
return new SingleLogicalSlot(
- new SlotRequestId(), createSlotContext(), null, Locality.LOCAL, slotOwner);
+ new SlotRequestId(), createSlotContext(), Locality.LOCAL, slotOwner);
}
private static SlotContext createSlotContext() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
index 885c2c9..ac633c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
@@ -40,7 +40,6 @@ import java.util.function.Consumer;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId;
import static org.apache.flink.runtime.scheduler.SharedSlotTestingUtils.createExecutionSlotSharingGroup;
import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -107,7 +106,6 @@ public class SharedSlotTest extends TestLogger {
assertThat(logicalSlot.getTaskManagerGateway(), is(taskManagerGateway));
assertThat(logicalSlot.getPhysicalSlotNumber(), is(physicalSlotNumber));
assertThat(logicalSlot.getLocality(), is(Locality.UNKNOWN));
- assertThat(logicalSlot.getSlotSharingGroupId(), nullValue());
}
@Test