You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/15 13:15:33 UTC
[flink] branch master updated: [FLINK-11710][tests] Refactor SimpleSlotProvider to TestingLogicalSlotProvider
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 946971a [FLINK-11710][tests] Refactor SimpleSlotProvider to TestingLogicalSlotProvider
946971a is described below
commit 946971aa119a766a3e18f2981358733c4dedc166
Author: TisonKun <wa...@gmail.com>
AuthorDate: Thu May 9 10:43:40 2019 +0200
[FLINK-11710][tests] Refactor SimpleSlotProvider to TestingLogicalSlotProvider
---
.../executiongraph/utils/SimpleSlotProvider.java | 31 +++++++++----
.../runtime/jobmaster/TestingLogicalSlot.java | 31 ++++++++++++-
.../jobmaster/slotpool/SingleLogicalSlotTest.java | 54 +++++-----------------
3 files changed, 62 insertions(+), 54 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 007684d..76f7daa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -22,11 +22,10 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SimpleSlotContext;
-import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -35,10 +34,10 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
@@ -76,7 +75,8 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
new AllocationID(),
new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
0,
- taskManagerGateway);
+ taskManagerGateway,
+ ResourceProfile.UNKNOWN);
slots.add(as);
}
@@ -99,7 +99,15 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
slot = slots.removeFirst();
}
if (slot != null) {
- SimpleSlot result = new SimpleSlot(slot, this, 0);
+ TestingLogicalSlot result = new TestingLogicalSlot(
+ slot.getTaskManagerLocation(),
+ slot.getTaskManagerGateway(),
+ slot.getPhysicalSlotNumber(),
+ slot.getAllocationId(),
+ slotRequestId,
+ new SlotSharingGroupId(),
+ null,
+ this);
allocatedSlots.put(slotRequestId, slot);
return CompletableFuture.completedFuture(result);
}
@@ -124,12 +132,15 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
@Override
public void returnLogicalSlot(LogicalSlot logicalSlot) {
- Preconditions.checkArgument(logicalSlot instanceof Slot);
-
- final Slot slot = ((Slot) logicalSlot);
-
synchronized (lock) {
- slots.add(slot.getSlotContext());
+ SimpleSlotContext as = new SimpleSlotContext(
+ logicalSlot.getAllocationId(),
+ logicalSlot.getTaskManagerLocation(),
+ logicalSlot.getPhysicalSlotNumber(),
+ logicalSlot.getTaskManagerGateway(),
+ ResourceProfile.UNKNOWN);
+
+ slots.add(as);
allocatedSlots.remove(logicalSlot.getSlotRequestId());
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
index 516cd70..5060478 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
@@ -49,7 +49,10 @@ public class TestingLogicalSlot implements LogicalSlot {
@Nullable
private final CompletableFuture<?> customReleaseFuture;
-
+
+ @Nullable
+ private final SlotOwner slotOwner;
+
private final AllocationID allocationId;
private final SlotRequestId slotRequestId;
@@ -79,6 +82,27 @@ public class TestingLogicalSlot implements LogicalSlot {
SlotRequestId slotRequestId,
SlotSharingGroupId slotSharingGroupId,
@Nullable CompletableFuture<?> customReleaseFuture) {
+ this(
+ taskManagerLocation,
+ taskManagerGateway,
+ slotNumber,
+ allocationId,
+ slotRequestId,
+ slotSharingGroupId,
+ customReleaseFuture,
+ null);
+ }
+
+ public TestingLogicalSlot(
+ TaskManagerLocation taskManagerLocation,
+ TaskManagerGateway taskManagerGateway,
+ int slotNumber,
+ AllocationID allocationId,
+ SlotRequestId slotRequestId,
+ SlotSharingGroupId slotSharingGroupId,
+ @Nullable CompletableFuture<?> customReleaseFuture,
+ @Nullable SlotOwner slotOwner) {
+
this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
this.payloadReference = new AtomicReference<>();
@@ -88,6 +112,7 @@ public class TestingLogicalSlot implements LogicalSlot {
this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
this.releaseFuture = new CompletableFuture<>();
this.customReleaseFuture = customReleaseFuture;
+ this.slotOwner = slotOwner;
}
@Override
@@ -127,6 +152,10 @@ public class TestingLogicalSlot implements LogicalSlot {
@Override
public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
+ if (slotOwner != null) {
+ slotOwner.returnLogicalSlot(this);
+ }
+
if (customReleaseFuture != null) {
return customReleaseFuture;
} else {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
index 071fcc4..4570269 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
@@ -22,15 +22,14 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
@@ -75,12 +74,21 @@ public class SingleLogicalSlotTest extends TestLogger {
private SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner) {
return new SingleLogicalSlot(
new SlotRequestId(),
- new DummySlotContext(),
+ createSlotContext(),
null,
Locality.LOCAL,
slotOwner);
}
+ private static SlotContext createSlotContext() {
+ return new SimpleSlotContext(
+ new AllocationID(),
+ new LocalTaskManagerLocation(),
+ 0,
+ new SimpleAckingTaskManagerGateway(),
+ ResourceProfile.UNKNOWN);
+ }
+
@Test
public void testAlive() throws Exception {
final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot();
@@ -284,44 +292,4 @@ public class SingleLogicalSlotTest extends TestLogger {
returnAllocatedSlotFuture.complete(logicalSlot);
}
}
-
- private static final class DummySlotContext implements SlotContext {
-
- private final AllocationID allocationId;
-
- private final TaskManagerLocation taskManagerLocation;
-
- private final TaskManagerGateway taskManagerGateway;
-
- DummySlotContext() {
- allocationId = new AllocationID();
- taskManagerLocation = new LocalTaskManagerLocation();
- taskManagerGateway = new SimpleAckingTaskManagerGateway();
- }
-
- @Override
- public AllocationID getAllocationId() {
- return allocationId;
- }
-
- @Override
- public TaskManagerLocation getTaskManagerLocation() {
- return taskManagerLocation;
- }
-
- @Override
- public int getPhysicalSlotNumber() {
- return 0;
- }
-
- @Override
- public ResourceProfile getResourceProfile() {
- return ResourceProfile.UNKNOWN;
- }
-
- @Override
- public TaskManagerGateway getTaskManagerGateway() {
- return taskManagerGateway;
- }
- }
}