You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/08/10 09:57:35 UTC

[flink] 01/04: [FLINK-10099][test] Improve YarnResourceManagerTest

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3f40783f48a6ccef9c609ac8204437e00033b76c
Author: 陈梓立 <wa...@gmail.com>
AuthorDate: Mon Aug 6 16:09:43 2018 +0800

    [FLINK-10099][test] Improve YarnResourceManagerTest
    
    Introduce methods to mock a Yarn Container and ContainerStatus.
    
    Properly shut down a started ResourceManager.
    
    This closes #6499.
---
 .../apache/flink/yarn/YarnResourceManagerTest.java | 329 ++++++++++++---------
 1 file changed, 181 insertions(+), 148 deletions(-)

diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index eb8e968..a7d4f43 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
 
@@ -71,6 +72,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -115,21 +117,28 @@ public class YarnResourceManagerTest extends TestLogger {
 
 	private static final Time TIMEOUT = Time.seconds(10L);
 
-	private Configuration flinkConfig = new Configuration();
+	private Configuration flinkConfig;
 
-	private Map<String, String> env = new HashMap<>();
+	private Map<String, String> env;
+
+	private TestingFatalErrorHandler testingFatalErrorHandler;
 
 	@Rule
 	public TemporaryFolder folder = new TemporaryFolder();
 
 	@Before
 	public void setup() {
+		testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		flinkConfig = new Configuration();
 		flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);
+
 		File root = folder.getRoot();
 		File home = new File(root, "home");
 		boolean created = home.mkdir();
 		assertTrue(created);
 
+		env = new HashMap<>();
 		env.put(ENV_APP_ID, "foo");
 		env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
 		env.put(ENV_CLIENT_SHIP_FILES, "");
@@ -139,15 +148,21 @@ public class YarnResourceManagerTest extends TestLogger {
 	}
 
 	@After
-	public void teardown() {
-		env.clear();
+	public void teardown() throws Exception {
+		if (testingFatalErrorHandler != null) {
+			testingFatalErrorHandler.rethrowError();
+		}
+
+		if (env != null) {
+			env.clear();
+		}
 	}
 
 	static class TestingYarnResourceManager extends YarnResourceManager {
-		public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
-		public NMClient mockNMClient;
+		AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
+		NMClient mockNMClient;
 
-		public TestingYarnResourceManager(
+		TestingYarnResourceManager(
 				RpcService rpcService,
 				String resourceManagerEndpointId,
 				ResourceID resourceId,
@@ -181,11 +196,11 @@ public class YarnResourceManagerTest extends TestLogger {
 			this.mockResourceManagerClient = mockResourceManagerClient;
 		}
 
-		public <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
+		<T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
 			return callAsync(callable, TIMEOUT);
 		}
 
-		public MainThreadExecutor getMainThreadExecutorForTesting() {
+		MainThreadExecutor getMainThreadExecutorForTesting() {
 			return super.getMainThreadExecutor();
 		}
 
@@ -193,7 +208,7 @@ public class YarnResourceManagerTest extends TestLogger {
 		protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
 				YarnConfiguration yarnConfiguration,
 				int yarnHeartbeatIntervalMillis,
-				@Nullable String webInteraceUrl) {
+				@Nullable String webInterfaceUrl) {
 			return mockResourceManagerClient;
 		}
 
@@ -213,7 +228,6 @@ public class YarnResourceManagerTest extends TestLogger {
 
 		// services
 		final TestingRpcService rpcService;
-		final TestingFatalErrorHandler fatalErrorHandler;
 		final MockResourceManagerRuntimeServices rmServices;
 
 		// RM
@@ -240,7 +254,6 @@ public class YarnResourceManagerTest extends TestLogger {
 		 */
 		Context() throws Exception {
 			rpcService = new TestingRpcService();
-			fatalErrorHandler = new TestingFatalErrorHandler();
 			rmServices = new MockResourceManagerRuntimeServices();
 
 			// resource manager
@@ -258,7 +271,7 @@ public class YarnResourceManagerTest extends TestLogger {
 							rmServices.metricRegistry,
 							rmServices.jobLeaderIdService,
 							new ClusterInformation("localhost", 1234),
-							fatalErrorHandler,
+							testingFatalErrorHandler,
 							null,
 							mockResourceManagerClient,
 							mockNMClient);
@@ -269,15 +282,15 @@ public class YarnResourceManagerTest extends TestLogger {
 		 */
 		class MockResourceManagerRuntimeServices {
 
-			public final ScheduledExecutor scheduledExecutor;
-			public final TestingHighAvailabilityServices highAvailabilityServices;
-			public final HeartbeatServices heartbeatServices;
-			public final MetricRegistry metricRegistry;
-			public final TestingLeaderElectionService rmLeaderElectionService;
-			public final JobLeaderIdService jobLeaderIdService;
-			public final SlotManager slotManager;
+			private final ScheduledExecutor scheduledExecutor;
+			private final TestingHighAvailabilityServices highAvailabilityServices;
+			private final HeartbeatServices heartbeatServices;
+			private final MetricRegistry metricRegistry;
+			private final TestingLeaderElectionService rmLeaderElectionService;
+			private final JobLeaderIdService jobLeaderIdService;
+			private final SlotManager slotManager;
 
-			public UUID rmLeaderSessionId;
+			private UUID rmLeaderSessionId;
 
 			MockResourceManagerRuntimeServices() throws Exception {
 				scheduledExecutor = mock(ScheduledExecutor.class);
@@ -295,7 +308,7 @@ public class YarnResourceManagerTest extends TestLogger {
 						Time.minutes(5L));
 			}
 
-			public void grantLeadership() throws Exception {
+			void grantLeadership() throws Exception {
 				rmLeaderSessionId = UUID.randomUUID();
 				rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 			}
@@ -304,7 +317,7 @@ public class YarnResourceManagerTest extends TestLogger {
 		/**
 		 * Start the resource manager and grant leadership to it.
 		 */
-		public void startResourceManager() throws Exception {
+		void startResourceManager() throws Exception {
 			resourceManager.start();
 			rmServices.grantLeadership();
 		}
@@ -312,93 +325,129 @@ public class YarnResourceManagerTest extends TestLogger {
 		/**
 		 * Stop the Akka actor system.
 		 */
-		public void stopResourceManager() throws Exception {
+		void stopResourceManager() throws Exception {
 			rpcService.stopService().get();
 		}
-	}
 
-	@Test
-	public void testStopWorker() throws Exception {
-		new Context() {{
+		/**
+		 * A wrapper function for running a test. Deals with the setup and teardown
+		 * logic in Context.
+		 * @param testMethod the real test body.
+		 */
+		void runTest(RunnableWithException testMethod) throws Exception {
 			startResourceManager();
-			// Request slot from SlotManager.
-			CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
-				rmServices.slotManager.registerSlotRequest(
-					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
-				return null;
-			});
+			try {
+				testMethod.run();
+			} finally {
+				stopResourceManager();
+			}
+		}
+	}
 
-			// wait for the registerSlotRequest completion
-			registerSlotRequestFuture.get();
-
-			// Callback from YARN when container is allocated.
-			Container testingContainer = mock(Container.class);
-			when(testingContainer.getId()).thenReturn(
-				ContainerId.newInstance(
-					ApplicationAttemptId.newInstance(
-						ApplicationId.newInstance(System.currentTimeMillis(), 1),
-						1),
-					1));
-			when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
-			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
-			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
-			resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
-			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
-
-			// Remote task executor registers with YarnResourceManager.
-			TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
-			rpcService.registerGateway(taskHost, mockTaskExecutorGateway);
-
-			final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-
-			final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString());
-			final SlotReport slotReport = new SlotReport(
-				new SlotStatus(
-					new SlotID(taskManagerResourceId, 1),
-					new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap())));
-
-			CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
-				.registerTaskExecutor(
-					taskHost,
-					taskManagerResourceId,
-					dataPort,
-					hardwareDescription,
-					Time.seconds(10L))
-				.thenCompose(
-					(RegistrationResponse response) -> {
-						assertThat(response, instanceOf(TaskExecutorRegistrationSuccess.class));
-						final TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) response;
-						return rmGateway.sendSlotReport(
-							taskManagerResourceId,
-							success.getRegistrationId(),
-							slotReport,
-							Time.seconds(10L));
-					})
-				.handleAsync(
-					(Acknowledge ignored, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(),
-					resourceManager.getMainThreadExecutorForTesting());
-
-			final int numberRegisteredSlots = numberRegisteredSlotsFuture.get();
-
-			assertEquals(1, numberRegisteredSlots);
-
-			// Unregister all task executors and release all containers.
-			CompletableFuture<?> unregisterAndReleaseFuture =  resourceManager.runInMainThread(() -> {
-				rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
-				return null;
-			});
+	private static Container mockContainer(String host, int port, int containerId) {
+		Container mockContainer = mock(Container.class);
+
+		NodeId mockNodeId = NodeId.newInstance(host, port);
+		ContainerId mockContainerId = ContainerId.newInstance(
+			ApplicationAttemptId.newInstance(
+				ApplicationId.newInstance(System.currentTimeMillis(), 1),
+				1
+			),
+			containerId
+		);
+
+		when(mockContainer.getId()).thenReturn(mockContainerId);
+		when(mockContainer.getNodeId()).thenReturn(mockNodeId);
+		when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+		when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+
+		return mockContainer;
+	}
+
+	private static ContainerStatus mockContainerStatus(ContainerId containerId) {
+		ContainerStatus mockContainerStatus = mock(ContainerStatus.class);
 
-			unregisterAndReleaseFuture.get();
+		when(mockContainerStatus.getContainerId()).thenReturn(containerId);
+		when(mockContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
+		when(mockContainerStatus.getDiagnostics()).thenReturn("Test exit");
+		when(mockContainerStatus.getExitStatus()).thenReturn(-1);
 
-			verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
-			verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+		return mockContainerStatus;
+	}
 
-			stopResourceManager();
+	@Test
+	public void testStopWorker() throws Exception {
+		new Context() {{
+			runTest(() -> {
+				// Request slot from SlotManager.
+				CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+					rmServices.slotManager.registerSlotRequest(
+						new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+					return null;
+				});
+
+				// wait for the registerSlotRequest completion
+				registerSlotRequestFuture.get();
+
+				// Callback from YARN when container is allocated.
+				Container testingContainer = mockContainer("container", 1234, 1);
+
+				resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+				verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+				verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
+
+				// Remote task executor registers with YarnResourceManager.
+				TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
+				rpcService.registerGateway(taskHost, mockTaskExecutorGateway);
+
+				final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+				final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString());
+				final SlotReport slotReport = new SlotReport(
+					new SlotStatus(
+						new SlotID(taskManagerResourceId, 1),
+						new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap())));
+
+				CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
+					.registerTaskExecutor(
+						taskHost,
+						taskManagerResourceId,
+						dataPort,
+						hardwareDescription,
+						Time.seconds(10L))
+					.thenCompose(
+						(RegistrationResponse response) -> {
+							assertThat(response, instanceOf(TaskExecutorRegistrationSuccess.class));
+							final TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) response;
+							return rmGateway.sendSlotReport(
+								taskManagerResourceId,
+								success.getRegistrationId(),
+								slotReport,
+								Time.seconds(10L));
+						})
+					.handleAsync(
+						(Acknowledge ignored, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(),
+						resourceManager.getMainThreadExecutorForTesting());
+
+				final int numberRegisteredSlots = numberRegisteredSlotsFuture.get();
+
+				assertEquals(1, numberRegisteredSlots);
+
+				// Unregister all task executors and release all containers.
+				CompletableFuture<?> unregisterAndReleaseFuture = resourceManager.runInMainThread(() -> {
+					rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
+					return null;
+				});
+
+				unregisterAndReleaseFuture.get();
+
+				verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
+				verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+			});
 
 			// It's now safe to access the SlotManager state since the ResourceManager has been stopped.
-			assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 0);
-			assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0);
+			assertThat(rmServices.slotManager.getNumberRegisteredSlots(), Matchers.equalTo(0));
+			assertThat(resourceManager.getNumberOfRegisteredTaskManagers().get(), Matchers.equalTo(0));
 		}};
 	}
 
@@ -411,65 +460,49 @@ public class YarnResourceManagerTest extends TestLogger {
 			final File applicationDir = folder.newFolder(".flink");
 			env.put(FLINK_YARN_FILES, applicationDir.getCanonicalPath());
 
-			startResourceManager();
-
-			resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);
-			assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
+			runTest(() -> {
+				resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+				assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
+			});
 		}};
 	}
 
 	/**
 	 * Tests that YarnResourceManager will not request more containers than needs during
 	 * callback from Yarn when container is Completed.
-	 * @throws Exception
 	 */
 	@Test
 	public void testOnContainerCompleted() throws Exception {
 		new Context() {{
-			startResourceManager();
-			CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
-				rmServices.slotManager.registerSlotRequest(
-					new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
-				return null;
+			runTest(() -> {
+				CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+					rmServices.slotManager.registerSlotRequest(
+						new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+					return null;
+				});
+
+				// wait for the registerSlotRequest completion
+				registerSlotRequestFuture.get();
+
+				// Callback from YARN when container is allocated.
+				Container testingContainer = mockContainer("container", 1234, 1);
+
+				resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+				verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+				verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
+
+				// Callback from YARN when container is Completed, pending request can not be fulfilled by pending
+				// containers, need to request new container.
+				ContainerStatus testingContainerStatus = mockContainerStatus(testingContainer.getId());
+
+				resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
+				verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+
+				// Callback from YARN when container is Completed happened before global fail, pending request
+				// slot is already fulfilled by pending containers, no need to request new container.
+				resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
+				verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
 			});
-
-			// wait for the registerSlotRequest completion
-			registerSlotRequestFuture.get();
-
-			ContainerId testContainerId = ContainerId.newInstance(
-				ApplicationAttemptId.newInstance(
-					ApplicationId.newInstance(System.currentTimeMillis(), 1),
-					1),
-				1);
-
-			// Callback from YARN when container is allocated.
-			Container testingContainer = mock(Container.class);
-			when(testingContainer.getId()).thenReturn(testContainerId);
-			when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
-			when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
-			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
-			resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
-			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
-
-			// Callback from YARN when container is Completed, pending request can not be fulfilled by pending
-			// containers, need to request new container.
-			ContainerStatus testingContainerStatus = mock(ContainerStatus.class);
-			when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
-			when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
-			when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
-			when(testingContainerStatus.getExitStatus()).thenReturn(-1);
-			resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
-			verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-
-			// Callback from YARN when container is Completed happened before global fail, pending request
-			// slot is already fulfilled by pending containers, no need to request new container.
-			when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
-			when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
-			when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
-			when(testingContainerStatus.getExitStatus()).thenReturn(-1);
-			resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
-			verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
 		}};
 	}
 }