You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/05 19:02:39 UTC

[flink] branch release-1.9 updated: [FLINK-13579] Only set managed memory when starting an active RM

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 789c605  [FLINK-13579] Only set managed memory when starting an active RM
789c605 is described below

commit 789c605d16b97222f2e0213ac5ad60da50901d09
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Aug 5 11:20:22 2019 +0200

    [FLINK-13579] Only set managed memory when starting an active RM
    
    Introduce ActiveResourceManagerFactory which encapsulates the logic to set relevant
    configuration values for the active ResourceManager implementations (e.g. managed
    memory).
    
    Let existing active ResourceManager factory implementations extend ActiveResourceManagerFactory.
    
    Add ActiveResourceManagerFactoryTest to ensure that active ResourceManager relevant
    configuration values are set.
    
    Add StandaloneResourceManagerFactoryTest to ensure that a standalone ResourceManager can be
    started if the memory size is configured to be less than the containered min cutoff size.
    
    This closes #9358.
---
 .../MesosResourceManagerFactory.java               |   5 +-
 ...tDispatcherResourceManagerComponentFactory.java |   4 +-
 .../ActiveResourceManagerFactory.java              |  97 ++++++++++++++++++++
 .../flink/runtime/util/ResourceManagerUtil.java    |  46 ----------
 .../ActiveResourceManagerFactoryTest.java          | 101 +++++++++++++++++++++
 .../StandaloneResourceManagerFactoryTest.java      |  70 ++++++++++++++
 .../apache/flink/yarn/YarnConfigurationITCase.java |   4 +-
 .../yarn/entrypoint/YarnJobClusterEntrypoint.java  |   2 +-
 .../entrypoint/YarnResourceManagerFactory.java     |  14 ++-
 .../entrypoint/YarnSessionClusterEntrypoint.java   |   2 +-
 10 files changed, 287 insertions(+), 58 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
index 534b0a5..5b76740 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -41,7 +42,7 @@ import javax.annotation.Nullable;
 /**
  * {@link ResourceManagerFactory} which creates a {@link MesosResourceManager}.
  */
-public class MesosResourceManagerFactory implements ResourceManagerFactory<RegisteredMesosWorkerNode> {
+public class MesosResourceManagerFactory extends ActiveResourceManagerFactory<RegisteredMesosWorkerNode> {
 
 	@Nonnull
 	private final MesosServices mesosServices;
@@ -63,7 +64,7 @@ public class MesosResourceManagerFactory implements ResourceManagerFactory<Regis
 	}
 
 	@Override
-	public ResourceManager<RegisteredMesosWorkerNode> createResourceManager(
+	public ResourceManager<RegisteredMesosWorkerNode> createActiveResourceManager(
 			Configuration configuration,
 			ResourceID resourceId,
 			RpcService rpcService,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 8b2d9c6..163b039 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -49,7 +49,6 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.util.ResourceManagerUtil;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -168,9 +167,8 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
 				hostname,
 				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
 
-			Configuration resourceManagerConfig = ResourceManagerUtil.getResourceManagerConfiguration(configuration);
 			resourceManager = resourceManagerFactory.createResourceManager(
-				resourceManagerConfig,
+				configuration,
 				ResourceID.generate(),
 				rpcService,
 				highAvailabilityServices,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
new file mode 100644
index 0000000..9f90106
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+
+import javax.annotation.Nullable;
+
+/**
+ * Resource manager factory which creates active {@link ResourceManager} implementations.
+ *
+ * <p>The default implementation will call {@link #createActiveResourceManagerConfiguration}
+ * to create a new configuration which is configured with active resource manager relevant
+ * configuration options.
+ *
+ * @param <T> type of the {@link ResourceIDRetrievable}
+ */
+public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievable> implements ResourceManagerFactory<T> {
+
+	@Override
+	public ResourceManager<T> createResourceManager(
+			Configuration configuration,
+			ResourceID resourceId,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler,
+			ClusterInformation clusterInformation,
+			@Nullable String webInterfaceUrl,
+			JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
+		return createActiveResourceManager(
+			createActiveResourceManagerConfiguration(configuration),
+			resourceId,
+			rpcService,
+			highAvailabilityServices,
+			heartbeatServices,
+			metricRegistry,
+			fatalErrorHandler,
+			clusterInformation,
+			webInterfaceUrl,
+			jobManagerMetricGroup);
+	}
+
+	public static Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
+		final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(originalConfiguration).getMebiBytes();
+		final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(originalConfiguration, taskManagerMemoryMB);
+		final long processMemoryBytes = (taskManagerMemoryMB - cutoffMB) << 20; // megabytes to bytes
+		final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(originalConfiguration, processMemoryBytes);
+
+		final Configuration resourceManagerConfig = new Configuration(originalConfiguration);
+		resourceManagerConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryBytes + "b");
+
+		return resourceManagerConfig;
+	}
+
+	protected abstract ResourceManager<T> createActiveResourceManager(
+		Configuration configuration,
+		ResourceID resourceId,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		HeartbeatServices heartbeatServices,
+		MetricRegistry metricRegistry,
+		FatalErrorHandler fatalErrorHandler,
+		ClusterInformation clusterInformation,
+		@Nullable String webInterfaceUrl,
+		JobManagerMetricGroup jobManagerMetricGroup) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
deleted file mode 100644
index 7a65336..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
-
-/**
- * Utils for ResourceManager.
- */
-public class ResourceManagerUtil {
-
-	public static Configuration getResourceManagerConfiguration(Configuration flinkConfig) {
-		final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
-		final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfig, taskManagerMemoryMB);
-		final long processMemoryBytes = (taskManagerMemoryMB - cutoffMB) << 20; // megabytes to bytes
-		final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(flinkConfig, processMemoryBytes);
-
-		final Configuration resourceManagerConfig = new Configuration(flinkConfig);
-		resourceManagerConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryBytes + "b");
-
-		return resourceManagerConfig;
-	}
-
-	private ResourceManagerUtil() {
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
new file mode 100644
index 0000000..b1874e0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for the {@link ActiveResourceManagerFactory}.
+ */
+public class ActiveResourceManagerFactoryTest extends TestLogger {
+
+	/**
+	 * Test which ensures that the {@link ActiveResourceManagerFactory} sets the correct managed
+	 * memory when creating a resource manager.
+	 */
+	@Test
+	public void createResourceManager_WithDefaultConfiguration_ShouldSetManagedMemory() throws Exception {
+		final Configuration configuration = new Configuration();
+
+		final TestingActiveResourceManagerFactory resourceManagerFactory = new TestingActiveResourceManagerFactory();
+
+		final TestingRpcService rpcService = new TestingRpcService();
+
+		try {
+			final ResourceManager<ResourceID> ignored = resourceManagerFactory.createResourceManager(
+				configuration,
+				ResourceID.generate(),
+				rpcService,
+				new TestingHighAvailabilityServices(),
+				new TestingHeartbeatServices(),
+				NoOpMetricRegistry.INSTANCE,
+				new TestingFatalErrorHandler(),
+				new ClusterInformation("foobar", 1234),
+				null,
+				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
+		} finally {
+			RpcUtils.terminateRpcService(rpcService, Time.seconds(10L));
+		}
+	}
+
+	private static final class TestingActiveResourceManagerFactory extends ActiveResourceManagerFactory<ResourceID> {
+
+		@Override
+		protected ResourceManager<ResourceID> createActiveResourceManager(
+				Configuration configuration,
+				ResourceID resourceId,
+				RpcService rpcService,
+				HighAvailabilityServices highAvailabilityServices,
+				HeartbeatServices heartbeatServices,
+				MetricRegistry metricRegistry,
+				FatalErrorHandler fatalErrorHandler,
+				ClusterInformation clusterInformation,
+				@Nullable String webInterfaceUrl,
+				JobManagerMetricGroup jobManagerMetricGroup) {
+			assertThat(configuration.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE), is(true));
+
+			return null;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactoryTest.java
new file mode 100644
index 0000000..9e64d48
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactoryTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+/**
+ * Tests for the {@link StandaloneResourceManagerFactory}.
+ */
+public class StandaloneResourceManagerFactoryTest extends TestLogger {
+
+	@Test
+	public void createResourceManager_WithLessMemoryThanContainerizedHeapCutoffMin_ShouldSucceed() throws Exception {
+		final StandaloneResourceManagerFactory resourceManagerFactory = StandaloneResourceManagerFactory.INSTANCE;
+
+		final TestingRpcService rpcService = new TestingRpcService();
+		try {
+			final Configuration configuration = new Configuration();
+			configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, new MemorySize(128 * 1024 * 1024).toString());
+			configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 600);
+
+			final ResourceManager<ResourceID> ignored = resourceManagerFactory.createResourceManager(
+				configuration,
+				ResourceID.generate(),
+				rpcService,
+				new TestingHighAvailabilityServices(),
+				new TestingHeartbeatServices(),
+				NoOpMetricRegistry.INSTANCE,
+				new TestingFatalErrorHandler(),
+				new ClusterInformation("foobar", 1234),
+				null,
+				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
+		} finally {
+			RpcUtils.terminateRpcService(rpcService, Time.seconds(10L));
+		}
+	}
+
+}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 63ff2b2..a567a6d 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -39,7 +40,6 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.ResourceManagerUtil;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -217,7 +217,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
 	}
 
 	private static int calculateManagedMemorySizeMB(Configuration configuration) {
-		Configuration resourceManagerConfig = ResourceManagerUtil.getResourceManagerConfiguration(configuration);
+		Configuration resourceManagerConfig = ActiveResourceManagerFactory.createActiveResourceManagerConfiguration(configuration);
 		return MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
 	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 42e666e..15de5a8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -64,7 +64,7 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 	@Override
 	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
 		return new JobDispatcherResourceManagerComponentFactory(
-			YarnResourceManagerFactory.INSTANCE,
+			YarnResourceManagerFactory.getInstance(),
 			FileJobGraphRetriever.createFrom(configuration));
 	}
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
index 24bce10..312bb41 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -39,11 +40,18 @@ import javax.annotation.Nullable;
 /**
  * {@link ResourceManagerFactory} implementation which creates a {@link YarnResourceManager}.
  */
-public enum YarnResourceManagerFactory implements ResourceManagerFactory<YarnWorkerNode> {
-	INSTANCE;
+public class YarnResourceManagerFactory extends ActiveResourceManagerFactory<YarnWorkerNode> {
+
+	private static final YarnResourceManagerFactory INSTANCE = new YarnResourceManagerFactory();
+
+	private YarnResourceManagerFactory() {}
+
+	public static YarnResourceManagerFactory getInstance() {
+		return INSTANCE;
+	}
 
 	@Override
-	public ResourceManager<YarnWorkerNode> createResourceManager(
+	public ResourceManager<YarnWorkerNode> createActiveResourceManager(
 			Configuration configuration,
 			ResourceID resourceId,
 			RpcService rpcService,
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 0f4656e..c22c548 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -61,7 +61,7 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 
 	@Override
 	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
-		return new SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.INSTANCE);
+		return new SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.getInstance());
 	}
 
 	public static void main(String[] args) {