You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:38:37 UTC
[flink] 03/05: [hotfix] Remove mocking from SlotManagerTest
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0c95396c05839447a75af6020896ed4733d1c5a7
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 21 17:16:32 2018 +0200
[hotfix] Remove mocking from SlotManagerTest
---
.../slotmanager/SlotManagerTest.java | 160 +++++++++------------
1 file changed, 64 insertions(+), 96 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 854d27c..33a696a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -88,9 +88,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -107,9 +105,9 @@ public class SlotManagerTest extends TestLogger {
@Test
public void testTaskManagerRegistration() throws Exception {
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
- final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+ final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
- final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+ final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
final ResourceID resourceId = ResourceID.generate();
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
@@ -139,14 +137,12 @@ public class SlotManagerTest extends TestLogger {
final ResourceActions resourceManagerActions = mock(ResourceActions.class);
final JobID jobId = new JobID();
- final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- any(SlotID.class),
- any(JobID.class),
- any(AllocationID.class),
- anyString(),
- eq(resourceManagerId),
- any(Time.class))).thenReturn(new CompletableFuture<>());
+ final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(tuple5 -> {
+ assertThat(tuple5.f4, is(equalTo(resourceManagerId)));
+ return new CompletableFuture<>();
+ })
+ .createTestingTaskExecutorGateway();
final ResourceID resourceId = ResourceID.generate();
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
@@ -232,8 +228,11 @@ public class SlotManagerTest extends TestLogger {
resourceProfile,
"localhost");
- ResourceActions resourceManagerActions = mock(ResourceActions.class);
- doThrow(new ResourceManagerException("Test exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class));
+ ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+ .setAllocateResourceFunction(value -> {
+ throw new ResourceManagerException("Test exception");
+ })
+ .build();
try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
@@ -264,19 +263,17 @@ public class SlotManagerTest extends TestLogger {
resourceProfile,
targetAddress);
- ResourceActions resourceManagerActions = mock(ResourceActions.class);
+ ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
-
+ final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestFuture = new CompletableFuture<>();
// accept an incoming slot request
- final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- eq(slotId),
- eq(jobId),
- eq(allocationId),
- anyString(),
- eq(resourceManagerId),
- any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(tuple5 -> {
+ requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4));
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .createTestingTaskExecutorGateway();
final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
@@ -289,7 +286,7 @@ public class SlotManagerTest extends TestLogger {
assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest));
- verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class));
+ assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId))));
TaskManagerSlot slot = slotManager.getSlot(slotId);
@@ -309,14 +306,9 @@ public class SlotManagerTest extends TestLogger {
final SlotID slotId = new SlotID(resourceID, 0);
final AllocationID allocationId = new AllocationID();
- final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- any(SlotID.class),
- any(JobID.class),
- any(AllocationID.class),
- anyString(),
- eq(resourceManagerId),
- any(Time.class))).thenReturn(new CompletableFuture<>());
+ final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> new CompletableFuture<>())
+ .createTestingTaskExecutorGateway();
final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
@@ -369,15 +361,14 @@ public class SlotManagerTest extends TestLogger {
.setAllocateResourceConsumer(ignored -> numberAllocateResourceCalls.incrementAndGet())
.build();
+ final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestFuture = new CompletableFuture<>();
// accept an incoming slot request
- final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- eq(slotId),
- eq(jobId),
- eq(allocationId),
- anyString(),
- eq(resourceManagerId),
- any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(tuple5 -> {
+ requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4));
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .createTestingTaskExecutorGateway();
final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
@@ -394,7 +385,7 @@ public class SlotManagerTest extends TestLogger {
taskExecutorConnection,
slotReport);
- verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class));
+ assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId))));
TaskManagerSlot slot = slotManager.getSlot(slotId);
@@ -510,21 +501,17 @@ public class SlotManagerTest extends TestLogger {
@Test
public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception {
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
- final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+ final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
+ final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+ .setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet())
+ .build();
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
- final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- any(SlotID.class),
- any(JobID.class),
- any(AllocationID.class),
- anyString(),
- eq(resourceManagerId),
- any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
final ResourceID resourceID = ResourceID.generate();
@@ -547,7 +534,7 @@ public class SlotManagerTest extends TestLogger {
// check that we have called the resource allocation only for the first slot request,
// since the second request is a duplicate
- verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class));
+ assertThat(allocateResourceCalls.get(), is(0));
}
/**
@@ -557,21 +544,17 @@ public class SlotManagerTest extends TestLogger {
@Test
public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception {
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
- final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+ final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
+ final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+ .setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet())
+ .build();
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
- final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- any(SlotID.class),
- any(JobID.class),
- any(AllocationID.class),
- anyString(),
- eq(resourceManagerId),
- any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
final ResourceID resourceID = ResourceID.generate();
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
@@ -601,7 +584,7 @@ public class SlotManagerTest extends TestLogger {
// check that we have called the resource allocation only for the first slot request,
// since the second request is a duplicate
- verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class));
+ assertThat(allocateResourceCalls.get(), is(0));
}
/**
@@ -637,7 +620,7 @@ public class SlotManagerTest extends TestLogger {
@Test
public void testUpdateSlotReport() throws Exception {
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
- final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+ final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
@@ -691,13 +674,16 @@ public class SlotManagerTest extends TestLogger {
*/
@Test
public void testTaskManagerTimeout() throws Exception {
- final long tmTimeout = 500L;
+ final long tmTimeout = 10L;
- final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+ final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>();
+ final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+ .setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
+ .build();
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
final ResourceID resourceID = ResourceID.generate();
- final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+ final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
final SlotID slotId = new SlotID(resourceID, 0);
@@ -715,15 +701,9 @@ public class SlotManagerTest extends TestLogger {
slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
- mainThreadExecutor.execute(new Runnable() {
- @Override
- public void run() {
- slotManager.registerTaskManager(taskManagerConnection, slotReport);
- }
- });
+ mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
- verify(resourceManagerActions, timeout(100L * tmTimeout).times(1))
- .releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class));
+ assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
}
}
@@ -976,13 +956,12 @@ public class SlotManagerTest extends TestLogger {
@Test
public void testTimeoutForUnusedTaskManager() throws Exception {
final long taskManagerTimeout = 50L;
- final long verifyTimeout = taskManagerTimeout * 10L;
- final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>();
final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
.setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID))
.build();
+ final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
final ResourceID resourceId = ResourceID.generate();
@@ -992,14 +971,13 @@ public class SlotManagerTest extends TestLogger {
final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
- final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- when(taskExecutorGateway.requestSlot(
- any(SlotID.class),
- eq(jobId),
- eq(allocationId),
- anyString(),
- eq(resourceManagerId),
- any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ final CompletableFuture<SlotID> requestedSlotFuture = new CompletableFuture<>();
+ final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(tuple5 -> {
+ requestedSlotFuture.complete(tuple5.f0);
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .createTestingTaskExecutorGateway();
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
@@ -1028,17 +1006,9 @@ public class SlotManagerTest extends TestLogger {
}
},
mainThreadExecutor)
- .thenAccept((Object value) -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
-
- ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class);
+ .thenRun(() -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
- verify(taskExecutorGateway, timeout(verifyTimeout)).requestSlot(
- slotIdArgumentCaptor.capture(),
- eq(jobId),
- eq(allocationId),
- anyString(),
- eq(resourceManagerId),
- any(Time.class));
+ final SlotID slotId = requestedSlotFuture.get();
CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync(
() -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()),
@@ -1047,8 +1017,6 @@ public class SlotManagerTest extends TestLogger {
// check that the TaskManager is not idle
assertFalse(idleFuture.get());
- final SlotID slotId = slotIdArgumentCaptor.getValue();
-
CompletableFuture<TaskManagerSlot> slotFuture = CompletableFuture.supplyAsync(
() -> slotManager.getSlot(slotId),
mainThreadExecutor);