You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:38:37 UTC

[flink] 03/05: [hotfix] Remove mocking from SlotManagerTest

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0c95396c05839447a75af6020896ed4733d1c5a7
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 21 17:16:32 2018 +0200

    [hotfix] Remove mocking from SlotManagerTest
---
 .../slotmanager/SlotManagerTest.java               | 160 +++++++++------------
 1 file changed, 64 insertions(+), 96 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 854d27c..33a696a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -88,9 +88,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -107,9 +105,9 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testTaskManagerRegistration() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 		final ResourceID resourceId = ResourceID.generate();
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
@@ -139,14 +137,12 @@ public class SlotManagerTest extends TestLogger {
 		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final JobID jobId = new JobID();
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			any(SlotID.class),
-			any(JobID.class),
-			any(AllocationID.class),
-			anyString(),
-			eq(resourceManagerId),
-			any(Time.class))).thenReturn(new CompletableFuture<>());
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setRequestSlotFunction(tuple5 -> {
+				assertThat(tuple5.f4, is(equalTo(resourceManagerId)));
+				return new CompletableFuture<>();
+			})
+			.createTestingTaskExecutorGateway();
 
 		final ResourceID resourceId = ResourceID.generate();
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
@@ -232,8 +228,11 @@ public class SlotManagerTest extends TestLogger {
 			resourceProfile,
 			"localhost");
 
-		ResourceActions resourceManagerActions = mock(ResourceActions.class);
-		doThrow(new ResourceManagerException("Test exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class));
+		ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceFunction(value -> {
+				throw new ResourceManagerException("Test exception");
+			})
+			.build();
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 
@@ -264,19 +263,17 @@ public class SlotManagerTest extends TestLogger {
 			resourceProfile,
 			targetAddress);
 
-		ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
-
+			final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestFuture = new CompletableFuture<>();
 			// accept an incoming slot request
-			final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-			when(taskExecutorGateway.requestSlot(
-				eq(slotId),
-				eq(jobId),
-				eq(allocationId),
-				anyString(),
-				eq(resourceManagerId),
-				any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+			final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+				.setRequestSlotFunction(tuple5 -> {
+					requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4));
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				})
+				.createTestingTaskExecutorGateway();
 
 			final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
@@ -289,7 +286,7 @@ public class SlotManagerTest extends TestLogger {
 
 			assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest));
 
-			verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class));
+			assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId))));
 
 			TaskManagerSlot slot = slotManager.getSlot(slotId);
 
@@ -309,14 +306,9 @@ public class SlotManagerTest extends TestLogger {
 		final SlotID slotId = new SlotID(resourceID, 0);
 		final AllocationID allocationId = new AllocationID();
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			any(SlotID.class),
-			any(JobID.class),
-			any(AllocationID.class),
-			anyString(),
-			eq(resourceManagerId),
-			any(Time.class))).thenReturn(new CompletableFuture<>());
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> new CompletableFuture<>())
+			.createTestingTaskExecutorGateway();
 
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
@@ -369,15 +361,14 @@ public class SlotManagerTest extends TestLogger {
 			.setAllocateResourceConsumer(ignored -> numberAllocateResourceCalls.incrementAndGet())
 			.build();
 
+		final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestFuture = new CompletableFuture<>();
 		// accept an incoming slot request
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			eq(slotId),
-			eq(jobId),
-			eq(allocationId),
-			anyString(),
-			eq(resourceManagerId),
-			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setRequestSlotFunction(tuple5 -> {
+				requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4));
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			})
+			.createTestingTaskExecutorGateway();
 
 		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
@@ -394,7 +385,7 @@ public class SlotManagerTest extends TestLogger {
 				taskExecutorConnection,
 				slotReport);
 
-			verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class));
+			assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId))));
 
 			TaskManagerSlot slot = slotManager.getSlot(slotId);
 
@@ -510,21 +501,17 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet())
+			.build();
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
 		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
 		final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
 		final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			any(SlotID.class),
-			any(JobID.class),
-			any(AllocationID.class),
-			anyString(),
-			eq(resourceManagerId),
-			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 
 		final ResourceID resourceID = ResourceID.generate();
 
@@ -547,7 +534,7 @@ public class SlotManagerTest extends TestLogger {
 
 		// check that we have only called the resource allocation only for the first slot request,
 		// since the second request is a duplicate
-		verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class));
+		assertThat(allocateResourceCalls.get(), is(0));
 	}
 
 	/**
@@ -557,21 +544,17 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet())
+			.build();
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
 		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
 		final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
 		final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			any(SlotID.class),
-			any(JobID.class),
-			any(AllocationID.class),
-			anyString(),
-			eq(resourceManagerId),
-			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 
 		final ResourceID resourceID = ResourceID.generate();
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
@@ -601,7 +584,7 @@ public class SlotManagerTest extends TestLogger {
 
 		// check that we have only called the resource allocation only for the first slot request,
 		// since the second request is a duplicate
-		verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class));
+		assertThat(allocateResourceCalls.get(), is(0));
 	}
 
 	/**
@@ -637,7 +620,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testUpdateSlotReport() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
 
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
@@ -691,13 +674,16 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testTaskManagerTimeout() throws Exception {
-		final long tmTimeout = 500L;
+		final long tmTimeout = 10L;
 
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>();
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
+			.build();
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceID resourceID = ResourceID.generate();
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
 		final SlotID slotId = new SlotID(resourceID, 0);
@@ -715,15 +701,9 @@ public class SlotManagerTest extends TestLogger {
 
 			slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
 
-			mainThreadExecutor.execute(new Runnable() {
-				@Override
-				public void run() {
-					slotManager.registerTaskManager(taskManagerConnection, slotReport);
-				}
-			});
+			mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
 
-			verify(resourceManagerActions, timeout(100L * tmTimeout).times(1))
-				.releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class));
+			assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
 		}
 	}
 
@@ -976,13 +956,12 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testTimeoutForUnusedTaskManager() throws Exception {
 		final long taskManagerTimeout = 50L;
-		final long verifyTimeout = taskManagerTimeout * 10L;
 
-		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>();
 		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
 			.setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID))
 			.build();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
 
 		final ResourceID resourceId = ResourceID.generate();
@@ -992,14 +971,13 @@ public class SlotManagerTest extends TestLogger {
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			any(SlotID.class),
-			eq(jobId),
-			eq(allocationId),
-			anyString(),
-			eq(resourceManagerId),
-			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		final CompletableFuture<SlotID> requestedSlotFuture = new CompletableFuture<>();
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setRequestSlotFunction(tuple5 -> {
+				requestedSlotFuture.complete(tuple5.f0);
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			})
+			.createTestingTaskExecutorGateway();
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
@@ -1028,17 +1006,9 @@ public class SlotManagerTest extends TestLogger {
 					}
 				},
 				mainThreadExecutor)
-			.thenAccept((Object value) -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
-
-			ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class);
+			.thenRun(() -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
 
-			verify(taskExecutorGateway, timeout(verifyTimeout)).requestSlot(
-				slotIdArgumentCaptor.capture(),
-				eq(jobId),
-				eq(allocationId),
-				anyString(),
-				eq(resourceManagerId),
-				any(Time.class));
+			final SlotID slotId = requestedSlotFuture.get();
 
 			CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync(
 				() -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()),
@@ -1047,8 +1017,6 @@ public class SlotManagerTest extends TestLogger {
 			// check that the TaskManager is not idle
 			assertFalse(idleFuture.get());
 
-			final SlotID slotId = slotIdArgumentCaptor.getValue();
-
 			CompletableFuture<TaskManagerSlot> slotFuture = CompletableFuture.supplyAsync(
 				() -> slotManager.getSlot(slotId),
 				mainThreadExecutor);