You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/08/10 09:57:35 UTC
[flink] 01/04: [FLINK-10099][test] Improve YarnResourceManagerTest
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3f40783f48a6ccef9c609ac8204437e00033b76c
Author: 陈梓立 <wa...@gmail.com>
AuthorDate: Mon Aug 6 16:09:43 2018 +0800
[FLINK-10099][test] Improve YarnResourceManagerTest
Introduce methods to mock a Yarn Container and ContainerStatus.
Properly shut down a started ResourceManager.
This closes #6499.
---
.../apache/flink/yarn/YarnResourceManagerTest.java | 329 ++++++++++++---------
1 file changed, 181 insertions(+), 148 deletions(-)
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index eb8e968..a7d4f43 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
@@ -71,6 +72,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -115,21 +117,28 @@ public class YarnResourceManagerTest extends TestLogger {
private static final Time TIMEOUT = Time.seconds(10L);
- private Configuration flinkConfig = new Configuration();
+ private Configuration flinkConfig;
- private Map<String, String> env = new HashMap<>();
+ private Map<String, String> env;
+
+ private TestingFatalErrorHandler testingFatalErrorHandler;
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@Before
public void setup() {
+ testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+ flinkConfig = new Configuration();
flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);
+
File root = folder.getRoot();
File home = new File(root, "home");
boolean created = home.mkdir();
assertTrue(created);
+ env = new HashMap<>();
env.put(ENV_APP_ID, "foo");
env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
env.put(ENV_CLIENT_SHIP_FILES, "");
@@ -139,15 +148,21 @@ public class YarnResourceManagerTest extends TestLogger {
}
@After
- public void teardown() {
- env.clear();
+ public void teardown() throws Exception {
+ if (testingFatalErrorHandler != null) {
+ testingFatalErrorHandler.rethrowError();
+ }
+
+ if (env != null) {
+ env.clear();
+ }
}
static class TestingYarnResourceManager extends YarnResourceManager {
- public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
- public NMClient mockNMClient;
+ AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
+ NMClient mockNMClient;
- public TestingYarnResourceManager(
+ TestingYarnResourceManager(
RpcService rpcService,
String resourceManagerEndpointId,
ResourceID resourceId,
@@ -181,11 +196,11 @@ public class YarnResourceManagerTest extends TestLogger {
this.mockResourceManagerClient = mockResourceManagerClient;
}
- public <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
+ <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
return callAsync(callable, TIMEOUT);
}
- public MainThreadExecutor getMainThreadExecutorForTesting() {
+ MainThreadExecutor getMainThreadExecutorForTesting() {
return super.getMainThreadExecutor();
}
@@ -193,7 +208,7 @@ public class YarnResourceManagerTest extends TestLogger {
protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
YarnConfiguration yarnConfiguration,
int yarnHeartbeatIntervalMillis,
- @Nullable String webInteraceUrl) {
+ @Nullable String webInterfaceUrl) {
return mockResourceManagerClient;
}
@@ -213,7 +228,6 @@ public class YarnResourceManagerTest extends TestLogger {
// services
final TestingRpcService rpcService;
- final TestingFatalErrorHandler fatalErrorHandler;
final MockResourceManagerRuntimeServices rmServices;
// RM
@@ -240,7 +254,6 @@ public class YarnResourceManagerTest extends TestLogger {
*/
Context() throws Exception {
rpcService = new TestingRpcService();
- fatalErrorHandler = new TestingFatalErrorHandler();
rmServices = new MockResourceManagerRuntimeServices();
// resource manager
@@ -258,7 +271,7 @@ public class YarnResourceManagerTest extends TestLogger {
rmServices.metricRegistry,
rmServices.jobLeaderIdService,
new ClusterInformation("localhost", 1234),
- fatalErrorHandler,
+ testingFatalErrorHandler,
null,
mockResourceManagerClient,
mockNMClient);
@@ -269,15 +282,15 @@ public class YarnResourceManagerTest extends TestLogger {
*/
class MockResourceManagerRuntimeServices {
- public final ScheduledExecutor scheduledExecutor;
- public final TestingHighAvailabilityServices highAvailabilityServices;
- public final HeartbeatServices heartbeatServices;
- public final MetricRegistry metricRegistry;
- public final TestingLeaderElectionService rmLeaderElectionService;
- public final JobLeaderIdService jobLeaderIdService;
- public final SlotManager slotManager;
+ private final ScheduledExecutor scheduledExecutor;
+ private final TestingHighAvailabilityServices highAvailabilityServices;
+ private final HeartbeatServices heartbeatServices;
+ private final MetricRegistry metricRegistry;
+ private final TestingLeaderElectionService rmLeaderElectionService;
+ private final JobLeaderIdService jobLeaderIdService;
+ private final SlotManager slotManager;
- public UUID rmLeaderSessionId;
+ private UUID rmLeaderSessionId;
MockResourceManagerRuntimeServices() throws Exception {
scheduledExecutor = mock(ScheduledExecutor.class);
@@ -295,7 +308,7 @@ public class YarnResourceManagerTest extends TestLogger {
Time.minutes(5L));
}
- public void grantLeadership() throws Exception {
+ void grantLeadership() throws Exception {
rmLeaderSessionId = UUID.randomUUID();
rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
}
@@ -304,7 +317,7 @@ public class YarnResourceManagerTest extends TestLogger {
/**
* Start the resource manager and grant leadership to it.
*/
- public void startResourceManager() throws Exception {
+ void startResourceManager() throws Exception {
resourceManager.start();
rmServices.grantLeadership();
}
@@ -312,93 +325,129 @@ public class YarnResourceManagerTest extends TestLogger {
/**
* Stop the Akka actor system.
*/
- public void stopResourceManager() throws Exception {
+ void stopResourceManager() throws Exception {
rpcService.stopService().get();
}
- }
- @Test
- public void testStopWorker() throws Exception {
- new Context() {{
+ /**
+ * A wrapper function for running test. Deal with setup and teardown logic
+ * in Context.
+ * @param testMethod the real test body.
+ */
+ void runTest(RunnableWithException testMethod) throws Exception {
startResourceManager();
- // Request slot from SlotManager.
- CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
- rmServices.slotManager.registerSlotRequest(
- new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
- return null;
- });
+ try {
+ testMethod.run();
+ } finally {
+ stopResourceManager();
+ }
+ }
+ }
- // wait for the registerSlotRequest completion
- registerSlotRequestFuture.get();
-
- // Callback from YARN when container is allocated.
- Container testingContainer = mock(Container.class);
- when(testingContainer.getId()).thenReturn(
- ContainerId.newInstance(
- ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(System.currentTimeMillis(), 1),
- 1),
- 1));
- when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
- when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
- when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
- resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
- verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
- verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
-
- // Remote task executor registers with YarnResourceManager.
- TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
- rpcService.registerGateway(taskHost, mockTaskExecutorGateway);
-
- final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-
- final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString());
- final SlotReport slotReport = new SlotReport(
- new SlotStatus(
- new SlotID(taskManagerResourceId, 1),
- new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap())));
-
- CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
- .registerTaskExecutor(
- taskHost,
- taskManagerResourceId,
- dataPort,
- hardwareDescription,
- Time.seconds(10L))
- .thenCompose(
- (RegistrationResponse response) -> {
- assertThat(response, instanceOf(TaskExecutorRegistrationSuccess.class));
- final TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) response;
- return rmGateway.sendSlotReport(
- taskManagerResourceId,
- success.getRegistrationId(),
- slotReport,
- Time.seconds(10L));
- })
- .handleAsync(
- (Acknowledge ignored, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(),
- resourceManager.getMainThreadExecutorForTesting());
-
- final int numberRegisteredSlots = numberRegisteredSlotsFuture.get();
-
- assertEquals(1, numberRegisteredSlots);
-
- // Unregister all task executors and release all containers.
- CompletableFuture<?> unregisterAndReleaseFuture = resourceManager.runInMainThread(() -> {
- rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
- return null;
- });
+ private static Container mockContainer(String host, int port, int containerId) {
+ Container mockContainer = mock(Container.class);
+
+ NodeId mockNodeId = NodeId.newInstance(host, port);
+ ContainerId mockContainerId = ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(System.currentTimeMillis(), 1),
+ 1
+ ),
+ containerId
+ );
+
+ when(mockContainer.getId()).thenReturn(mockContainerId);
+ when(mockContainer.getNodeId()).thenReturn(mockNodeId);
+ when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+ when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+
+ return mockContainer;
+ }
+
+ private static ContainerStatus mockContainerStatus(ContainerId containerId) {
+ ContainerStatus mockContainerStatus = mock(ContainerStatus.class);
- unregisterAndReleaseFuture.get();
+ when(mockContainerStatus.getContainerId()).thenReturn(containerId);
+ when(mockContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
+ when(mockContainerStatus.getDiagnostics()).thenReturn("Test exit");
+ when(mockContainerStatus.getExitStatus()).thenReturn(-1);
- verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
- verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+ return mockContainerStatus;
+ }
- stopResourceManager();
+ @Test
+ public void testStopWorker() throws Exception {
+ new Context() {{
+ runTest(() -> {
+ // Request slot from SlotManager.
+ CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+ rmServices.slotManager.registerSlotRequest(
+ new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+ return null;
+ });
+
+ // wait for the registerSlotRequest completion
+ registerSlotRequestFuture.get();
+
+ // Callback from YARN when container is allocated.
+ Container testingContainer = mockContainer("container", 1234, 1);
+
+ resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+ verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+ verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
+
+ // Remote task executor registers with YarnResourceManager.
+ TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
+ rpcService.registerGateway(taskHost, mockTaskExecutorGateway);
+
+ final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+ final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString());
+ final SlotReport slotReport = new SlotReport(
+ new SlotStatus(
+ new SlotID(taskManagerResourceId, 1),
+ new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap())));
+
+ CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
+ .registerTaskExecutor(
+ taskHost,
+ taskManagerResourceId,
+ dataPort,
+ hardwareDescription,
+ Time.seconds(10L))
+ .thenCompose(
+ (RegistrationResponse response) -> {
+ assertThat(response, instanceOf(TaskExecutorRegistrationSuccess.class));
+ final TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) response;
+ return rmGateway.sendSlotReport(
+ taskManagerResourceId,
+ success.getRegistrationId(),
+ slotReport,
+ Time.seconds(10L));
+ })
+ .handleAsync(
+ (Acknowledge ignored, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(),
+ resourceManager.getMainThreadExecutorForTesting());
+
+ final int numberRegisteredSlots = numberRegisteredSlotsFuture.get();
+
+ assertEquals(1, numberRegisteredSlots);
+
+ // Unregister all task executors and release all containers.
+ CompletableFuture<?> unregisterAndReleaseFuture = resourceManager.runInMainThread(() -> {
+ rmServices.slotManager.unregisterTaskManagersAndReleaseResources();
+ return null;
+ });
+
+ unregisterAndReleaseFuture.get();
+
+ verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class));
+ verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class));
+ });
// It's now safe to access the SlotManager state since the ResourceManager has been stopped.
- assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 0);
- assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0);
+ assertThat(rmServices.slotManager.getNumberRegisteredSlots(), Matchers.equalTo(0));
+ assertThat(resourceManager.getNumberOfRegisteredTaskManagers().get(), Matchers.equalTo(0));
}};
}
@@ -411,65 +460,49 @@ public class YarnResourceManagerTest extends TestLogger {
final File applicationDir = folder.newFolder(".flink");
env.put(FLINK_YARN_FILES, applicationDir.getCanonicalPath());
- startResourceManager();
-
- resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);
- assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
+ runTest(() -> {
+ resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+ assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
+ });
}};
}
/**
* Tests that YarnResourceManager will not request more containers than needs during
* callback from Yarn when container is Completed.
- * @throws Exception
*/
@Test
public void testOnContainerCompleted() throws Exception {
new Context() {{
- startResourceManager();
- CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
- rmServices.slotManager.registerSlotRequest(
- new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
- return null;
+ runTest(() -> {
+ CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+ rmServices.slotManager.registerSlotRequest(
+ new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+ return null;
+ });
+
+ // wait for the registerSlotRequest completion
+ registerSlotRequestFuture.get();
+
+ // Callback from YARN when container is allocated.
+ Container testingContainer = mockContainer("container", 1234, 1);
+
+ resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
+ verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+ verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
+
+ // Callback from YARN when container is Completed, pending request can not be fulfilled by pending
+ // containers, need to request new container.
+ ContainerStatus testingContainerStatus = mockContainerStatus(testingContainer.getId());
+
+ resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
+ verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+
+ // Callback from YARN when container is Completed happened before global fail, pending request
+ // slot is already fulfilled by pending containers, no need to request new container.
+ resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
+ verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
});
-
- // wait for the registerSlotRequest completion
- registerSlotRequestFuture.get();
-
- ContainerId testContainerId = ContainerId.newInstance(
- ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(System.currentTimeMillis(), 1),
- 1),
- 1);
-
- // Callback from YARN when container is allocated.
- Container testingContainer = mock(Container.class);
- when(testingContainer.getId()).thenReturn(testContainerId);
- when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
- when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
- when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
- resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
- verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
- verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
-
- // Callback from YARN when container is Completed, pending request can not be fulfilled by pending
- // containers, need to request new container.
- ContainerStatus testingContainerStatus = mock(ContainerStatus.class);
- when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
- when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
- when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
- when(testingContainerStatus.getExitStatus()).thenReturn(-1);
- resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
- verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-
- // Callback from YARN when container is Completed happened before global fail, pending request
- // slot is already fulfilled by pending containers, no need to request new container.
- when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
- when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
- when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
- when(testingContainerStatus.getExitStatus()).thenReturn(-1);
- resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
- verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
}};
}
}