Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/30 15:53:23 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #13004: [FLINK-18719][runtime] Define interfaces for new active RMs and implement

tillrohrmann commented on a change in pull request #13004:
URL: https://github.com/apache/flink/pull/13004#discussion_r463060550



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemory.java
##########
@@ -50,7 +53,8 @@
 	private final MemorySize jvmHeap;
 	private final MemorySize offHeapMemory;
 
-	JobManagerFlinkMemory(MemorySize jvmHeap, MemorySize offHeapMemory) {
+	@VisibleForTesting

Review comment:
       I believe Xintong added this annotation because he changed the visibility of this class for some tests.
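The sketch below shows the pattern under discussion. A constructor is made more visible than production code needs so that tests can build instances directly, and it is tagged with a marker annotation to document why. All names are hypothetical stand-ins: `VisibleForTestingSketch` replaces Flink's `VisibleForTesting`, and `MemoryHolderSketch` replaces `JobManagerFlinkMemory`, with plain `long` byte counts instead of `MemorySize`. The annotation here uses `RUNTIME` retention only so the sketch can check it reflectively.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for a marker annotation such as Flink's VisibleForTesting.
// It changes no behavior; it only documents why a member is more visible than needed.
// RUNTIME retention is used here solely so the sketch can inspect it via reflection.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
@interface VisibleForTestingSketch {}

// Hypothetical, simplified memory holder (sizes as plain byte counts).
final class MemoryHolderSketch {
    private final long jvmHeapBytes;
    private final long offHeapBytes;

    // Would be package-private for production use; widened to public so that
    // tests in other packages can construct instances directly.
    @VisibleForTestingSketch
    public MemoryHolderSketch(long jvmHeapBytes, long offHeapBytes) {
        this.jvmHeapBytes = jvmHeapBytes;
        this.offHeapBytes = offHeapBytes;
    }

    // Total memory covered by this holder.
    long getTotalBytes() {
        return jvmHeapBytes + offHeapBytes;
    }
}
```

The annotation is purely informational. Reviewers and static-analysis tools can use it to flag production code that calls the widened member.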

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriver.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link ResourceManagerDriver} is responsible for requesting and releasing resources from/to a particular external
+ * resource manager.
+ */
+public interface ResourceManagerDriver<WorkerType extends ResourceIDRetrievable> {
+
+	/**
+	 * Initialize the deployment specific components.
+	 *
+	 * @param resourceEventHandler Handler that handles resource events.
+	 */
+	void initialize(ResourceEventHandler<WorkerType> resourceEventHandler) throws Throwable;
+
+	/**
+	 * Terminate the deployment specific components.
+	 *
+	 * @return A future that will be completed successfully when the driver is terminated, or exceptionally if cannot be
+	 * terminated.
+	 */
+	CompletableFuture<Void> terminate();
+
+	/**
+	 * The deployment specific code to deregister the application. This should report the application's final status and
+	 * shut down the resource manager driver cleanly.
+	 *
+	 * <p>This method also needs to make sure all pending containers that are not registered yet are returned.
+	 *
+	 * @param finalStatus The application status to report.
+	 * @param optionalDiagnostics A diagnostics message or {@code null}.
+	 * @throws ResourceManagerException if the application could not be shut down.
+	 */
+	void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws Throwable;
+
+	/**
+	 * Request resource from the external resource manager.
+	 *
+	 * <p>This method request a new resource from the external resource manager, and tries to launch a task manager
+	 * inside the allocated resource, with respect to the provided taskExecutorProcessSpec The returned future will be

Review comment:
       ```suggestion
   	 * inside the allocated resource, with respect to the provided taskExecutorProcessSpec. The returned future will be
   ```
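A minimal in-memory sketch of the driver lifecycle described by the `ResourceManagerDriver` Javadoc above follows. The lifecycle is: initialize with an event handler, request resources (each request returns a future of the allocated worker), deregister while returning workers that never registered, and terminate. Every type here is a hypothetical simplification. `AppStatusSketch`, `EventHandlerSketch`, `DriverSketch` and `InMemoryDriverSketch` stand in for Flink's `ApplicationStatus`, `ResourceEventHandler`, the driver interface and a real deployment-specific driver. A plain `String` replaces `TaskExecutorProcessSpec` and the worker type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the Flink types used by the real interface.
enum AppStatusSketch { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

interface EventHandlerSketch<W> {
    void onWorkerTerminated(W worker); // hypothetical callback into the resource manager
}

interface DriverSketch<W> {
    void initialize(EventHandlerSketch<W> handler) throws Exception;
    CompletableFuture<Void> terminate();
    void deregisterApplication(AppStatusSketch finalStatus, String optionalDiagnostics) throws Exception;
    CompletableFuture<W> requestResource(String processSpec);
}

// In-memory driver: every request is "allocated" immediately with a random worker ID.
final class InMemoryDriverSketch implements DriverSketch<String> {
    private EventHandlerSketch<String> handler;
    private final List<String> pendingWorkers = new ArrayList<>();
    private AppStatusSketch reportedStatus;

    @Override
    public void initialize(EventHandlerSketch<String> handler) {
        this.handler = handler;
    }

    @Override
    public CompletableFuture<String> requestResource(String processSpec) {
        if (handler == null) {
            // Requests before initialization fail through the returned future.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("driver not initialized"));
            return failed;
        }
        String workerId = "worker-" + UUID.randomUUID();
        pendingWorkers.add(workerId); // allocated but not yet registered
        return CompletableFuture.completedFuture(workerId);
    }

    @Override
    public void deregisterApplication(AppStatusSketch finalStatus, String optionalDiagnostics) {
        reportedStatus = finalStatus;
        // Mirrors the Javadoc: return pending workers that never registered.
        pendingWorkers.clear();
    }

    @Override
    public CompletableFuture<Void> terminate() {
        return CompletableFuture.completedFuture(null);
    }

    int pendingCount() { return pendingWorkers.size(); }

    AppStatusSketch reportedStatus() { return reportedStatus; }
}
```

Returning a future from `requestResource` lets the caller react to allocation success or failure asynchronously. The tests in this PR rely on the same pattern: the request function passed to `driverBuilder.setRequestResourceFunction` returns a `CompletableFuture<ResourceID>`.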

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
+import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ActiveResourceManager}.
+ */
+public class ActiveResourceManagerTest extends TestLogger {
+
+	private static final long TIMEOUT_SEC = 5L;
+	private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);
+
+	private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = WorkerResourceSpec.ZERO;
+
+	/**
+	 * Tests worker successfully requested, started and registered.
+	 */
+	@Test
+	public void testStartNewWorker() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+			final CompletableFuture<TaskExecutorProcessSpec> requestWorkerFromDriverFuture = new CompletableFuture<>();
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				requestWorkerFromDriverFuture.complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceId);
+			});
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec = requestWorkerFromDriverFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// worker registered, verify registration succeed
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests worker failed while requesting.
+	 */
+	@Test
+	public void testStartNewWorkerFailedRequesting() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<CompletableFuture<ResourceID>> resourceIdFutures = new ArrayList<>();
+			resourceIdFutures.add(new CompletableFuture<>());
+			resourceIdFutures.add(new CompletableFuture<>());
+
+			final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+				return resourceIdFutures.get(idx);
+			});
+
+			slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec1 = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec1,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// first request failed, verify requesting another worker from driver
+				runInMainThread(() -> resourceIdFutures.get(0).completeExceptionally(new Throwable("testing error")));
+				TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+						requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+
+				// second request allocated, verify registration succeed
+				runInMainThread(() -> resourceIdFutures.get(1).complete(tmResourceId));
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests worker terminated after requested before registered.
+	 */
+	@Test
+	public void testWorkerTerminatedBeforeRegister() throws Exception {
+		new Context() {{
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<ResourceID> tmResourceIds = new ArrayList<>();
+			tmResourceIds.add(ResourceID.generate());
+			tmResourceIds.add(ResourceID.generate());
+
+			final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceIds.get(idx));
+			});
+
+			slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec1 = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec1,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// first worker failed before register, verify requesting another worker from driver
+				runInMainThread(() -> getResourceManager().onWorkerTerminated(tmResourceIds.get(0)));
+				TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+						requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+
+				// second worker registered, verify registration succeed
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceIds.get(1));
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests worker terminated after registered.
+	 */
+	@Test
+	public void testWorkerTerminatedAfterRegister() throws Exception {
+		new Context() {{
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<ResourceID> tmResourceIds = new ArrayList<>();
+			tmResourceIds.add(ResourceID.generate());
+			tmResourceIds.add(ResourceID.generate());
+
+			final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceIds.get(idx));
+			});
+
+			slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec1 = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec1,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// first worker registered, verify registration succeed
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture1 = registerTaskExecutor(tmResourceIds.get(0));
+				assertThat(registerTaskExecutorFuture1.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+
+				// first worker terminated, verify requesting another worker from driver
+				runInMainThread(() -> getResourceManager().onWorkerTerminated(tmResourceIds.get(0)));
+				TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+						requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+
+				// second worker registered, verify registration succeed
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture2 = registerTaskExecutor(tmResourceIds.get(1));
+				assertThat(registerTaskExecutorFuture2.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests worker terminated and is no longer required.
+	 */
+	@Test
+	public void testWorkerTerminatedNoLongerRequired() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceId);
+			});
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// worker registered, verify registration succeed
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+
+				// worker terminated, verify not requesting new worker
+				runInMainThread(() -> {
+					getResourceManager().onWorkerTerminated(tmResourceId);
+					// needs to return something, so that we can use `get()` to make sure the main thread processing
+					// finishes before the assertions
+					return null;
+				}).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+				assertFalse(requestWorkerFromDriverFutures.get(1).isDone());
+			});
+		}};
+	}
+
+	/**
+	 * Tests workers from previous attempt successfully recovered and registered.
+	 */
+	@Test
+	public void testRecoverWorkerFromPreviousAttempt() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+
+			runTest(() -> {
+				runInMainThread(() -> getResourceManager().onPreviousAttemptWorkersRecovered(Collections.singleton(tmResourceId)));
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests decline unknown worker registration.
+	 */
+	@Test
+	public void testRegisterUnknownWorker() throws Exception {
+		new Context() {{
+			runTest(() -> {
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(ResourceID.generate());
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Decline.class));
+			});
+		}};
+	}
+
+	@Test
+	public void testOnError() throws Exception {
+		new Context() {{
+			final Throwable fatalError = new Throwable("Testing fatal error");
+			runTest(() -> {
+				runInMainThread(() -> getResourceManager().onError(fatalError));
+				final Throwable reportedError = getFatalErrorHandler().getErrorFuture().get(TIMEOUT_SEC, TimeUnit.SECONDS);
+				assertThat(reportedError, is(fatalError));
+			});
+		}};
+	}
+
+	class Context {
+
+		final Configuration flinkConfig = new Configuration();
+		final TestingResourceManagerDriver.Builder driverBuilder = new TestingResourceManagerDriver.Builder();
+		final TestingSlotManagerBuilder slotManagerBuilder = new TestingSlotManagerBuilder();
+
+		private ActiveResourceManager<ResourceID> resourceManager;
+		private TestingFatalErrorHandler fatalErrorHandler;
+
+		ActiveResourceManager<ResourceID> getResourceManager() {
+			return resourceManager;
+		}
+
+		TestingFatalErrorHandler getFatalErrorHandler() {
+			return fatalErrorHandler;
+		}
+
+		void runTest(RunnableWithException testMethod) throws Exception {
+			fatalErrorHandler = new TestingFatalErrorHandler();
+			resourceManager = createAndStartResourceManager(
+					flinkConfig,
+					driverBuilder.build(),
+					slotManagerBuilder.createSlotManager());
+
+			try {
+				testMethod.run();
+			} finally {
+				resourceManager.close();
+			}
+		}
+
+		private ActiveResourceManager<ResourceID> createAndStartResourceManager(
+				Configuration configuration,
+				ResourceManagerDriver<ResourceID> driver,
+				SlotManager slotManager) throws Exception {
+			final TestingRpcService rpcService = new TestingRpcService(configuration);

Review comment:
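       nit: I think it would be a bit cheaper to share a single `TestingRpcService` across all tests of this class, instead of creating a new one in every `createAndStartResourceManager` call, via `@ClassRule public static final TestingRpcServiceResource testingRpcServiceResource = ....`.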
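The cost argument behind this nit can be illustrated without JUnit. In the current test, `runTest` calls `createAndStartResourceManager`, which builds a new `TestingRpcService` for each of the eight tests. A class-level rule would build one service before all tests and close it afterwards. The plain-Java sketch below only counts how often the expensive setup runs in each style. `ExpensiveServiceSketch` and `SharedResourceSketch` are hypothetical names, and this is not JUnit's `@ClassRule` machinery itself.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an expensive test service such as an RPC service.
final class ExpensiveServiceSketch implements AutoCloseable {
    static final AtomicInteger CREATED = new AtomicInteger();

    ExpensiveServiceSketch() {
        CREATED.incrementAndGet(); // count how often the costly setup runs
    }

    @Override
    public void close() {
        // a real service would release threads/sockets here
    }
}

final class SharedResourceSketch {
    // Per-test style: each test creates and closes its own service.
    static void runPerTest(int tests) {
        for (int i = 0; i < tests; i++) {
            try (ExpensiveServiceSketch service = new ExpensiveServiceSketch()) {
                // test body would use `service` here
            }
        }
    }

    // Class-rule style: one service is created before all tests and closed after them.
    static void runShared(int tests) {
        try (ExpensiveServiceSketch service = new ExpensiveServiceSketch()) {
            for (int i = 0; i < tests; i++) {
                // test body would use `service` here
            }
        }
    }
}
```

With eight tests, the per-test style runs the setup eight times and the shared style runs it once. The trade-off is that a shared service must not carry state from one test into the next.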

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
+import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ActiveResourceManager}.
+ */
+public class ActiveResourceManagerTest extends TestLogger {
+
+	private static final long TIMEOUT_SEC = 5L;
+	private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);
+
+	private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = WorkerResourceSpec.ZERO;
+
+	/**
+	 * Tests worker successfully requested, started and registered.
+	 */
+	@Test
+	public void testStartNewWorker() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+			final CompletableFuture<TaskExecutorProcessSpec> requestWorkerFromDriverFuture = new CompletableFuture<>();
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				requestWorkerFromDriverFuture.complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceId);
+			});
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec = requestWorkerFromDriverFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// worker registered, verify registration succeed
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests worker failed while requesting.
+	 */
+	@Test
+	public void testStartNewWorkerFailedRequesting() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<CompletableFuture<ResourceID>> resourceIdFutures = new ArrayList<>();
+			resourceIdFutures.add(new CompletableFuture<>());
+			resourceIdFutures.add(new CompletableFuture<>());
+
+			final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+				return resourceIdFutures.get(idx);
+			});
+
+			slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec1 = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec1,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// first request failed, verify requesting another worker from driver
+				runInMainThread(() -> resourceIdFutures.get(0).completeExceptionally(new Throwable("testing error")));
+				TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+						requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+
+				// second request allocated, verify registration succeed
+				runInMainThread(() -> resourceIdFutures.get(1).complete(tmResourceId));
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests worker terminated after requested before registered.
+	 */
+	@Test
+	public void testWorkerTerminatedBeforeRegister() throws Exception {
+		new Context() {{
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<ResourceID> tmResourceIds = new ArrayList<>();
+			tmResourceIds.add(ResourceID.generate());
+			tmResourceIds.add(ResourceID.generate());
+
+			final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceIds.get(idx));
+			});
+
+			slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec1 = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec1,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// first worker failed before register, verify requesting another worker from driver
+				runInMainThread(() -> getResourceManager().onWorkerTerminated(tmResourceIds.get(0)));
+				TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+						requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+
+				// second worker registered, verify registration succeed
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceIds.get(1));
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests worker terminated after registered.
+	 */
+	@Test
+	public void testWorkerTerminatedAfterRegister() throws Exception {
+		new Context() {{
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<ResourceID> tmResourceIds = new ArrayList<>();
+			tmResourceIds.add(ResourceID.generate());
+			tmResourceIds.add(ResourceID.generate());
+
+			final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceIds.get(idx));
+			});
+
+			slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec1 = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec1,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// first worker registered, verify registration succeeds
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture1 = registerTaskExecutor(tmResourceIds.get(0));
+				assertThat(registerTaskExecutorFuture1.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+
+				// first worker terminated, verify requesting another worker from driver
+				runInMainThread(() -> getResourceManager().onWorkerTerminated(tmResourceIds.get(0)));
+				TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+						requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+
+				// second worker registered, verify registration succeeds
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture2 = registerTaskExecutor(tmResourceIds.get(1));
+				assertThat(registerTaskExecutorFuture2.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests worker termination when the worker is no longer required.
+	 */
+	@Test
+	public void testWorkerTerminatedNoLongerRequired() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceId);
+			});
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// worker registered, verify registration succeeds
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+
+				// worker terminated, verify not requesting new worker
+				runInMainThread(() -> {
+					getResourceManager().onWorkerTerminated(tmResourceId);
+					// needs to return something, so that we can use `get()` to make sure the main thread processing
+					// finishes before the assertions
+					return null;
+				}).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+				assertFalse(requestWorkerFromDriverFutures.get(1).isDone());
+			});
+		}};
+	}
+
+	/**
+	 * Tests that workers from the previous attempt are successfully recovered and registered.
+	 */
+	@Test
+	public void testRecoverWorkerFromPreviousAttempt() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+
+			runTest(() -> {
+				runInMainThread(() -> getResourceManager().onPreviousAttemptWorkersRecovered(Collections.singleton(tmResourceId)));
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests that registration of an unknown worker is declined.
+	 */
+	@Test
+	public void testRegisterUnknownWorker() throws Exception {
+		new Context() {{
+			runTest(() -> {
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(ResourceID.generate());
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Decline.class));
+			});
+		}};
+	}
+
+	@Test
+	public void testOnError() throws Exception {
+		new Context() {{
+			final Throwable fatalError = new Throwable("Testing fatal error");
+			runTest(() -> {
+				runInMainThread(() -> getResourceManager().onError(fatalError));
+				final Throwable reportedError = getFatalErrorHandler().getErrorFuture().get(TIMEOUT_SEC, TimeUnit.SECONDS);
+				assertThat(reportedError, is(fatalError));
+			});
+		}};
+	}
+
+	class Context {
+
+		final Configuration flinkConfig = new Configuration();
+		final TestingResourceManagerDriver.Builder driverBuilder = new TestingResourceManagerDriver.Builder();
+		final TestingSlotManagerBuilder slotManagerBuilder = new TestingSlotManagerBuilder();
+
+		private ActiveResourceManager<ResourceID> resourceManager;
+		private TestingFatalErrorHandler fatalErrorHandler;
+
+		ActiveResourceManager<ResourceID> getResourceManager() {
+			return resourceManager;
+		}
+
+		TestingFatalErrorHandler getFatalErrorHandler() {
+			return fatalErrorHandler;
+		}
+
+		void runTest(RunnableWithException testMethod) throws Exception {
+			fatalErrorHandler = new TestingFatalErrorHandler();
+			resourceManager = createAndStartResourceManager(
+					flinkConfig,
+					driverBuilder.build(),
+					slotManagerBuilder.createSlotManager());
+
+			try {
+				testMethod.run();
+			} finally {
+				resourceManager.close();
+			}
+		}
+
+		private ActiveResourceManager<ResourceID> createAndStartResourceManager(
+				Configuration configuration,
+				ResourceManagerDriver<ResourceID> driver,
+				SlotManager slotManager) throws Exception {
+			final TestingRpcService rpcService = new TestingRpcService(configuration);
+			final MockResourceManagerRuntimeServices rmServices = new MockResourceManagerRuntimeServices(rpcService, TIMEOUT_TIME, slotManager);
+
+			final ActiveResourceManager<ResourceID> activeResourceManager = new ActiveResourceManager<>(
+					driver,
+					configuration,
+					rpcService,
+					ResourceID.generate(),
+					rmServices.highAvailabilityServices,
+					rmServices.heartbeatServices,
+					rmServices.slotManager,
+					NoOpResourceManagerPartitionTracker::get,
+					rmServices.jobLeaderIdService,
+					new ClusterInformation("localhost", 1234),
+					fatalErrorHandler,
+					UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
+
+			activeResourceManager.start();
+			rmServices.grantLeadership();
+
+			return activeResourceManager;
+		}
+
+		public void runInMainThread(Runnable runnable) {
+			resourceManager.handleInMainThread(runnable);
+		}
+
+		public <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
+			return resourceManager.runInMainThread(callable, TIMEOUT_TIME);
+		}
+
+		CompletableFuture<RegistrationResponse> registerTaskExecutor(ResourceID resourceID) throws Exception {
+			final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+					.createTestingTaskExecutorGateway();
+			((TestingRpcService) resourceManager.getRpcService()).registerGateway(resourceID.toString(), taskExecutorGateway);

Review comment:
       Having a class-wide `TestingRpcService` would avoid this casting here.
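
A minimal sketch of what the suggestion could look like, assuming the test context creates the `TestingRpcService` once and keeps it in a field of its concrete type. `RpcService`, `TestingRpcService`, `ResourceManagerStub` and `TestContext` below are hypothetical, heavily reduced stand-ins for the Flink classes, not their real APIs:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Flink's RpcService and TestingRpcService, reduced to what this sketch needs.
class RpcService {
}

class TestingRpcService extends RpcService {
    private final Map<String, Object> gateways = new HashMap<>();

    void registerGateway(String address, Object gateway) {
        gateways.put(address, gateway);
    }

    boolean hasGateway(String address) {
        return gateways.containsKey(address);
    }
}

// Hypothetical resource manager that, like the RPC endpoint in the diff, only exposes the base RpcService type.
class ResourceManagerStub {
    private final RpcService rpcService;

    ResourceManagerStub(RpcService rpcService) {
        this.rpcService = rpcService;
    }

    RpcService getRpcService() {
        return rpcService;
    }
}

class TestContext {
    // Created once per context and kept with its concrete type, so tests never need to downcast.
    final TestingRpcService rpcService = new TestingRpcService();
    final ResourceManagerStub resourceManager = new ResourceManagerStub(rpcService);

    void registerTaskExecutor(String resourceId, Object taskExecutorGateway) {
        // Replaces ((TestingRpcService) resourceManager.getRpcService()).registerGateway(...)
        rpcService.registerGateway(resourceId, taskExecutorGateway);
    }
}
```

The same field could then be handed to the resource manager constructor in `createAndStartResourceManager`, so both sides share one instance.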

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ResourceEventHandler.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import java.util.Collection;
+
+/**
+ * Callback interfaces for handling resource events from external resource managers.
+ */
+public interface ResourceEventHandler<WorkerType extends ResourceIDRetrievable> {
+
+	/**
+	 * Notifies that workers of the previous attempt have been recovered from the external resource manager.
+	 *
+	 * @param recoveredWorkers Collection of worker nodes, in the deployment specific type.
+	 */
+	void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWorkers);
+
+	/**
+	 * Notifies that the worker has been terminated.
+	 *
+	 * <p>See also {@link ResourceManagerDriver#requestResource}.
+	 *
+	 * @param resourceId Identifier of the terminated worker.
+	 */
+	void onWorkerTerminated(ResourceID resourceId);
+
+	/**
+	 * Notifies that an error has occurred that prevents the process from proceeding.
+	 *
+	 * @param exception Exception that describes the error.
+	 */
+	void onError(Throwable exception);
+
+	/**
+	 * Executes the given runnable in the rpc main thread.
+	 *
+	 * @param runnable Runnable to be executed.
+	 */
+	void handleInMainThread(Runnable runnable);

Review comment:
       Would it make sense to pass a `MainThreadExecutor` into `ResourceManagerDriver.initialize()` instead of offering this method? The advantage would be that one could directly run future callbacks in the main thread and would not have to call this method from the callback. Moreover, it would separate the concerns a bit better, because the event handler would not need to know about the main thread, if I'm not mistaken.
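
To illustrate the suggestion, here is a sketch in plain `java.util.concurrent` terms. It assumes `initialize` receives an `Executor` for the main thread; `EventHandler`, `SketchDriver` and `SimulatedMainThread` are hypothetical names, and a plain single-thread executor stands in for Flink's main thread executor type. The point is that the driver schedules its callback with `thenAcceptAsync(..., mainThreadExecutor)` instead of calling back into the handler to hop threads:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, reduced event handler: it no longer offers a handleInMainThread(...) method.
interface EventHandler {
    void onWorkerTerminated(String resourceId);
}

// Hypothetical driver whose initialize(...) additionally receives the main thread executor.
class SketchDriver {
    private EventHandler handler;
    private Executor mainThreadExecutor;

    void initialize(EventHandler handler, Executor mainThreadExecutor) {
        this.handler = handler;
        this.mainThreadExecutor = mainThreadExecutor;
    }

    // Simulates the external resource manager reporting a terminated worker from one of its own threads.
    CompletableFuture<Void> onExternalTermination(String resourceId) {
        return CompletableFuture
                .supplyAsync(() -> resourceId) // runs on a thread that is not the main thread
                // The callback is scheduled directly on the main thread executor, rather than
                // calling handler.handleInMainThread(...) from inside a callback on a foreign thread.
                .thenAcceptAsync(handler::onWorkerTerminated, mainThreadExecutor);
    }
}

// Simulated RPC main thread: a single thread with a recognizable name.
final class SimulatedMainThread {
    static ExecutorService create() {
        return Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "rpc-main"));
    }
}
```

With this shape, the handler methods can assume they are always invoked in the main thread, and the main-thread concern lives only in the driver's wiring.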

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriver.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link ResourceManagerDriver} is responsible for requesting and releasing resources from/to a particular external
+ * resource manager.
+ */
+public interface ResourceManagerDriver<WorkerType extends ResourceIDRetrievable> {
+
+	/**
+	 * Initialize the deployment specific components.
+	 *
+	 * @param resourceEventHandler Handler that handles resource events.
+	 */
+	void initialize(ResourceEventHandler<WorkerType> resourceEventHandler) throws Throwable;
+
+	/**
+	 * Terminate the deployment specific components.
+	 *
+	 * @return A future that will be completed successfully when the driver is terminated, or exceptionally if it cannot be
+	 * terminated.
+	 */
+	CompletableFuture<Void> terminate();
+
+	/**
+	 * The deployment specific code to deregister the application. This should report the application's final status and
+	 * shut down the resource manager driver cleanly.
+	 *
+	 * <p>This method also needs to make sure all pending containers that are not registered yet are returned.
+	 *
+	 * @param finalStatus The application status to report.
+	 * @param optionalDiagnostics A diagnostics message or {@code null}.
+	 * @throws ResourceManagerException if the application could not be shut down.
+	 */
+	void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws Throwable;
+
+	/**
+	 * Request resource from the external resource manager.
+	 *
+	 * <p>This method requests a new resource from the external resource manager, and tries to launch a task manager
+	 * inside the allocated resource, with respect to the provided taskExecutorProcessSpec. The returned future will be
+	 * completed with a worker node in the deployment specific type, or exceptionally if the allocation has failed.
+	 *
+	 * <p>Note: Success completion of the returned future does not necessarily mean the success of resource allocation
+	 * and task manager launching. Allocation and launching failures can still happen after the future completion. In
+	 * such cases, {@link ResourceEventHandler#onWorkerTerminated} will be called.
+	 *
+	 * <p>The future is guaranteed to be completed in the rpc main thread, before trying to launch the task manager,
+	 * thus before the task manager registration. It is also guaranteed that
+	 * {@link ResourceEventHandler#onWorkerTerminated} will not be called on the requested worker, until the returned
+	 * future is completed successfully.

Review comment:
       Nice description of the contract.
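
A toy model of the ordering described in that contract, assuming a single-threaded task queue as a stand-in for the rpc main thread. `MainThreadQueue`, `RecordingHandler` and `OrderedDriver` are hypothetical names for illustration only. The driver completes the request future first (so any callback the resource manager attached, such as its `workerNodeMap` bookkeeping, runs immediately), and only then queues the launch attempt whose failure is reported via `onWorkerTerminated`:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical single-threaded "main thread": tasks run strictly one after another, in submission order.
class MainThreadQueue {
    private final Deque<Runnable> tasks = new ArrayDeque<>();

    void execute(Runnable task) {
        tasks.addLast(task);
    }

    void runAll() {
        while (!tasks.isEmpty()) {
            tasks.pollFirst().run();
        }
    }
}

// Hypothetical handler that records the order in which it observes events.
class RecordingHandler {
    final List<String> events = new ArrayList<>();

    void onWorkerTerminated(String resourceId) {
        events.add("terminated:" + resourceId);
    }
}

// Hypothetical driver that honours the documented ordering of requestResource(...).
class OrderedDriver {
    private final MainThreadQueue mainThread;
    private final RecordingHandler handler;

    OrderedDriver(MainThreadQueue mainThread, RecordingHandler handler) {
        this.mainThread = mainThread;
        this.handler = handler;
    }

    CompletableFuture<String> requestResource(String resourceId, boolean launchFails) {
        CompletableFuture<String> result = new CompletableFuture<>();
        mainThread.execute(() -> {
            // 1. Allocation succeeded: complete the future in the main thread, before any launch attempt.
            //    Callbacks the caller attached with thenAccept(...) run synchronously right here.
            result.complete(resourceId);
            // 2. Only afterwards try to launch the task manager; a failure is reported through
            //    onWorkerTerminated, which therefore cannot overtake the future's completion.
            mainThread.execute(() -> {
                if (launchFails) {
                    handler.onWorkerTerminated(resourceId);
                } else {
                    handler.events.add("launched:" + resourceId);
                }
            });
        });
        return result;
    }
}
```

Because the caller's callback always runs before the termination notification, the resource manager can presumably rely on the worker already being tracked when `onWorkerTerminated` arrives.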

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriver.java
##########
@@ -0,0 +1,90 @@
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link ResourceManagerDriver} is responsible for requesting and releasing resources from/to a particular external
+ * resource manager.
+ */
+public interface ResourceManagerDriver<WorkerType extends ResourceIDRetrievable> {
+
+	/**
+	 * Initialize the deployment specific components.
+	 *
+	 * @param resourceEventHandler Handler that handles resource events.
+	 */
+	void initialize(ResourceEventHandler<WorkerType> resourceEventHandler) throws Throwable;
+
+	/**
+	 * Terminate the deployment specific components.
+	 *
+	 * @return A future that will be completed successfully when the driver is terminated, or exceptionally if it cannot be
+	 * terminated.
+	 */
+	CompletableFuture<Void> terminate();
+
+	/**
+	 * The deployment specific code to deregister the application. This should report the application's final status and
+	 * shut down the resource manager driver cleanly.
+	 *
+	 * <p>This method also needs to make sure all pending containers that are not registered yet are returned.
+	 *
+	 * @param finalStatus The application status to report.
+	 * @param optionalDiagnostics A diagnostics message or {@code null}.
+	 * @throws ResourceManagerException if the application could not be shut down.
+	 */
+	void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws Throwable;
+
+	/**
+	 * Request resource from the external resource manager.
+	 *
+	 * <p>This method requests a new resource from the external resource manager, and tries to launch a task manager
+	 * inside the allocated resource, with respect to the provided taskExecutorProcessSpec. The returned future will be
+	 * completed with a worker node in the deployment specific type, or exceptionally if the allocation has failed.
+	 *
+	 * <p>Note: Success completion of the returned future does not necessarily mean the success of resource allocation

Review comment:
       ```suggestion
   	 * <p>Note: Completion of the returned future does not necessarily mean the success of resource allocation
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriver.java
##########
@@ -0,0 +1,90 @@
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link ResourceManagerDriver} is responsible for requesting and releasing resources from/to a particular external
+ * resource manager.
+ */
+public interface ResourceManagerDriver<WorkerType extends ResourceIDRetrievable> {
+
+	/**
+	 * Initialize the deployment specific components.
+	 *
+	 * @param resourceEventHandler Handler that handles resource events.
+	 */
+	void initialize(ResourceEventHandler<WorkerType> resourceEventHandler) throws Throwable;
+
+	/**
+	 * Terminate the deployment specific components.
+	 *
+	 * @return A future that will be completed successfully when the driver is terminated, or exceptionally if it cannot be
+	 * terminated.
+	 */
+	CompletableFuture<Void> terminate();
+
+	/**
+	 * The deployment specific code to deregister the application. This should report the application's final status and
+	 * shut down the resource manager driver cleanly.
+	 *
+	 * <p>This method also needs to make sure all pending containers that are not registered yet are returned.
+	 *
+	 * @param finalStatus The application status to report.
+	 * @param optionalDiagnostics A diagnostics message or {@code null}.
+	 * @throws ResourceManagerException if the application could not be shut down.
+	 */
+	void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws Throwable;

Review comment:
       What is the relation between these two methods? From the JavaDocs, it reads as if `deregisterApplication` would also shut down the driver similar to `terminate`. Can they be unified?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An active implementation of {@link ResourceManager}.
+ *
+ * <p>This resource manager actively requests and releases resources from/to the external resource management frameworks.
+ * With different {@link ResourceManagerDriver} provided, this resource manager can work with various frameworks.
+ */
+public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
+		extends ResourceManager<WorkerType> implements ResourceEventHandler<WorkerType> {
+
+	protected final Configuration flinkConfig;
+
+	private final ResourceManagerDriver<WorkerType> resourceManagerDriver;
+
+	/** All workers maintained by {@link ActiveResourceManager}. */
+	private final Map<ResourceID, WorkerType> workerNodeMap;
+
+	/** Number of requested but not yet registered workers per worker resource spec. */
+	private final PendingWorkerCounter pendingWorkerCounter;
+
+	/** Identifiers and worker resource specs of requested but not yet registered workers. */
+	private final Map<ResourceID, WorkerResourceSpec> currentAttemptUnregisteredWorkers;
+
+	public ActiveResourceManager(
+			ResourceManagerDriver<WorkerType> resourceManagerDriver,
+			Configuration flinkConfig,
+			RpcService rpcService,
+			ResourceID resourceId,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			SlotManager slotManager,
+			ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
+			JobLeaderIdService jobLeaderIdService,
+			ClusterInformation clusterInformation,
+			FatalErrorHandler fatalErrorHandler,
+			ResourceManagerMetricGroup resourceManagerMetricGroup) {
+		super(
+				rpcService,
+				resourceId,
+				highAvailabilityServices,
+				heartbeatServices,
+				slotManager,
+				clusterPartitionTrackerFactory,
+				jobLeaderIdService,
+				clusterInformation,
+				fatalErrorHandler,
+				resourceManagerMetricGroup,
+				AkkaUtils.getTimeoutAsTime(Preconditions.checkNotNull(flinkConfig)));
+
+		this.flinkConfig = flinkConfig;
+		this.resourceManagerDriver = Preconditions.checkNotNull(resourceManagerDriver);
+		this.workerNodeMap = new HashMap<>();
+		this.pendingWorkerCounter = new PendingWorkerCounter();
+		this.currentAttemptUnregisteredWorkers = new HashMap<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  ResourceManager
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected void initialize() throws ResourceManagerException {
+		try {
+			resourceManagerDriver.initialize(this);
+		} catch (Throwable t) {
+			throw new ResourceManagerException("Cannot initialize resource provider.", t);
+		}
+	}
+
+	@Override
+	protected void terminate() throws ResourceManagerException {
+		try {
+			resourceManagerDriver.terminate().get();
+		} catch (Throwable t) {
+			throw new ResourceManagerException("Cannot terminate resource provider.", t);
+		}
+	}
+
+	@Override
+	protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics)
+			throws ResourceManagerException {
+		try {
+			resourceManagerDriver.deregisterApplication(finalStatus, optionalDiagnostics);
+		} catch (Throwable t) {
+			throw new ResourceManagerException("Cannot deregister application.", t);
+		}
+	}
+
+	@Override
+	public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
+		requestNewWorker(workerResourceSpec);
+		return true;
+	}
+
+	@Override
+	protected WorkerType workerStarted(ResourceID resourceID) {
+		return workerNodeMap.get(resourceID);
+	}
+
+	@Override
+	public boolean stopWorker(WorkerType worker) {
+		final ResourceID resourceId = worker.getResourceID();
+		resourceManagerDriver.releaseResource(worker);
+
+		log.info("Stopping worker {}.", resourceId);
+
+		clearStateForWorker(resourceId);
+
+		return true;
+	}
+
+	@Override
+	protected void onWorkerRegistered(WorkerType worker) {
+		final ResourceID resourceId = worker.getResourceID();
+		log.info("Worker {} is registered.", resourceId);
+
+		final WorkerResourceSpec workerResourceSpec = currentAttemptUnregisteredWorkers.remove(resourceId);
+		if (workerResourceSpec != null) {
+			final int count = pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
+			log.info("Worker {} with resource spec {} was requested in current attempt." +
+							" Current pending count after registering: {}.",
+					resourceId,
+					workerResourceSpec,
+					count);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  ResourceEventHandler
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWorkers) {
+		log.info("Recovered {} workers from previous attempt.", recoveredWorkers.size());
+		for (WorkerType worker : recoveredWorkers) {
+			final ResourceID resourceId = worker.getResourceID();
+			workerNodeMap.put(resourceId, worker);
+			log.info("Worker {} recovered from previous attempt.", resourceId);
+		}
+	}
+
+	@Override
+	public void onWorkerTerminated(ResourceID resourceId) {
+		log.info("Worker {} is terminated.", resourceId);
+		if (clearStateForWorker(resourceId)) {
+			requestWorkerIfRequired();
+		}
+	}
+
+	@Override
+	public void onError(Throwable exception) {
+		onFatalError(exception);
+	}
+
+	@Override
+	public void handleInMainThread(Runnable runnable) {
+		runAsync(runnable);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Internal
+	// ------------------------------------------------------------------------
+
+	private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
+		final TaskExecutorProcessSpec taskExecutorProcessSpec =
+				TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
+		final int pendingCount = pendingWorkerCounter.increaseAndGet(workerResourceSpec);
+
+		log.info("Requesting new worker with resource spec {}, current pending count: {}.",
+				workerResourceSpec,
+				pendingCount);
+
+		resourceManagerDriver.requestResource(taskExecutorProcessSpec)
+				.whenComplete((worker, exception) -> {
+					if (exception != null) {
+						final int count = pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
+						log.warn("Failed requesting worker with resource spec {}, current pending count: {}, exception: {}",
+								workerResourceSpec,
+								count,
+								exception);
+						requestWorkerIfRequired();
+					} else {
+						final ResourceID resourceId = worker.getResourceID();
+						workerNodeMap.put(resourceId, worker);
+						currentAttemptUnregisteredWorkers.put(resourceId, workerResourceSpec);
+						log.info("Requested worker {} with resource spec {}.",
+								resourceId,
+								workerResourceSpec);
+					}
+				});

Review comment:
       It would be good to guard against unwanted exceptions via `FutureUtils.assertNoException(...)`. In that case, `whenComplete` would have to become `handle`, with the callback returning `null`.
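
A sketch of why `handle` is needed here, under simplified assumptions: resource specs are plain strings, `SimplePendingWorkerCounter` is a hypothetical reduction of the `PendingWorkerCounter` used in the diff, and `FutureGuards.assertNoException` is a simplified stand-in for Flink's `FutureUtils.assertNoException` that forwards to a supplied handler. With `whenComplete`, the future it returns still carries the original allocation exception, so a guard would flag an *expected* allocation failure. With `handle` returning `null`, the guarded future fails only if the callback itself throws:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical pending-worker counter, keyed by a spec string instead of WorkerResourceSpec.
class SimplePendingWorkerCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    int increaseAndGet(String spec) {
        return counts.merge(spec, 1, Integer::sum);
    }

    int decreaseAndGet(String spec) {
        Integer current = counts.get(spec);
        if (current == null || current <= 0) {
            throw new IllegalStateException("No pending worker for " + spec);
        }
        int next = current - 1;
        if (next == 0) {
            counts.remove(spec);
        } else {
            counts.put(spec, next);
        }
        return next;
    }

    int get(String spec) {
        return counts.getOrDefault(spec, 0);
    }
}

final class FutureGuards {
    // Simplified stand-in: any exception that escapes the guarded future is forwarded to a fatal handler.
    static void assertNoException(CompletableFuture<?> future, Consumer<Throwable> fatalErrorHandler) {
        future.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                fatalErrorHandler.accept(throwable);
            }
        });
    }
}

class RequestSketch {
    final SimplePendingWorkerCounter pendingWorkerCounter = new SimplePendingWorkerCounter();
    final Map<String, String> unregisteredWorkers = new HashMap<>();

    void requestNewWorker(String spec, CompletableFuture<String> requestFuture, Consumer<Throwable> fatalErrorHandler) {
        pendingWorkerCounter.increaseAndGet(spec);
        // handle(...) swallows the expected allocation failure and completes with null;
        // the guarded future only fails if this callback itself throws.
        CompletableFuture<Void> guarded = requestFuture.handle((resourceId, exception) -> {
            if (exception != null) {
                pendingWorkerCounter.decreaseAndGet(spec);
            } else {
                unregisteredWorkers.put(resourceId, spec);
            }
            return null;
        });
        FutureGuards.assertNoException(guarded, fatalErrorHandler);
    }
}
```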

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/CommonProcessMemorySpec.java
##########
@@ -94,4 +96,16 @@ public MemorySize getTotalFlinkMemorySize() {
 	public MemorySize getTotalProcessMemorySize() {
 		return flinkMemory.getTotalFlinkMemorySize().add(getJvmMetaspaceSize()).add(getJvmOverheadSize());
 	}
+
+	@Override
+	public boolean equals(Object obj) {

Review comment:
       What do we need the `equals` method on `CommonProcessMemorySpec` for?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/TestingResourceManagerDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.ConsumerWithException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of {@link ResourceManagerDriver}.
+ */
+public class TestingResourceManagerDriver implements ResourceManagerDriver<ResourceID> {
+
+	private final ConsumerWithException<ResourceEventHandler<ResourceID>, Throwable> initializeConsumer;
+	private final Supplier<CompletableFuture<Void>> terminateSupplier;
+	private final BiConsumerWithException<ApplicationStatus, String, Throwable> deregisterApplicationConsumer;
+	private final Function<TaskExecutorProcessSpec, CompletableFuture<ResourceID>> requestResourceFunction;
+	private final Consumer<ResourceID> releaseResourceConsumer;
+
+	private TestingResourceManagerDriver(
+			final ConsumerWithException<ResourceEventHandler<ResourceID>, Throwable> initializeConsumer,
+			final Supplier<CompletableFuture<Void>> terminateSupplier,
+			final BiConsumerWithException<ApplicationStatus, String, Throwable> deregisterApplicationConsumer,
+			final Function<TaskExecutorProcessSpec, CompletableFuture<ResourceID>> requestResourceFunction,
+			final Consumer<ResourceID> releaseResourceConsumer) {
+		this.initializeConsumer = Preconditions.checkNotNull(initializeConsumer);
+		this.terminateSupplier = Preconditions.checkNotNull(terminateSupplier);
+		this.deregisterApplicationConsumer = Preconditions.checkNotNull(deregisterApplicationConsumer);
+		this.requestResourceFunction = Preconditions.checkNotNull(requestResourceFunction);
+		this.releaseResourceConsumer = Preconditions.checkNotNull(releaseResourceConsumer);
+	}
+
+	@Override
+	public void initialize(ResourceEventHandler<ResourceID> resourceEventHandler) throws Throwable {
+		initializeConsumer.accept(resourceEventHandler);
+	}
+
+	@Override
+	public CompletableFuture<Void> terminate() {
+		return terminateSupplier.get();
+	}
+
+	@Override
+	public void deregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws Throwable {
+		deregisterApplicationConsumer.accept(finalStatus, optionalDiagnostics);
+	}
+
+	@Override
+	public CompletableFuture<ResourceID> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+		return requestResourceFunction.apply(taskExecutorProcessSpec);
+	}
+
+	@Override
+	public void releaseResource(ResourceID worker) {
+		releaseResourceConsumer.accept(worker);
+	}
+
+	public static class Builder {
+		private ConsumerWithException<ResourceEventHandler<ResourceID>, Throwable> initializeConsumer =
+				(ignore) -> {};
+
+		private Supplier<CompletableFuture<Void>> terminateSupplier =
+				() -> CompletableFuture.completedFuture(null);
+
+		private BiConsumerWithException<ApplicationStatus, String, Throwable> deregisterApplicationConsumer =
+				(ignore1, ignore20) -> {};

Review comment:
       ```suggestion
   				(ignore1, ignore2) -> {};
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
+import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ActiveResourceManager}.
+ */
+public class ActiveResourceManagerTest extends TestLogger {
+
+	private static final long TIMEOUT_SEC = 5L;
+	private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);
+
+	private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = WorkerResourceSpec.ZERO;
+
+	/**
+	 * Tests worker successfully requested, started and registered.
+	 */
+	@Test
+	public void testStartNewWorker() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+			final CompletableFuture<TaskExecutorProcessSpec> requestWorkerFromDriverFuture = new CompletableFuture<>();
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				requestWorkerFromDriverFuture.complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceId);
+			});
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec = requestWorkerFromDriverFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// worker registered, verify registration succeed

Review comment:
       ```suggestion
   				// worker registered, verify registration succeeded
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An active implementation of {@link ResourceManager}.
+ *
+ * <p>This resource manager actively requests and releases resources from/to the external resource management frameworks.
+ * With different {@link ResourceManagerDriver} provided, this resource manager can work with various frameworks.
+ */
+public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
+		extends ResourceManager<WorkerType> implements ResourceEventHandler<WorkerType> {
+
+	protected final Configuration flinkConfig;
+
+	private final ResourceManagerDriver<WorkerType> resourceManagerDriver;
+
+	/** All workers maintained by {@link ActiveResourceManager}. */
+	private final Map<ResourceID, WorkerType> workerNodeMap;
+
+	/** Number of requested and not registered workers per worker resource spec. */
+	private final PendingWorkerCounter pendingWorkerCounter;
+
+	/** Identifiers and worker resource spec of requested not registered workers. */
+	private final Map<ResourceID, WorkerResourceSpec> currentAttemptUnregisteredWorkers;
+
+	public ActiveResourceManager(
+			ResourceManagerDriver<WorkerType> resourceManagerDriver,
+			Configuration flinkConfig,
+			RpcService rpcService,
+			ResourceID resourceId,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			SlotManager slotManager,
+			ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
+			JobLeaderIdService jobLeaderIdService,
+			ClusterInformation clusterInformation,
+			FatalErrorHandler fatalErrorHandler,
+			ResourceManagerMetricGroup resourceManagerMetricGroup) {
+		super(
+				rpcService,
+				resourceId,
+				highAvailabilityServices,
+				heartbeatServices,
+				slotManager,
+				clusterPartitionTrackerFactory,
+				jobLeaderIdService,
+				clusterInformation,
+				fatalErrorHandler,
+				resourceManagerMetricGroup,
+				AkkaUtils.getTimeoutAsTime(Preconditions.checkNotNull(flinkConfig)));
+
+		this.flinkConfig = flinkConfig;
+		this.resourceManagerDriver = Preconditions.checkNotNull(resourceManagerDriver);

Review comment:
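       The null checks seem a bit inconsistent here: `flinkConfig` is checked inside the `super(...)` call, whereas `resourceManagerDriver` is checked when it is assigned.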
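       One possible way to make the checks uniform, sketched with plain JDK types: because no statement may precede `super(...)` in the Java versions Flink targets, the argument the base class needs is validated in a static helper, and every field the subclass owns is then checked the same way at assignment. `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull` (both return their argument); the class names and the `timeout.ms` key are hypothetical.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Objects;

public class NullCheckSketch {

    /** Hypothetical base class whose constructor needs a value derived from the configuration. */
    public static class BaseResourceManager {
        public final Duration rpcTimeout;

        public BaseResourceManager(Duration rpcTimeout) {
            this.rpcTimeout = Objects.requireNonNull(rpcTimeout, "rpcTimeout");
        }
    }

    /** Hypothetical subclass: every constructor argument it owns is checked the same way. */
    public static class ActiveResourceManagerSketch extends BaseResourceManager {
        private final Map<String, String> flinkConfig;
        private final Runnable resourceManagerDriver;

        public ActiveResourceManagerSketch(Runnable resourceManagerDriver, Map<String, String> flinkConfig) {
            // No statement may precede super(...), so the config needed by the base class
            // is validated inside the static helper that derives the timeout from it.
            super(timeoutFrom(flinkConfig));
            // Every field owned by this class is then checked uniformly, at assignment.
            this.flinkConfig = Objects.requireNonNull(flinkConfig, "flinkConfig");
            this.resourceManagerDriver = Objects.requireNonNull(resourceManagerDriver, "resourceManagerDriver");
        }

        // "timeout.ms" is a made-up key used only for this sketch.
        private static Duration timeoutFrom(Map<String, String> config) {
            Objects.requireNonNull(config, "flinkConfig");
            return Duration.ofMillis(Long.parseLong(config.getOrDefault("timeout.ms", "10000")));
        }
    }
}
```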

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
##########
@@ -0,0 +1,428 @@
+public class ActiveResourceManagerTest extends TestLogger {
+
+	private static final long TIMEOUT_SEC = 5L;
+	private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);
+
+	private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = WorkerResourceSpec.ZERO;
+
+	/**
+	 * Tests worker successfully requested, started and registered.
+	 */
+	@Test
+	public void testStartNewWorker() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+			final CompletableFuture<TaskExecutorProcessSpec> requestWorkerFromDriverFuture = new CompletableFuture<>();
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				requestWorkerFromDriverFuture.complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceId);
+			});
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec = requestWorkerFromDriverFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// worker registered, verify registration succeed
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests worker failed while requesting.
+	 */
+	@Test
+	public void testStartNewWorkerFailedRequesting() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<CompletableFuture<ResourceID>> resourceIdFutures = new ArrayList<>();
+			resourceIdFutures.add(new CompletableFuture<>());
+			resourceIdFutures.add(new CompletableFuture<>());
+
+			final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+				return resourceIdFutures.get(idx);
+			});
+
+			slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec1 = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec1,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// first request failed, verify requesting another worker from driver
+				runInMainThread(() -> resourceIdFutures.get(0).completeExceptionally(new Throwable("testing error")));
+				TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+						requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+
+				// second request allocated, verify registration succeed
+				runInMainThread(() -> resourceIdFutures.get(1).complete(tmResourceId));
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests worker terminated after requested before registered.
+	 */
+	@Test
+	public void testWorkerTerminatedBeforeRegister() throws Exception {
+		new Context() {{
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<ResourceID> tmResourceIds = new ArrayList<>();
+			tmResourceIds.add(ResourceID.generate());
+			tmResourceIds.add(ResourceID.generate());
+
+			final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceIds.get(idx));
+			});
+
+			slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec1 = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec1,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// first worker failed before register, verify requesting another worker from driver
+				runInMainThread(() -> getResourceManager().onWorkerTerminated(tmResourceIds.get(0)));
+				TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+						requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+
+				// second worker registered, verify registration succeed
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceIds.get(1));
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests worker terminated after registered.
+	 */
+	@Test
+	public void testWorkerTerminatedAfterRegister() throws Exception {
+		new Context() {{
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<ResourceID> tmResourceIds = new ArrayList<>();
+			tmResourceIds.add(ResourceID.generate());
+			tmResourceIds.add(ResourceID.generate());
+
+			final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceIds.get(idx));
+			});
+
+			slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec1 = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec1,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// first worker registered, verify registration succeed
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture1 = registerTaskExecutor(tmResourceIds.get(0));
+				assertThat(registerTaskExecutorFuture1.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+
+				// first worker terminated, verify requesting another worker from driver
+				runInMainThread(() -> getResourceManager().onWorkerTerminated(tmResourceIds.get(0)));
+				TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+						requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+
+				// second worker registered, verify registration succeed
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture2 = registerTaskExecutor(tmResourceIds.get(1));
+				assertThat(registerTaskExecutorFuture2.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests worker terminated and is no longer required.
+	 */
+	@Test
+	public void testWorkerTerminatedNoLongerRequired() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<CompletableFuture<TaskExecutorProcessSpec>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+				return CompletableFuture.completedFuture(tmResourceId);
+			});
+
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+						getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				TaskExecutorProcessSpec taskExecutorProcessSpec = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+				assertThat(taskExecutorProcessSpec,
+						is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, WORKER_RESOURCE_SPEC)));
+
+				// worker registered, verify registration succeed
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+
+				// worker terminated, verify not requesting new worker
+				runInMainThread(() -> {
+					getResourceManager().onWorkerTerminated(tmResourceId);
+					// needs to return something, so that we can use `get()` to make sure the main thread processing
+					// finishes before the assertions
+					return null;
+				}).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+				assertFalse(requestWorkerFromDriverFutures.get(1).isDone());
+			});
+		}};
+	}
+
+	/**
+	 * Tests workers from previous attempt successfully recovered and registered.
+	 */
+	@Test
+	public void testRecoverWorkerFromPreviousAttempt() throws Exception {
+		new Context() {{
+			final ResourceID tmResourceId = ResourceID.generate();
+
+			runTest(() -> {
+				runInMainThread(() -> getResourceManager().onPreviousAttemptWorkersRecovered(Collections.singleton(tmResourceId)));
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Success.class));
+			});
+		}};
+	}
+
+	/**
+	 * Tests decline unknown worker registration.
+	 */
+	@Test
+	public void testRegisterUnknownWorker() throws Exception {
+		new Context() {{
+			runTest(() -> {
+				CompletableFuture<RegistrationResponse> registerTaskExecutorFuture = registerTaskExecutor(ResourceID.generate());
+				assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), instanceOf(RegistrationResponse.Decline.class));
+			});
+		}};
+	}
+
+	@Test
+	public void testOnError() throws Exception {
+		new Context() {{
+			final Throwable fatalError = new Throwable("Testing fatal error");
+			runTest(() -> {
+				runInMainThread(() -> getResourceManager().onError(fatalError));
+				final Throwable reportedError = getFatalErrorHandler().getErrorFuture().get(TIMEOUT_SEC, TimeUnit.SECONDS);
+				assertThat(reportedError, is(fatalError));
+			});
+		}};
+	}
+
+	class Context {
+
+		final Configuration flinkConfig = new Configuration();
+		final TestingResourceManagerDriver.Builder driverBuilder = new TestingResourceManagerDriver.Builder();
+		final TestingSlotManagerBuilder slotManagerBuilder = new TestingSlotManagerBuilder();
+
+		private ActiveResourceManager<ResourceID> resourceManager;
+		private TestingFatalErrorHandler fatalErrorHandler;
+
+		ActiveResourceManager<ResourceID> getResourceManager() {
+			return resourceManager;
+		}
+
+		TestingFatalErrorHandler getFatalErrorHandler() {
+			return fatalErrorHandler;
+		}
+
+		void runTest(RunnableWithException testMethod) throws Exception {
+			fatalErrorHandler = new TestingFatalErrorHandler();
+			resourceManager = createAndStartResourceManager(
+					flinkConfig,
+					driverBuilder.build(),
+					slotManagerBuilder.createSlotManager());
+
+			try {
+				testMethod.run();
+			} finally {
+				resourceManager.close();
+			}
+		}
+
+		private ActiveResourceManager<ResourceID> createAndStartResourceManager(
+				Configuration configuration,
+				ResourceManagerDriver<ResourceID> driver,
+				SlotManager slotManager) throws Exception {
+			final TestingRpcService rpcService = new TestingRpcService(configuration);
+			final MockResourceManagerRuntimeServices rmServices = new MockResourceManagerRuntimeServices(rpcService, TIMEOUT_TIME, slotManager);
+
+			final ActiveResourceManager<ResourceID> activeResourceManager = new ActiveResourceManager<>(
+					driver,
+					configuration,
+					rpcService,
+					ResourceID.generate(),
+					rmServices.highAvailabilityServices,
+					rmServices.heartbeatServices,
+					rmServices.slotManager,
+					NoOpResourceManagerPartitionTracker::get,
+					rmServices.jobLeaderIdService,
+					new ClusterInformation("localhost", 1234),
+					fatalErrorHandler,
+					UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
+
+			activeResourceManager.start();
+			rmServices.grantLeadership();
+
+			return activeResourceManager;
+		}
+
+		public void runInMainThread(Runnable runnable) {
+			resourceManager.handleInMainThread(runnable);
+		}
+
+		public <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
+			return resourceManager.runInMainThread(callable, TIMEOUT_TIME);
+		}
+
+		CompletableFuture<RegistrationResponse> registerTaskExecutor(ResourceID resourceID) throws Exception {
+			final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+					.createTestingTaskExecutorGateway();
+			((TestingRpcService) resourceManager.getRpcService()).registerGateway(resourceID.toString(), taskExecutorGateway);
+
+			final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
+					resourceID.toString(),
+					resourceID,
+					1234,
+					new HardwareDescription(1, 2L, 3L, 4L),
+					ResourceProfile.ZERO,
+					ResourceProfile.ZERO);
+
+			return runInMainThread(() -> resourceManager.registerTaskExecutor(taskExecutorRegistration, TIMEOUT_TIME))
+					.get(TIMEOUT_SEC, TimeUnit.SECONDS);

Review comment:
       We could avoid explicitly running this in the main thread by calling `ResourceManagerGateway.registerTaskExecutor` here instead.
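       A sketch with plain JDK types of why the gateway call is simpler. The test helper wraps `resourceManager.registerTaskExecutor(...)`, which already returns a future, in `runInMainThread`, so the result is a future of a future and needs the extra blocking `.get(TIMEOUT_SEC, TimeUnit.SECONDS)`. A call through the gateway is dispatched to the endpoint's main thread by the RPC layer and hands back the response future directly. The single-thread executor, class and method names below are hypothetical stand-ins, not Flink APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GatewayCallSketch implements AutoCloseable {

    // Hypothetical stand-in for the RPC endpoint's single main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Hypothetical endpoint method that, like registerTaskExecutor, already returns a future.
    public CompletableFuture<String> registerTaskExecutor(String resourceId) {
        return CompletableFuture.completedFuture("Success:" + resourceId);
    }

    // Shape of the current test helper: wrapping a future-returning call in a
    // main-thread callable yields a nested future, hence the extra blocking get.
    public CompletableFuture<CompletableFuture<String>> runInMainThread(String resourceId) {
        return CompletableFuture.supplyAsync(() -> registerTaskExecutor(resourceId), mainThread);
    }

    // Shape of a gateway call (simulated here): the dispatch to the main thread
    // happens behind the gateway, and the caller receives a flat response future.
    public CompletableFuture<String> viaGateway(String resourceId) {
        return runInMainThread(resourceId).thenCompose(response -> response);
    }

    @Override
    public void close() {
        // Lets queued tasks finish, then stops the non-daemon worker thread.
        mainThread.shutdown();
    }
}
```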




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org