You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:23:10 UTC
[51/52] [abbrv] flink git commit: [tests] Harden TaskExecutorTest
[tests] Harden TaskExecutorTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/235a1696
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/235a1696
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/235a1696
Branch: refs/heads/master
Commit: 235a169691bd3c3ff2a25b7a7763c6900a0f2c6c
Parents: 368d0da
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 23 19:21:43 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:28 2016 +0100
----------------------------------------------------------------------
.../runtime/taskexecutor/TaskExecutorTest.java | 77 +++++++++++++++-----
1 file changed, 57 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/235a1696/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 2af97b5..aacd329 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -75,14 +75,11 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
-
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
-
import org.junit.rules.TestName;
import org.mockito.Matchers;
-import org.powermock.api.mockito.PowerMockito;
import java.net.InetAddress;
import java.net.URL;
@@ -90,9 +87,18 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TaskExecutorTest extends TestLogger {
@@ -107,10 +113,16 @@ public class TaskExecutorTest extends TestLogger {
final TestingSerialRpcService rpc = new TestingSerialRpcService();
try {
+ final FatalErrorHandler errorHandler = mock(FatalErrorHandler.class);
+
// register a mock resource manager gateway
ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
+ when(rmGateway.registerTaskExecutor(
+ any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+ .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new RegistrationResponse.Success()));
+
TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
- PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+ when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
rpc.registerGateway(resourceManagerAddress, rmGateway);
@@ -123,6 +135,8 @@ public class TaskExecutorTest extends TestLogger {
final SlotReport slotReport = new SlotReport();
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
TaskExecutor taskManager = new TaskExecutor(
taskManagerServicesConfiguration,
taskManagerLocation,
@@ -138,13 +152,16 @@ public class TaskExecutorTest extends TestLogger {
taskSlotTable,
mock(JobManagerTable.class),
mock(JobLeaderService.class),
- mock(FatalErrorHandler.class));
+ testingFatalErrorHandler);
taskManager.start();
String taskManagerAddress = taskManager.getAddress();
verify(rmGateway).registerTaskExecutor(
any(UUID.class), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
+
+ // check if a concurrent error occurred
+ testingFatalErrorHandler.rethrowError();
}
finally {
rpc.stopService();
@@ -165,6 +182,14 @@ public class TaskExecutorTest extends TestLogger {
// register the mock resource manager gateways
ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
+
+ when(rmGateway1.registerTaskExecutor(
+ any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+ .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new RegistrationResponse.Success()));
+ when(rmGateway2.registerTaskExecutor(
+ any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+ .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new RegistrationResponse.Success()));
+
rpc.registerGateway(address1, rmGateway1);
rpc.registerGateway(address2, rmGateway2);
@@ -174,9 +199,9 @@ public class TaskExecutorTest extends TestLogger {
haServices.setResourceManagerLeaderRetriever(testLeaderService);
TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
- PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
- PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
- PowerMockito.when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]);
+ when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+ when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
+ when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]);
TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
@@ -186,6 +211,8 @@ public class TaskExecutorTest extends TestLogger {
final SlotReport slotReport = new SlotReport();
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
TaskExecutor taskManager = new TaskExecutor(
taskManagerServicesConfiguration,
taskManagerLocation,
@@ -201,7 +228,7 @@ public class TaskExecutorTest extends TestLogger {
taskSlotTable,
mock(JobManagerTable.class),
mock(JobLeaderService.class),
- mock(FatalErrorHandler.class));
+ testingFatalErrorHandler);
taskManager.start();
String taskManagerAddress = taskManager.getAddress();
@@ -225,6 +252,9 @@ public class TaskExecutorTest extends TestLogger {
verify(rmGateway2).registerTaskExecutor(
eq(leaderId2), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
assertNotNull(taskManager.getResourceManagerConnection());
+
+ // check if a concurrent error occurred
+ testingFatalErrorHandler.rethrowError();
}
finally {
rpc.stopService();
@@ -310,6 +340,7 @@ public class TaskExecutorTest extends TestLogger {
when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class));
try {
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
TaskExecutor taskManager = new TaskExecutor(
taskManagerConfiguration,
@@ -326,7 +357,7 @@ public class TaskExecutorTest extends TestLogger {
taskSlotTable,
jobManagerTable,
mock(JobLeaderService.class),
- mock(FatalErrorHandler.class));
+ testingFatalErrorHandler);
taskManager.start();
@@ -336,6 +367,8 @@ public class TaskExecutorTest extends TestLogger {
completionFuture.get();
+ // check if a concurrent error occurred
+ testingFatalErrorHandler.rethrowError();
} finally {
rpc.stopService();
}
@@ -452,10 +485,10 @@ public class TaskExecutorTest extends TestLogger {
(Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)),
eq(jobManagerLeaderId),
any(Time.class));
- } finally {
+
// check if a concurrent error occurred
testingFatalErrorHandler.rethrowError();
-
+ } finally {
rpc.stopService();
}
}
@@ -559,10 +592,10 @@ public class TaskExecutorTest extends TestLogger {
assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
assertTrue(taskSlotTable.isSlotFree(1));
- } finally {
+
// check if a concurrent error occurred
testingFatalErrorHandler.rethrowError();
-
+ } finally {
rpc.stopService();
}
}
@@ -595,11 +628,13 @@ public class TaskExecutorTest extends TestLogger {
haServices.setResourceManagerLeaderRetriever(testLeaderService);
TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
- PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+ when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
TaskExecutor taskManager = new TaskExecutor(
taskManagerServicesConfiguration,
taskManagerLocation,
@@ -615,7 +650,7 @@ public class TaskExecutorTest extends TestLogger {
mock(TaskSlotTable.class),
mock(JobManagerTable.class),
mock(JobLeaderService.class),
- mock(FatalErrorHandler.class));
+ testingFatalErrorHandler);
taskManager.start();
String taskManagerAddress = taskManager.getAddress();
@@ -653,6 +688,8 @@ public class TaskExecutorTest extends TestLogger {
taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
assertTrue(tmSlotRequestReply3 instanceof TMSlotRequestRegistered);
+ // check if a concurrent error occurred
+ testingFatalErrorHandler.rethrowError();
}
finally {
rpc.stopService();