You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:23:10 UTC

[51/52] [abbrv] flink git commit: [tests] Harden TaskExecutorTest

[tests] Harden TaskExecutorTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/235a1696
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/235a1696
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/235a1696

Branch: refs/heads/master
Commit: 235a169691bd3c3ff2a25b7a7763c6900a0f2c6c
Parents: 368d0da
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 23 19:21:43 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:28 2016 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutorTest.java  | 77 +++++++++++++++-----
 1 file changed, 57 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/235a1696/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 2af97b5..aacd329 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -75,14 +75,11 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
-
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
-
 import org.junit.rules.TestName;
 import org.mockito.Matchers;
-import org.powermock.api.mockito.PowerMockito;
 
 import java.net.InetAddress;
 import java.net.URL;
@@ -90,9 +87,18 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.UUID;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
 import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TaskExecutorTest extends TestLogger {
 
@@ -107,10 +113,16 @@ public class TaskExecutorTest extends TestLogger {
 
 		final TestingSerialRpcService rpc = new TestingSerialRpcService();
 		try {
+			final FatalErrorHandler errorHandler = mock(FatalErrorHandler.class);
+
 			// register a mock resource manager gateway
 			ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
+			when(rmGateway.registerTaskExecutor(
+					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+				.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new RegistrationResponse.Success()));
+
 			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
-			PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+			when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
 
 			rpc.registerGateway(resourceManagerAddress, rmGateway);
 
@@ -123,6 +135,8 @@ public class TaskExecutorTest extends TestLogger {
 			final SlotReport slotReport = new SlotReport();
 			when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
+			final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
 			TaskExecutor taskManager = new TaskExecutor(
 				taskManagerServicesConfiguration,
 				taskManagerLocation,
@@ -138,13 +152,16 @@ public class TaskExecutorTest extends TestLogger {
 				taskSlotTable,
 				mock(JobManagerTable.class),
 				mock(JobLeaderService.class),
-				mock(FatalErrorHandler.class));
+				testingFatalErrorHandler);
 
 			taskManager.start();
 			String taskManagerAddress = taskManager.getAddress();
 
 			verify(rmGateway).registerTaskExecutor(
 					any(UUID.class), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
+
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowError();
 		}
 		finally {
 			rpc.stopService();
@@ -165,6 +182,14 @@ public class TaskExecutorTest extends TestLogger {
 			// register the mock resource manager gateways
 			ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
 			ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
+
+			when(rmGateway1.registerTaskExecutor(
+					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+					.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new RegistrationResponse.Success()));
+			when(rmGateway2.registerTaskExecutor(
+					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+					.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new RegistrationResponse.Success()));
+
 			rpc.registerGateway(address1, rmGateway1);
 			rpc.registerGateway(address2, rmGateway2);
 
@@ -174,9 +199,9 @@ public class TaskExecutorTest extends TestLogger {
 			haServices.setResourceManagerLeaderRetriever(testLeaderService);
 
 			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
-			PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
-			PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
-			PowerMockito.when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]);
+			when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+			when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
+			when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]);
 
 			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
 			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
@@ -186,6 +211,8 @@ public class TaskExecutorTest extends TestLogger {
 			final SlotReport slotReport = new SlotReport();
 			when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
+			final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
 			TaskExecutor taskManager = new TaskExecutor(
 				taskManagerServicesConfiguration,
 				taskManagerLocation,
@@ -201,7 +228,7 @@ public class TaskExecutorTest extends TestLogger {
 				taskSlotTable,
 				mock(JobManagerTable.class),
 				mock(JobLeaderService.class),
-				mock(FatalErrorHandler.class));
+				testingFatalErrorHandler);
 
 			taskManager.start();
 			String taskManagerAddress = taskManager.getAddress();
@@ -225,6 +252,9 @@ public class TaskExecutorTest extends TestLogger {
 			verify(rmGateway2).registerTaskExecutor(
 					eq(leaderId2), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
+
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowError();
 		}
 		finally {
 			rpc.stopService();
@@ -310,6 +340,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class));
 
 		try {
+			final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 			TaskExecutor taskManager = new TaskExecutor(
 				taskManagerConfiguration,
@@ -326,7 +357,7 @@ public class TaskExecutorTest extends TestLogger {
 				taskSlotTable,
 				jobManagerTable,
 				mock(JobLeaderService.class),
-				mock(FatalErrorHandler.class));
+				testingFatalErrorHandler);
 
 			taskManager.start();
 
@@ -336,6 +367,8 @@ public class TaskExecutorTest extends TestLogger {
 
 			completionFuture.get();
 
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowError();
 		} finally {
 			rpc.stopService();
 		}
@@ -452,10 +485,10 @@ public class TaskExecutorTest extends TestLogger {
 					(Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)),
 					eq(jobManagerLeaderId),
 					any(Time.class));
-		} finally {
+
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
-
+		} finally {
 			rpc.stopService();
 		}
 	}
@@ -559,10 +592,10 @@ public class TaskExecutorTest extends TestLogger {
 			assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
 			assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
 			assertTrue(taskSlotTable.isSlotFree(1));
-		} finally {
+
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
-
+		} finally {
 			rpc.stopService();
 		}
 	}
@@ -595,11 +628,13 @@ public class TaskExecutorTest extends TestLogger {
 			haServices.setResourceManagerLeaderRetriever(testLeaderService);
 
 			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
-			PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+			when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
 
 			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
 			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
 
+			final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
 			TaskExecutor taskManager = new TaskExecutor(
 				taskManagerServicesConfiguration,
 				taskManagerLocation,
@@ -615,7 +650,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(TaskSlotTable.class),
 				mock(JobManagerTable.class),
 				mock(JobLeaderService.class),
-				mock(FatalErrorHandler.class));
+				testingFatalErrorHandler);
 
 			taskManager.start();
 			String taskManagerAddress = taskManager.getAddress();
@@ -653,6 +688,8 @@ public class TaskExecutorTest extends TestLogger {
 				taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
 			assertTrue(tmSlotRequestReply3 instanceof TMSlotRequestRegistered);
 
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowError();
 		}
 		finally {
 			rpc.stopService();