You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 15:33:36 UTC
[2/6] flink git commit: [FLINK-8087] Decouple Slot from AllocatedSlot
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 2e6558a..586f51b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.LogicalSlot;
@@ -39,7 +38,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
@@ -285,7 +284,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(parallelism);
final TestingSlotOwner slotOwner = new TestingSlotOwner();
slotOwner.setReturnAllocatedSlotConsumer(
- (Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId()));
+ (Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId()));
final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
@@ -366,7 +365,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(2);
final TestingSlotOwner slotOwner = new TestingSlotOwner();
slotOwner.setReturnAllocatedSlotConsumer(
- (Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId()));
+ (Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId()));
final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
final SimpleSlot[] slots = new SimpleSlot[parallelism];
@@ -448,8 +447,11 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
TaskManagerLocation location = new TaskManagerLocation(
ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
- AllocatedSlot slot = new AllocatedSlot(
- new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, taskManager);
+ SimpleSlotContext slot = new SimpleSlotContext(
+ new AllocationID(),
+ location,
+ 0,
+ taskManager);
return new SimpleSlot(slot, slotOwner, 0);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
index 4ce3f9d..3c8d994 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
@@ -111,28 +111,28 @@ public class ExecutionGraphStopTest extends TestLogger {
// deploy source 1
for (ExecutionVertex ev : eg.getJobVertex(source1.getID()).getTaskVertices()) {
- SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
+ SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(sourceGateway);
ev.getCurrentExecutionAttempt().tryAssignResource(slot);
ev.getCurrentExecutionAttempt().deploy();
}
// deploy source 2
for (ExecutionVertex ev : eg.getJobVertex(source2.getID()).getTaskVertices()) {
- SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
+ SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(sourceGateway);
ev.getCurrentExecutionAttempt().tryAssignResource(slot);
ev.getCurrentExecutionAttempt().deploy();
}
// deploy non-source 1
for (ExecutionVertex ev : eg.getJobVertex(nonSource1.getID()).getTaskVertices()) {
- SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
+ SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(nonSourceGateway);
ev.getCurrentExecutionAttempt().tryAssignResource(slot);
ev.getCurrentExecutionAttempt().deploy();
}
// deploy non-source 2
for (ExecutionVertex ev : eg.getJobVertex(nonSource2.getID()).getTaskVertices()) {
- SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
+ SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(nonSourceGateway);
ev.getCurrentExecutionAttempt().tryAssignResource(slot);
ev.getCurrentExecutionAttempt().deploy();
}
@@ -164,7 +164,7 @@ public class ExecutionGraphStopTest extends TestLogger {
when(gateway.stopTask(any(ExecutionAttemptID.class), any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
+ final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(gateway);
exec.tryAssignResource(slot);
exec.deploy();
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 42a63ec..06ffaa0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
@@ -49,7 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -240,19 +239,20 @@ public class ExecutionGraphTestUtils {
// Mocking Slots
// ------------------------------------------------------------------------
- public static SimpleSlot createMockSimpleSlot(JobID jid, TaskManagerGateway gateway) {
+ public static SimpleSlot createMockSimpleSlot(TaskManagerGateway gateway) {
final TaskManagerLocation location = new TaskManagerLocation(
ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572);
- final AllocatedSlot allocatedSlot = new AllocatedSlot(
+ final SimpleSlotContext allocatedSlot = new SimpleSlotContext(
new AllocationID(),
- jid,
location,
0,
- ResourceProfile.UNKNOWN,
gateway);
- return new SimpleSlot(allocatedSlot, mock(SlotOwner.class), 0);
+ return new SimpleSlot(
+ allocatedSlot,
+ mock(SlotOwner.class),
+ 0);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index c6fb836..71d6f51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -81,7 +81,6 @@ public class ExecutionTest extends TestLogger {
final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
final SimpleSlot slot = new SimpleSlot(
- new JobID(),
slotOwner,
new LocalTaskManagerLocation(),
0,
@@ -121,7 +120,6 @@ public class ExecutionTest extends TestLogger {
final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
final SimpleSlot slot = new SimpleSlot(
- new JobID(),
slotOwner,
new LocalTaskManagerLocation(),
0,
@@ -171,7 +169,6 @@ public class ExecutionTest extends TestLogger {
final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
final SimpleSlot slot = new SimpleSlot(
- new JobID(),
slotOwner,
new LocalTaskManagerLocation(),
0,
@@ -285,11 +282,12 @@ public class ExecutionTest extends TestLogger {
final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
final SimpleSlot slot = new SimpleSlot(
- new JobID(),
slotOwner,
new LocalTaskManagerLocation(),
0,
- new SimpleAckingTaskManagerGateway());
+ new SimpleAckingTaskManagerGateway(),
+ null,
+ null);
final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 1b8daca..cd613f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -18,36 +18,42 @@
package org.apache.flink.runtime.executiongraph;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.io.IOException;
-
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.BaseTestingActorGateway;
import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.io.IOException;
+
import scala.concurrent.ExecutionContext;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexResource;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
@SuppressWarnings("serial")
public class ExecutionVertexCancelTest extends TestLogger {
@@ -134,7 +140,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
executionContext, 2);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot();
vertex.deployToSlot(slot);
@@ -202,7 +208,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
2);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot();
vertex.deployToSlot(slot);
@@ -262,7 +268,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
1);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot();
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.RUNNING);
@@ -302,7 +308,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
1);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot();
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.RUNNING);
@@ -350,7 +356,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
1);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot();
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.RUNNING);
@@ -383,7 +389,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
final ActorGateway gateway = new CancelSequenceActorGateway(TestingUtils.directExecutionContext(), 0);
Instance instance = getInstance(new ActorTaskManagerGateway(gateway));
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot();
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.RUNNING);
@@ -458,7 +464,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
// the scheduler (or any caller) needs to know that the slot should be released
try {
Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot();
vertex.deployToSlot(slot);
fail("Method should throw an exception");
@@ -501,7 +507,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
setVertexState(vertex, ExecutionState.CANCELING);
Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot();
vertex.deployToSlot(slot);
fail("Method should throw an exception");
@@ -517,7 +523,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
AkkaUtils.getDefaultTimeout());
Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
- SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot = instance.allocateSimpleSlot();
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.CANCELING);
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 973c7d4..7f97d12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
@@ -67,7 +67,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
Instance instance = getInstance(
new ActorTaskManagerGateway(
new SimpleActorGateway(TestingUtils.directExecutionContext())));
- final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+ final SimpleSlot slot = instance.allocateSimpleSlot();
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
@@ -104,7 +104,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
final Instance instance = getInstance(
new ActorTaskManagerGateway(
new SimpleActorGateway(TestingUtils.directExecutionContext())));
- final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+ final SimpleSlot slot = instance.allocateSimpleSlot();
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
@@ -146,7 +146,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
final Instance instance = getInstance(
new ActorTaskManagerGateway(
new SimpleActorGateway(TestingUtils.defaultExecutionContext())));
- final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+ final SimpleSlot slot = instance.allocateSimpleSlot();
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -191,7 +191,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
final Instance instance = getInstance(
new ActorTaskManagerGateway(
new SimpleFailingActorGateway(TestingUtils.directExecutionContext())));
- final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+ final SimpleSlot slot = instance.allocateSimpleSlot();
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -221,7 +221,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
final Instance instance = getInstance(
new ActorTaskManagerGateway(
new SimpleFailingActorGateway(TestingUtils.directExecutionContext())));
- final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+ final SimpleSlot slot = instance.allocateSimpleSlot();
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -265,7 +265,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
final Instance instance = getInstance(
new ActorTaskManagerGateway(
new SimpleActorGateway(TestingUtils.directExecutionContext())));
- final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+ final SimpleSlot slot = instance.allocateSimpleSlot();
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
@@ -310,7 +310,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
context,
2)));
- final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+ final SimpleSlot slot = instance.allocateSimpleSlot();
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -372,7 +372,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
result.getPartitions()[0].addConsumer(mockEdge, 0);
AllocatedSlot allocatedSlot = mock(AllocatedSlot.class);
- when(allocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
+ when(allocatedSlot.getAllocationId()).thenReturn(new AllocationID());
LogicalSlot slot = mock(LogicalSlot.class);
when(slot.getAllocationId()).thenReturn(new AllocationID());
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 15d021a..98f7259 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
@@ -37,7 +36,8 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -233,8 +233,8 @@ public class ExecutionVertexLocalityTest extends TestLogger {
// - mocking the scheduler created fragile tests that break whenever the scheduler is adjusted
// - exposing test methods in the ExecutionVertex leads to undesirable setters
- AllocatedSlot slot = new AllocatedSlot(
- new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
+ SlotContext slot = new SimpleSlotContext(
+ new AllocationID(), location, 0, mock(TaskManagerGateway.class));
SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 2941739..9310912 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -57,7 +57,7 @@ public class ExecutionVertexSchedulingTest {
// a slot than cannot be deployed to
final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
- final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+ final SimpleSlot slot = instance.allocateSimpleSlot();
slot.releaseInstanceSlot();
assertTrue(slot.isReleased());
@@ -89,7 +89,7 @@ public class ExecutionVertexSchedulingTest {
// a slot than cannot be deployed to
final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
- final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+ final SimpleSlot slot = instance.allocateSimpleSlot();
slot.releaseInstanceSlot();
assertTrue(slot.isReleased());
@@ -126,7 +126,7 @@ public class ExecutionVertexSchedulingTest {
final Instance instance = getInstance(new ActorTaskManagerGateway(
new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext())));
- final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
+ final SimpleSlot slot = instance.allocateSimpleSlot();
Scheduler scheduler = mock(Scheduler.class);
CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 14e0e66..9a19d24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
@@ -29,7 +28,8 @@ import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -47,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class SimpleSlotProvider implements SlotProvider, SlotOwner {
- private final ArrayDeque<AllocatedSlot> slots;
+ private final ArrayDeque<SlotContext> slots;
public SimpleSlotProvider(JobID jobId, int numSlots) {
this(jobId, numSlots, new SimpleAckingTaskManagerGateway());
@@ -60,12 +60,10 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
this.slots = new ArrayDeque<>(numSlots);
for (int i = 0; i < numSlots; i++) {
- AllocatedSlot as = new AllocatedSlot(
+ SimpleSlotContext as = new SimpleSlotContext(
new AllocationID(),
- jobId,
new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
0,
- ResourceProfile.UNKNOWN,
taskManagerGateway);
slots.add(as);
}
@@ -76,7 +74,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations) {
- final AllocatedSlot slot;
+ final SlotContext slot;
synchronized (slots) {
if (slots.isEmpty()) {
@@ -98,7 +96,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
@Override
public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
synchronized (slots) {
- slots.add(slot.getAllocatedSlot());
+ slots.add(slot.getSlotContext());
}
return CompletableFuture.completedFuture(true);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
index 0e4bfc0..bc396c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -20,7 +20,12 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -28,10 +33,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-public class AllocatedSlotsTest {
+public class AllocatedSlotsTest extends TestLogger {
@Test
public void testOperations() throws Exception {
@@ -39,12 +42,13 @@ public class AllocatedSlotsTest {
final AllocationID allocation1 = new AllocationID();
final SlotPoolGateway.SlotRequestID slotRequestID = new SlotPoolGateway.SlotRequestID();
- final ResourceID resource1 = new ResourceID("resource1");
- final Slot slot1 = createSlot(resource1, allocation1);
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final ResourceID resource1 = taskManagerLocation.getResourceID();
+ final AllocatedSlot slot1 = createSlot(allocation1, taskManagerLocation);
allocatedSlots.add(slotRequestID, slot1);
- assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+ assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
assertTrue(allocatedSlots.containResource(resource1));
assertEquals(slot1, allocatedSlots.get(allocation1));
@@ -53,12 +57,12 @@ public class AllocatedSlotsTest {
final AllocationID allocation2 = new AllocationID();
final SlotPoolGateway.SlotRequestID slotRequestID2 = new SlotPoolGateway.SlotRequestID();
- final Slot slot2 = createSlot(resource1, allocation2);
+ final AllocatedSlot slot2 = createSlot(allocation2, taskManagerLocation);
allocatedSlots.add(slotRequestID2, slot2);
- assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
- assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+ assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+ assertTrue(allocatedSlots.contains(slot2.getAllocationId()));
assertTrue(allocatedSlots.containResource(resource1));
assertEquals(slot1, allocatedSlots.get(allocation1));
@@ -68,14 +72,15 @@ public class AllocatedSlotsTest {
final AllocationID allocation3 = new AllocationID();
final SlotPoolGateway.SlotRequestID slotRequestID3 = new SlotPoolGateway.SlotRequestID();
- final ResourceID resource2 = new ResourceID("resource2");
- final Slot slot3 = createSlot(resource2, allocation3);
+ final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
+ final ResourceID resource2 = taskManagerLocation2.getResourceID();
+ final AllocatedSlot slot3 = createSlot(allocation3, taskManagerLocation2);
allocatedSlots.add(slotRequestID3, slot3);
- assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
- assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
- assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+ assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+ assertTrue(allocatedSlots.contains(slot2.getAllocationId()));
+ assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
assertTrue(allocatedSlots.containResource(resource1));
assertTrue(allocatedSlots.containResource(resource2));
@@ -86,11 +91,11 @@ public class AllocatedSlotsTest {
assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
assertEquals(3, allocatedSlots.size());
- allocatedSlots.remove(slot2);
+ allocatedSlots.remove(slot2.getAllocationId());
- assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
- assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
- assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+ assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
+ assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
+ assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
assertTrue(allocatedSlots.containResource(resource1));
assertTrue(allocatedSlots.containResource(resource2));
@@ -101,11 +106,11 @@ public class AllocatedSlotsTest {
assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
assertEquals(2, allocatedSlots.size());
- allocatedSlots.remove(slot1);
+ allocatedSlots.remove(slot1.getAllocationId());
- assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
- assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
- assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+ assertFalse(allocatedSlots.contains(slot1.getAllocationId()));
+ assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
+ assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
assertFalse(allocatedSlots.containResource(resource1));
assertTrue(allocatedSlots.containResource(resource2));
@@ -116,11 +121,11 @@ public class AllocatedSlotsTest {
assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
assertEquals(1, allocatedSlots.size());
- allocatedSlots.remove(slot3);
+ allocatedSlots.remove(slot3.getAllocationId());
- assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
- assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
- assertFalse(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+ assertFalse(allocatedSlots.contains(slot1.getAllocationId()));
+ assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
+ assertFalse(allocatedSlots.contains(slot3.getAllocationId()));
assertFalse(allocatedSlots.containResource(resource1));
assertFalse(allocatedSlots.containResource(resource2));
@@ -132,13 +137,13 @@ public class AllocatedSlotsTest {
assertEquals(0, allocatedSlots.size());
}
- private Slot createSlot(final ResourceID resourceId, final AllocationID allocationId) {
- AllocatedSlot mockAllocatedSlot = mock(AllocatedSlot.class);
- Slot slot = mock(Slot.class);
- when(slot.getTaskManagerID()).thenReturn(resourceId);
- when(slot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
-
- when(mockAllocatedSlot.getSlotAllocationId()).thenReturn(allocationId);
- return slot;
+ private AllocatedSlot createSlot(final AllocationID allocationId, final TaskManagerLocation taskManagerLocation) {
+ return new AllocatedSlot(
+ allocationId,
+ taskManagerLocation,
+ 0,
+ ResourceProfile.UNKNOWN,
+ new SimpleAckingTaskManagerGateway(),
+ new DummySlotOwner());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
index 4ed88c4..9ede899 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
@@ -18,14 +18,15 @@
package org.apache.flink.runtime.instance;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -35,7 +36,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class AvailableSlotsTest {
+public class AvailableSlotsTest extends TestLogger {
static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
@@ -57,27 +58,27 @@ public class AvailableSlotsTest {
availableSlots.add(slot3, 3L);
assertEquals(3, availableSlots.size());
- assertTrue(availableSlots.contains(slot1.getSlotAllocationId()));
- assertTrue(availableSlots.contains(slot2.getSlotAllocationId()));
- assertTrue(availableSlots.contains(slot3.getSlotAllocationId()));
+ assertTrue(availableSlots.contains(slot1.getAllocationId()));
+ assertTrue(availableSlots.contains(slot2.getAllocationId()));
+ assertTrue(availableSlots.contains(slot3.getAllocationId()));
assertTrue(availableSlots.containsTaskManager(resource1));
assertTrue(availableSlots.containsTaskManager(resource2));
availableSlots.removeAllForTaskManager(resource1);
assertEquals(1, availableSlots.size());
- assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
- assertFalse(availableSlots.contains(slot2.getSlotAllocationId()));
- assertTrue(availableSlots.contains(slot3.getSlotAllocationId()));
+ assertFalse(availableSlots.contains(slot1.getAllocationId()));
+ assertFalse(availableSlots.contains(slot2.getAllocationId()));
+ assertTrue(availableSlots.contains(slot3.getAllocationId()));
assertFalse(availableSlots.containsTaskManager(resource1));
assertTrue(availableSlots.containsTaskManager(resource2));
availableSlots.removeAllForTaskManager(resource2);
assertEquals(0, availableSlots.size());
- assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
- assertFalse(availableSlots.contains(slot2.getSlotAllocationId()));
- assertFalse(availableSlots.contains(slot3.getSlotAllocationId()));
+ assertFalse(availableSlots.contains(slot1.getAllocationId()));
+ assertFalse(availableSlots.contains(slot2.getAllocationId()));
+ assertFalse(availableSlots.contains(slot3.getAllocationId()));
assertFalse(availableSlots.containsTaskManager(resource1));
assertFalse(availableSlots.containsTaskManager(resource2));
}
@@ -92,7 +93,7 @@ public class AvailableSlotsTest {
availableSlots.add(slot1, 1L);
assertEquals(1, availableSlots.size());
- assertTrue(availableSlots.contains(slot1.getSlotAllocationId()));
+ assertTrue(availableSlots.contains(slot1.getAllocationId()));
assertTrue(availableSlots.containsTaskManager(resource1));
assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE, null));
@@ -100,7 +101,7 @@ public class AvailableSlotsTest {
SlotAndLocality slotAndLocality = availableSlots.poll(DEFAULT_TESTING_PROFILE, null);
assertEquals(slot1, slotAndLocality.slot());
assertEquals(0, availableSlots.size());
- assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
+ assertFalse(availableSlots.contains(slot1.getAllocationId()));
assertFalse(availableSlots.containsTaskManager(resource1));
}
@@ -112,10 +113,10 @@ public class AvailableSlotsTest {
return new AllocatedSlot(
new AllocationID(),
- new JobID(),
mockTaskManagerLocation,
0,
DEFAULT_TESTING_PROFILE,
- mockTaskManagerGateway);
+ mockTaskManagerGateway,
+ new DummySlotOwner());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 5b85f72..229237d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -18,17 +18,22 @@
package org.apache.flink.runtime.instance;
-import static org.junit.Assert.*;
-
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
import org.junit.Test;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
/**
* Tests for the {@link Instance} class.
*/
@@ -53,10 +58,10 @@ public class InstanceTest {
assertEquals(4, instance.getNumberOfAvailableSlots());
assertEquals(0, instance.getNumberOfAllocatedSlots());
- SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
- SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
- SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
- SimpleSlot slot4 = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot1 = instance.allocateSimpleSlot();
+ SimpleSlot slot2 = instance.allocateSimpleSlot();
+ SimpleSlot slot3 = instance.allocateSimpleSlot();
+ SimpleSlot slot4 = instance.allocateSimpleSlot();
assertNotNull(slot1);
assertNotNull(slot2);
@@ -69,7 +74,7 @@ public class InstanceTest {
slot3.getSlotNumber() + slot4.getSlotNumber());
// no more slots
- assertNull(instance.allocateSimpleSlot(new JobID()));
+ assertNull(instance.allocateSimpleSlot());
try {
instance.returnAllocatedSlot(slot2);
fail("instance accepted a non-cancelled slot.");
@@ -118,9 +123,9 @@ public class InstanceTest {
assertEquals(3, instance.getNumberOfAvailableSlots());
- SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
- SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
- SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot1 = instance.allocateSimpleSlot();
+ SimpleSlot slot2 = instance.allocateSimpleSlot();
+ SimpleSlot slot3 = instance.allocateSimpleSlot();
instance.markDead();
@@ -154,9 +159,9 @@ public class InstanceTest {
assertEquals(3, instance.getNumberOfAvailableSlots());
- SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
- SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
- SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
+ SimpleSlot slot1 = instance.allocateSimpleSlot();
+ SimpleSlot slot2 = instance.allocateSimpleSlot();
+ SimpleSlot slot3 = instance.allocateSimpleSlot();
instance.cancelAndReleaseAllSlots();
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
index 1e2b6af..5104e48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.instance;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
@@ -34,7 +33,12 @@ import org.junit.Test;
import java.util.Collections;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for the allocation, properties, and release of shared slots.
@@ -46,7 +50,6 @@ public class SharedSlotsTest extends TestLogger {
@Test
public void allocateAndReleaseEmptySlot() {
try {
- JobID jobId = new JobID();
JobVertexID vertexId = new JobVertexID();
SlotSharingGroup sharingGroup = new SlotSharingGroup(vertexId);
@@ -62,7 +65,7 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(2, instance.getNumberOfAvailableSlots());
// allocate a shared slot
- SharedSlot slot = instance.allocateSharedSlot(jobId, assignment);
+ SharedSlot slot = instance.allocateSharedSlot(assignment);
assertEquals(2, instance.getTotalNumberOfSlots());
assertEquals(1, instance.getNumberOfAllocatedSlots());
assertEquals(1, instance.getNumberOfAvailableSlots());
@@ -110,7 +113,6 @@ public class SharedSlotsTest extends TestLogger {
@Test
public void allocateSimpleSlotsAndReleaseFromRoot() {
try {
- JobID jobId = new JobID();
JobVertexID vid1 = new JobVertexID();
JobVertexID vid2 = new JobVertexID();
JobVertexID vid3 = new JobVertexID();
@@ -122,7 +124,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
// allocate a shared slot
- SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+ SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
// allocate a series of sub slots
@@ -134,7 +136,6 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(1, sub1.getNumberLeaves());
assertEquals(vid1, sub1.getGroupID());
assertEquals(instance.getTaskManagerID(), sub1.getTaskManagerID());
- assertEquals(jobId, sub1.getJobID());
assertEquals(sharedSlot, sub1.getParent());
assertEquals(sharedSlot, sub1.getRoot());
assertEquals(0, sub1.getRootSlotNumber());
@@ -153,7 +154,6 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(1, sub2.getNumberLeaves());
assertEquals(vid2, sub2.getGroupID());
assertEquals(instance.getTaskManagerID(), sub2.getTaskManagerID());
- assertEquals(jobId, sub2.getJobID());
assertEquals(sharedSlot, sub2.getParent());
assertEquals(sharedSlot, sub2.getRoot());
assertEquals(0, sub2.getRootSlotNumber());
@@ -172,7 +172,6 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(1, sub3.getNumberLeaves());
assertEquals(vid3, sub3.getGroupID());
assertEquals(instance.getTaskManagerID(), sub3.getTaskManagerID());
- assertEquals(jobId, sub3.getJobID());
assertEquals(sharedSlot, sub3.getParent());
assertEquals(sharedSlot, sub3.getRoot());
assertEquals(0, sub3.getRootSlotNumber());
@@ -192,7 +191,6 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(1, sub4.getNumberLeaves());
assertEquals(vid4, sub4.getGroupID());
assertEquals(instance.getTaskManagerID(), sub4.getTaskManagerID());
- assertEquals(jobId, sub4.getJobID());
assertEquals(sharedSlot, sub4.getParent());
assertEquals(sharedSlot, sub4.getRoot());
assertEquals(0, sub4.getRootSlotNumber());
@@ -235,7 +233,6 @@ public class SharedSlotsTest extends TestLogger {
@Test
public void allocateSimpleSlotsAndReleaseFromLeaves() {
try {
- JobID jobId = new JobID();
JobVertexID vid1 = new JobVertexID();
JobVertexID vid2 = new JobVertexID();
JobVertexID vid3 = new JobVertexID();
@@ -246,7 +243,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
// allocate a shared slot
- SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+ SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
// allocate a series of sub slots
@@ -320,7 +317,6 @@ public class SharedSlotsTest extends TestLogger {
@Test
public void allocateAndReleaseInMixedOrder() {
try {
- JobID jobId = new JobID();
JobVertexID vid1 = new JobVertexID();
JobVertexID vid2 = new JobVertexID();
JobVertexID vid3 = new JobVertexID();
@@ -331,7 +327,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
// allocate a shared slot
- SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+ SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
// allocate a series of sub slots
@@ -427,7 +423,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
// allocate a shared slot
- SharedSlot sharedSlot = instance.allocateSharedSlot(new JobID(), assignment);
+ SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
// get the first simple slot
SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId);
@@ -563,7 +559,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
// allocate a shared slot
- SharedSlot sharedSlot = instance.allocateSharedSlot(new JobID(), assignment);
+ SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
// get the first simple slot
SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId);
@@ -607,7 +603,6 @@ public class SharedSlotsTest extends TestLogger {
@Test
public void testImmediateReleaseOneLevel() {
try {
- JobID jobId = new JobID();
JobVertexID vid = new JobVertexID();
SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
@@ -615,7 +610,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
- SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+ SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid);
sub.releaseInstanceSlot();
@@ -635,7 +630,6 @@ public class SharedSlotsTest extends TestLogger {
@Test
public void testImmediateReleaseTwoLevel() {
try {
- JobID jobId = new JobID();
JobVertexID vid = new JobVertexID();
JobVertex vertex = new JobVertex("vertex", vid);
@@ -647,7 +641,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
- SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
+ SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, constraint);
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index 42cbbbf..6d572ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.instance;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -34,7 +33,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class SimpleSlotTest extends TestLogger {
+public class SimpleSlotTest extends TestLogger {
@Test
public void testStateTransitions() {
@@ -137,6 +136,6 @@ public class SimpleSlotTest extends TestLogger {
hardwareDescription,
1);
- return instance.allocateSimpleSlot(new JobID());
+ return instance.allocateSimpleSlot();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index 8875e00..5d82f47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -36,6 +36,9 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock;
@@ -158,7 +161,7 @@ public class SlotPoolRpcTest extends TestLogger {
assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get());
- slotPoolGateway.cancelSlotAllocation(requestId).get();
+ slotPoolGateway.cancelSlotRequest(requestId).get();
assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get());
} finally {
@@ -202,7 +205,7 @@ public class SlotPoolRpcTest extends TestLogger {
assertEquals(1L, (long) pool.getNumberOfPendingRequests().get());
- slotPoolGateway.cancelSlotAllocation(requestId).get();
+ slotPoolGateway.cancelSlotRequest(requestId).get();
assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
} finally {
RpcUtils.terminateRpcEndpoint(pool, timeout);
@@ -252,17 +255,22 @@ public class SlotPoolRpcTest extends TestLogger {
}
AllocationID allocationId = allocationIdFuture.get();
- ResourceID resourceID = ResourceID.generate();
- AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationId, jid, DEFAULT_TESTING_PROFILE);
- slotPoolGateway.registerTaskManager(resourceID).get();
+ final SlotOffer slotOffer = new SlotOffer(
+ allocationId,
+ 0,
+ DEFAULT_TESTING_PROFILE);
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
- assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+ assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
assertTrue(pool.containsAllocatedSlot(allocationId).get());
- pool.cancelSlotAllocation(requestId).get();
+ pool.cancelSlotRequest(requestId).get();
assertFalse(pool.containsAllocatedSlot(allocationId).get());
assertTrue(pool.containsAvailableSlot(allocationId).get());
@@ -351,14 +359,14 @@ public class SlotPoolRpcTest extends TestLogger {
}
@Override
- public CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID slotRequestId) {
+ public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId) {
final Consumer<SlotRequestID> currentCancelSlotAllocationConsumer = cancelSlotAllocationConsumer;
if (currentCancelSlotAllocationConsumer != null) {
currentCancelSlotAllocationConsumer.accept(slotRequestId);
}
- return super.cancelSlotAllocation(slotRequestId);
+ return super.cancelSlotRequest(slotRequestId);
}
CompletableFuture<Boolean> containsAllocatedSlot(AllocationID allocationId) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 450d377..9d90a12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
@@ -74,10 +76,17 @@ public class SlotPoolTest extends TestLogger {
private JobID jobId;
+ private TaskManagerLocation taskManagerLocation;
+
+ private TaskManagerGateway taskManagerGateway;
+
@Before
public void setUp() throws Exception {
this.rpcService = new TestingRpcService();
this.jobId = new JobID();
+
+ taskManagerLocation = new LocalTaskManagerLocation();
+ taskManagerGateway = new SimpleAckingTaskManagerGateway();
}
@After
@@ -92,8 +101,7 @@ public class SlotPoolTest extends TestLogger {
try {
SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
- ResourceID resourceID = new ResourceID("resource");
- slotPoolGateway.registerTaskManager(resourceID);
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
@@ -104,14 +112,17 @@ public class SlotPoolTest extends TestLogger {
final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
- AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
- assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+ final SlotOffer slotOffer = new SlotOffer(
+ slotRequest.getAllocationId(),
+ 0,
+ DEFAULT_TESTING_PROFILE);
+
+ assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
LogicalSlot slot = future.get(1, TimeUnit.SECONDS);
assertTrue(future.isDone());
assertTrue(slot.isAlive());
- assertEquals(resourceID, slot.getTaskManagerLocation().getResourceID());
- assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocationId()), slot);
+ assertEquals(taskManagerLocation, slot.getTaskManagerLocation());
} finally {
slotPool.shutDown();
}
@@ -124,8 +135,7 @@ public class SlotPoolTest extends TestLogger {
try {
SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
- ResourceID resourceID = new ResourceID("resource");
- slotPool.registerTaskManager(resourceID);
+ slotPool.registerTaskManager(taskManagerLocation.getResourceID());
CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
@@ -139,8 +149,12 @@ public class SlotPoolTest extends TestLogger {
final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
- AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
- assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+ final SlotOffer slotOffer = new SlotOffer(
+ slotRequests.get(0).getAllocationId(),
+ 0,
+ DEFAULT_TESTING_PROFILE);
+
+ assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
@@ -158,7 +172,7 @@ public class SlotPoolTest extends TestLogger {
assertTrue(slot2.isAlive());
assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
- assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocationId()), slot2);
+ assertEquals(slot1.getAllocationId(), slot2.getAllocationId());
} finally {
slotPool.shutDown();
}
@@ -171,8 +185,7 @@ public class SlotPoolTest extends TestLogger {
try {
SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
- ResourceID resourceID = new ResourceID("resource");
- slotPoolGateway.registerTaskManager(resourceID);
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
assertFalse(future1.isDone());
@@ -182,8 +195,12 @@ public class SlotPoolTest extends TestLogger {
final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
- AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
- assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+ final SlotOffer slotOffer = new SlotOffer(
+ slotRequest.getAllocationId(),
+ 0,
+ DEFAULT_TESTING_PROFILE);
+
+ assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
@@ -214,8 +231,7 @@ public class SlotPoolTest extends TestLogger {
try {
SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
- ResourceID resourceID = new ResourceID("resource");
- slotPoolGateway.registerTaskManager(resourceID);
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
assertFalse(future.isDone());
@@ -225,29 +241,36 @@ public class SlotPoolTest extends TestLogger {
final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+ final SlotOffer slotOffer = new SlotOffer(
+ slotRequest.getAllocationId(),
+ 0,
+ DEFAULT_TESTING_PROFILE);
+
+ final TaskManagerLocation invalidTaskManagerLocation = new LocalTaskManagerLocation();
+
// slot from unregistered resource
- AllocatedSlot invalid = createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
- assertFalse(slotPoolGateway.offerSlot(invalid).get());
+ assertFalse(slotPoolGateway.offerSlot(invalidTaskManagerLocation, taskManagerGateway, slotOffer).get());
- AllocatedSlot notRequested = createAllocatedSlot(resourceID, new AllocationID(), jobId, DEFAULT_TESTING_PROFILE);
+ final SlotOffer nonRequestedSlotOffer = new SlotOffer(
+ new AllocationID(),
+ 0,
+ DEFAULT_TESTING_PROFILE);
// we'll also accept non requested slots
- assertTrue(slotPoolGateway.offerSlot(notRequested).get());
-
- AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+ assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, nonRequestedSlotOffer).get());
// accepted slot
- assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+ assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
LogicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertTrue(slot.isAlive());
// duplicated offer with using slot
- assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+ assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
assertTrue(slot.isAlive());
// duplicated offer with free slot
slot.releaseSlot();
- assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+ assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
} finally {
slotPool.shutDown();
}
@@ -261,8 +284,8 @@ public class SlotPoolTest extends TestLogger {
final SlotPool slotPool = new SlotPool(rpcService, jobId) {
@Override
- public void returnAllocatedSlot(Slot slot) {
- super.returnAllocatedSlot(slot);
+ public void returnAllocatedSlot(SlotContext allocatedSlot) {
+ super.returnAllocatedSlot(allocatedSlot);
slotReturnFuture.complete(true);
}
@@ -270,8 +293,7 @@ public class SlotPoolTest extends TestLogger {
try {
SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
- ResourceID resourceID = new ResourceID("resource");
- slotPoolGateway.registerTaskManager(resourceID);
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
@@ -282,14 +304,18 @@ public class SlotPoolTest extends TestLogger {
CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
- AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
- assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+ final SlotOffer slotOffer = new SlotOffer(
+ slotRequest.getAllocationId(),
+ 0,
+ DEFAULT_TESTING_PROFILE);
+
+ assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
assertFalse(future2.isDone());
- slotPoolGateway.releaseTaskManager(resourceID);
+ slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID());
// wait until the slot has been returned
slotReturnFuture.get();
@@ -378,24 +404,4 @@ public class SlotPoolTest extends TestLogger {
return slotPool.getSelfGateway(SlotPoolGateway.class);
}
-
- static AllocatedSlot createAllocatedSlot(
- final ResourceID resourceId,
- final AllocationID allocationId,
- final JobID jobId,
- final ResourceProfile resourceProfile) {
- TaskManagerLocation mockTaskManagerLocation = mock(TaskManagerLocation.class);
- when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId);
-
- TaskManagerGateway mockTaskManagerGateway = mock(TaskManagerGateway.class);
-
- return new AllocatedSlot(
- allocationId,
- jobId,
- mockTaskManagerLocation,
- 0,
- resourceProfile,
- mockTaskManagerGateway);
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
index dca47d3..28cab72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.instance;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
@@ -49,14 +48,12 @@ public class SlotSharingGroupAssignmentTest extends TestLogger {
final int numberSlots = 2;
final JobVertexID sourceId = new JobVertexID();
final JobVertexID sinkId = new JobVertexID();
- final JobID jobId = new JobID();
for (int i = 0; i < numberTaskManagers; i++) {
final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLocalHost(), i + 1000);
for (int j = 0; j < numberSlots; j++) {
final SharedSlot slot = new SharedSlot(
- jobId,
mock(SlotOwner.class),
taskManagerLocation,
j,
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
index 3f267ac..d40ff61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
@@ -90,10 +90,10 @@ public class CoLocationConstraintTest {
Instance instance1 = SchedulerTestUtils.getRandomInstance(2);
Instance instance2 = SchedulerTestUtils.getRandomInstance(2);
- SharedSlot slot1_1 = instance1.allocateSharedSlot(jid, assignment);
- SharedSlot slot1_2 = instance1.allocateSharedSlot(jid, assignment);
- SharedSlot slot2_1 = instance2.allocateSharedSlot(jid, assignment);
- SharedSlot slot2_2 = instance2.allocateSharedSlot(jid, assignment);
+ SharedSlot slot1_1 = instance1.allocateSharedSlot(assignment);
+ SharedSlot slot1_2 = instance1.allocateSharedSlot(assignment);
+ SharedSlot slot2_1 = instance2.allocateSharedSlot(assignment);
+ SharedSlot slot2_2 = instance2.allocateSharedSlot(assignment);
// constraint is still completely unassigned
assertFalse(constraint.isAssigned());
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
new file mode 100644
index 0000000..6894542
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.instance.Slot;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link SlotOwner} stub for tests: returnAllocatedSlot ignores the given slot and always returns a future already completed with {@code false}.
+ */
+public class DummySlotOwner implements SlotOwner {
+ @Override
+ public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
+ return CompletableFuture.completedFuture(false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 9c12fff..01f445b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -58,12 +58,14 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
+
import org.hamcrest.Matchers;
import org.junit.Test;
import org.mockito.Mockito;
import java.net.InetAddress;
import java.util.Arrays;
+import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
@@ -169,7 +171,7 @@ public class TaskExecutorITCase extends TestLogger {
when(jmGateway.getHostname()).thenReturn(jmAddress);
when(jmGateway.offerSlots(
eq(taskManagerResourceId),
- any(Iterable.class),
+ any(Collection.class),
any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
when(jmGateway.getFencingToken()).thenReturn(jobMasterId);
@@ -214,7 +216,7 @@ public class TaskExecutorITCase extends TestLogger {
verify(jmGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(
eq(taskManagerResourceId),
- (Iterable<SlotOffer>)argThat(Matchers.contains(slotOffer)),
+ (Collection<SlotOffer>)argThat(Matchers.contains(slotOffer)),
any(Time.class));
} finally {
if (testingFatalErrorHandler.hasExceptionOccurred()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 6372792..29d07fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -852,7 +852,7 @@ public class TaskExecutorTest extends TestLogger {
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
when(jobMasterGateway.offerSlots(
any(ResourceID.class),
- any(Iterable.class),
+ any(Collection.class),
any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
@@ -904,7 +904,7 @@ public class TaskExecutorTest extends TestLogger {
// the job leader should get the allocation id offered
verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(
any(ResourceID.class),
- (Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)),
+ (Collection<SlotOffer>)Matchers.argThat(contains(slotOffer)),
any(Time.class));
// check if a concurrent error occurred
@@ -975,7 +975,7 @@ public class TaskExecutorTest extends TestLogger {
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
when(jobMasterGateway.offerSlots(
- any(ResourceID.class), any(Iterable.class), any(Time.class)))
+ any(ResourceID.class), any(Collection.class), any(Time.class)))
.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1)));
rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
@@ -1315,7 +1315,7 @@ public class TaskExecutorTest extends TestLogger {
when(
jobMasterGateway.offerSlots(
any(ResourceID.class),
- any(Iterable.class),
+ any(Collection.class),
any(Time.class)))
.thenReturn(offerResultFuture);
@@ -1323,7 +1323,7 @@ public class TaskExecutorTest extends TestLogger {
// been properly started. This will also offer the slots to the job master
jobLeaderService.addJob(jobId, jobManagerAddress);
- verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Iterable.class), any(Time.class));
+ verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Collection.class), any(Time.class));
// submit the task without having acknowledge the offered slots
tmGateway.submitTask(tdd, jobMasterId, timeout);