You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:38:38 UTC
[flink] 04/05: [hotfix] Remove mocking from SlotProtocolTest
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b0ba980caed9f26095bc8134bbe581635bb98dde
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 21 17:24:49 2018 +0200
[hotfix] Remove mocking from SlotProtocolTest
---
.../slotmanager/SlotProtocolTest.java | 46 +++++++++++-----------
1 file changed, 23 insertions(+), 23 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 51e6b0b..66966cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -33,13 +33,13 @@ import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnect
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Test;
-import org.mockito.Mockito;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
@@ -50,12 +50,10 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
+/**
+ * Tests for the slot allocation protocol.
+ */
public class SlotProtocolTest extends TestLogger {
private static final long timeout = 10000L;
@@ -92,7 +90,7 @@ public class SlotProtocolTest extends TestLogger {
final CompletableFuture<ResourceProfile> resourceProfileFuture = new CompletableFuture<>();
ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
- .setAllocateResourceConsumer(resourceProfile -> resourceProfileFuture.complete(resourceProfile))
+ .setAllocateResourceConsumer(resourceProfileFuture::complete)
.build();
slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
@@ -108,11 +106,13 @@ public class SlotProtocolTest extends TestLogger {
assertThat(resourceProfileFuture.get(), is(equalTo(slotRequest.getResourceProfile())));
// slot becomes available
- TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- Mockito.when(
- taskExecutorGateway
- .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class)))
- .thenReturn(mock(CompletableFuture.class));
+ final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> requestFuture = new CompletableFuture<>();
+ TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(tuple5 -> {
+ requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2));
+ return new CompletableFuture<>();
+ })
+ .createTestingTaskExecutorGateway();
final ResourceID resourceID = ResourceID.generate();
final SlotID slotID = new SlotID(resourceID, 0);
@@ -125,8 +125,7 @@ public class SlotProtocolTest extends TestLogger {
slotManager.registerTaskManager(new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
// 4) Slot becomes available and TaskExecutor gets a SlotRequest
- verify(taskExecutorGateway, timeout(5000L))
- .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class));
+ assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID))));
}
}
@@ -143,11 +142,13 @@ public class SlotProtocolTest extends TestLogger {
final ResourceManagerId rmLeaderID = ResourceManagerId.generate();
- TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- Mockito.when(
- taskExecutorGateway
- .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class)))
- .thenReturn(mock(CompletableFuture.class));
+ final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> requestFuture = new CompletableFuture<>();
+ TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(tuple5 -> {
+ requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2));
+ return new CompletableFuture<>();
+ })
+ .createTestingTaskExecutorGateway();
try (SlotManager slotManager = new SlotManager(
scheduledExecutor,
@@ -155,7 +156,7 @@ public class SlotProtocolTest extends TestLogger {
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime())) {
- ResourceActions resourceManagerActions = mock(ResourceActions.class);
+ ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
@@ -178,8 +179,7 @@ public class SlotProtocolTest extends TestLogger {
slotManager.registerSlotRequest(slotRequest);
// a SlotRequest is routed to the TaskExecutor
- verify(taskExecutorGateway, timeout(5000))
- .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class));
+ assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID))));
}
}
}