You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:38:38 UTC

[flink] 04/05: [hotfix] Remove mocking from SlotProtocolTest

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b0ba980caed9f26095bc8134bbe581635bb98dde
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 21 17:24:49 2018 +0200

    [hotfix] Remove mocking from SlotProtocolTest
---
 .../slotmanager/SlotProtocolTest.java              | 46 +++++++++++-----------
 1 file changed, 23 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 51e6b0b..66966cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -33,13 +33,13 @@ import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnect
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
@@ -50,12 +50,10 @@ import java.util.concurrent.TimeUnit;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
 
+/**
+ * Tests for the slot allocation protocol.
+ */
 public class SlotProtocolTest extends TestLogger {
 
 	private static final long timeout = 10000L;
@@ -92,7 +90,7 @@ public class SlotProtocolTest extends TestLogger {
 
 			final CompletableFuture<ResourceProfile> resourceProfileFuture = new CompletableFuture<>();
 			ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
-				.setAllocateResourceConsumer(resourceProfile -> resourceProfileFuture.complete(resourceProfile))
+				.setAllocateResourceConsumer(resourceProfileFuture::complete)
 				.build();
 
 			slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
@@ -108,11 +106,13 @@ public class SlotProtocolTest extends TestLogger {
 			assertThat(resourceProfileFuture.get(), is(equalTo(slotRequest.getResourceProfile())));
 
 			// slot becomes available
-			TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-			Mockito.when(
-				taskExecutorGateway
-					.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class)))
-				.thenReturn(mock(CompletableFuture.class));
+			final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> requestFuture = new CompletableFuture<>();
+			TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+				.setRequestSlotFunction(tuple5 -> {
+					requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2));
+					return new CompletableFuture<>();
+				})
+				.createTestingTaskExecutorGateway();
 
 			final ResourceID resourceID = ResourceID.generate();
 			final SlotID slotID = new SlotID(resourceID, 0);
@@ -125,8 +125,7 @@ public class SlotProtocolTest extends TestLogger {
 			slotManager.registerTaskManager(new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
 
 			// 4) Slot becomes available and TaskExecutor gets a SlotRequest
-			verify(taskExecutorGateway, timeout(5000L))
-				.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class));
+			assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID))));
 		}
 	}
 
@@ -143,11 +142,13 @@ public class SlotProtocolTest extends TestLogger {
 
 		final ResourceManagerId rmLeaderID = ResourceManagerId.generate();
 
-		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		Mockito.when(
-			taskExecutorGateway
-				.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class)))
-			.thenReturn(mock(CompletableFuture.class));
+		final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> requestFuture = new CompletableFuture<>();
+		TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setRequestSlotFunction(tuple5 -> {
+				requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2));
+				return new CompletableFuture<>();
+			})
+			.createTestingTaskExecutorGateway();
 
 		try (SlotManager slotManager = new SlotManager(
 			scheduledExecutor,
@@ -155,7 +156,7 @@ public class SlotProtocolTest extends TestLogger {
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime())) {
 
-			ResourceActions resourceManagerActions = mock(ResourceActions.class);
+			ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
 
 			slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
 
@@ -178,8 +179,7 @@ public class SlotProtocolTest extends TestLogger {
 			slotManager.registerSlotRequest(slotRequest);
 
 			// a SlotRequest is routed to the TaskExecutor
-			verify(taskExecutorGateway, timeout(5000))
-				.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class));
+			assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID))));
 		}
 	}
 }