You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/15 13:15:33 UTC

[flink] branch master updated: [FLINK-11710][tests] Refactor SimpleSlotProvider to TestingLogicalSlotProvider

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 946971a  [FLINK-11710][tests] Refactor SimpleSlotProvider to TestingLogicalSlotProvider
946971a is described below

commit 946971aa119a766a3e18f2981358733c4dedc166
Author: TisonKun <wa...@gmail.com>
AuthorDate: Thu May 9 10:43:40 2019 +0200

    [FLINK-11710][tests] Refactor SimpleSlotProvider to TestingLogicalSlotProvider
---
 .../executiongraph/utils/SimpleSlotProvider.java   | 31 +++++++++----
 .../runtime/jobmaster/TestingLogicalSlot.java      | 31 ++++++++++++-
 .../jobmaster/slotpool/SingleLogicalSlotTest.java  | 54 +++++-----------------
 3 files changed, 62 insertions(+), 54 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 007684d..76f7daa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -22,11 +22,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
-import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -35,10 +34,10 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -76,7 +75,8 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 				new AllocationID(),
 				new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
 				0,
-				taskManagerGateway);
+				taskManagerGateway,
+				ResourceProfile.UNKNOWN);
 			slots.add(as);
 		}
 
@@ -99,7 +99,15 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 				slot = slots.removeFirst();
 			}
 			if (slot != null) {
-				SimpleSlot result = new SimpleSlot(slot, this, 0);
+				TestingLogicalSlot result = new TestingLogicalSlot(
+					slot.getTaskManagerLocation(),
+					slot.getTaskManagerGateway(),
+					slot.getPhysicalSlotNumber(),
+					slot.getAllocationId(),
+					slotRequestId,
+					new SlotSharingGroupId(),
+					null,
+					this);
 				allocatedSlots.put(slotRequestId, slot);
 				return CompletableFuture.completedFuture(result);
 			}
@@ -124,12 +132,15 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 
 	@Override
 	public void returnLogicalSlot(LogicalSlot logicalSlot) {
-		Preconditions.checkArgument(logicalSlot instanceof Slot);
-
-		final Slot slot = ((Slot) logicalSlot);
-
 		synchronized (lock) {
-			slots.add(slot.getSlotContext());
+			SimpleSlotContext as = new SimpleSlotContext(
+				logicalSlot.getAllocationId(),
+				logicalSlot.getTaskManagerLocation(),
+				logicalSlot.getPhysicalSlotNumber(),
+				logicalSlot.getTaskManagerGateway(),
+				ResourceProfile.UNKNOWN);
+
+			slots.add(as);
 			allocatedSlots.remove(logicalSlot.getSlotRequestId());
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
index 516cd70..5060478 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
@@ -49,7 +49,10 @@ public class TestingLogicalSlot implements LogicalSlot {
 
 	@Nullable
 	private final CompletableFuture<?> customReleaseFuture;
-	
+
+	@Nullable
+	private final SlotOwner slotOwner;
+
 	private final AllocationID allocationId;
 
 	private final SlotRequestId slotRequestId;
@@ -79,6 +82,27 @@ public class TestingLogicalSlot implements LogicalSlot {
 			SlotRequestId slotRequestId,
 			SlotSharingGroupId slotSharingGroupId,
 			@Nullable CompletableFuture<?> customReleaseFuture) {
+		this(
+			taskManagerLocation,
+			taskManagerGateway,
+			slotNumber,
+			allocationId,
+			slotRequestId,
+			slotSharingGroupId,
+			customReleaseFuture,
+			null);
+	}
+
+	public TestingLogicalSlot(
+			TaskManagerLocation taskManagerLocation,
+			TaskManagerGateway taskManagerGateway,
+			int slotNumber,
+			AllocationID allocationId,
+			SlotRequestId slotRequestId,
+			SlotSharingGroupId slotSharingGroupId,
+			@Nullable CompletableFuture<?> customReleaseFuture,
+			@Nullable SlotOwner slotOwner) {
+
 		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
 		this.payloadReference = new AtomicReference<>();
@@ -88,6 +112,7 @@ public class TestingLogicalSlot implements LogicalSlot {
 		this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
 		this.releaseFuture = new CompletableFuture<>();
 		this.customReleaseFuture = customReleaseFuture;
+		this.slotOwner = slotOwner;
 	}
 
 	@Override
@@ -127,6 +152,10 @@ public class TestingLogicalSlot implements LogicalSlot {
 
 	@Override
 	public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
+		if (slotOwner != null) {
+			slotOwner.returnLogicalSlot(this);
+		}
+
 		if (customReleaseFuture != null) {
 			return customReleaseFuture;
 		} else {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
index 071fcc4..4570269 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
@@ -22,15 +22,14 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -75,12 +74,21 @@ public class SingleLogicalSlotTest extends TestLogger {
 	private SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner) {
 		return new SingleLogicalSlot(
 			new SlotRequestId(),
-			new DummySlotContext(),
+			createSlotContext(),
 			null,
 			Locality.LOCAL,
 			slotOwner);
 	}
 
+	private static SlotContext createSlotContext() {
+		return new SimpleSlotContext(
+			new AllocationID(),
+			new LocalTaskManagerLocation(),
+			0,
+			new SimpleAckingTaskManagerGateway(),
+			ResourceProfile.UNKNOWN);
+	}
+
 	@Test
 	public void testAlive() throws Exception {
 		final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot();
@@ -284,44 +292,4 @@ public class SingleLogicalSlotTest extends TestLogger {
 			returnAllocatedSlotFuture.complete(logicalSlot);
 		}
 	}
-
-	private static final class DummySlotContext implements SlotContext {
-
-		private final AllocationID allocationId;
-
-		private final TaskManagerLocation taskManagerLocation;
-
-		private final TaskManagerGateway taskManagerGateway;
-
-		DummySlotContext() {
-			allocationId = new AllocationID();
-			taskManagerLocation = new LocalTaskManagerLocation();
-			taskManagerGateway = new SimpleAckingTaskManagerGateway();
-		}
-
-		@Override
-		public AllocationID getAllocationId() {
-			return allocationId;
-		}
-
-		@Override
-		public TaskManagerLocation getTaskManagerLocation() {
-			return taskManagerLocation;
-		}
-
-		@Override
-		public int getPhysicalSlotNumber() {
-			return 0;
-		}
-
-		@Override
-		public ResourceProfile getResourceProfile() {
-			return ResourceProfile.UNKNOWN;
-		}
-
-		@Override
-		public TaskManagerGateway getTaskManagerGateway() {
-			return taskManagerGateway;
-		}
-	}
 }