You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/30 11:52:19 UTC

[flink] 02/02: [FLINK-13241][yarn/mesos] Fix Yarn/MesosResourceManager setting managed memory size into wrong configuration instance.

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b4697f7e41acfb6b76bb7a67d9d19bc577a023c5
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sat Jul 13 08:30:15 2019 +0800

    [FLINK-13241][yarn/mesos] Fix Yarn/MesosResourceManager setting managed memory size into wrong configuration instance.
    
    [FLINK-13241][yarn][test] Update YarnResourceManagerTest#testCreateSlotsPerWorker to compute tmCalculatedResourceProfile based on the RM altered configuration.
    
    [FLINK-13241][yarn][test] Update YarnConfigurationITCase to verify that TMs are started with correct managed memory size.
    
    [FLINK-13241][runtime] Calculate and set managed memory size outside of ResourceManager.
    
    [FLINK-13241][runtime/yarn][test] Move YarnResourceManagerTest#testCreateSlotsPerWorker to ResourceManagerTest#testCreateWorkerSlotProfiles, and update to verify slot profile calculation with determinate managed memory size.
    
    [FLINK-13241][runtime] Move getResourceManagerConfiguration from ResourceManagerFactory to ResourceManagerUtil.
    
    This closes #9105.
---
 .../clusterframework/MesosResourceManager.java     |  8 +---
 ...tDispatcherResourceManagerComponentFactory.java |  4 +-
 .../runtime/resourcemanager/ResourceManager.java   | 18 ++-------
 .../flink/runtime/util/ResourceManagerUtil.java    | 46 +++++++++++++++++++++
 .../resourcemanager/ResourceManagerTest.java       | 39 ++++++++++++++----
 .../apache/flink/yarn/YarnConfigurationITCase.java | 12 ++++++
 .../org/apache/flink/yarn/YarnResourceManager.java |  9 +----
 .../apache/flink/yarn/YarnResourceManagerTest.java | 47 ----------------------
 8 files changed, 101 insertions(+), 82 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index ad0ed58..0bb8d41 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -42,7 +42,6 @@ import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -184,8 +183,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		this.mesosServices = Preconditions.checkNotNull(mesosServices);
 		this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
 
-		// copy the config, because we might change it for the TaskManagers
-		this.flinkConfig = new Configuration(Preconditions.checkNotNull(flinkConfig));
+		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
 		this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
 
 		this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
@@ -198,9 +196,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		this.workersInLaunch = new HashMap<>(8);
 		this.workersBeingReturned = new HashMap<>(8);
 
-		final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters();
-		this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(
-			flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots());
+		this.slotsPerWorker = createWorkerSlotProfiles(flinkConfig);
 		setFailUnfulfillableRequest(true);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 163b039..8b2d9c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.ResourceManagerUtil;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -167,8 +168,9 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
 				hostname,
 				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
 
+			Configuration resourceManagerConfig = ResourceManagerUtil.getResourceManagerConfiguration(configuration);
 			resourceManager = resourceManagerFactory.createResourceManager(
-				configuration,
+				resourceManagerConfig,
 				ResourceID.generate(),
 				rpcService,
 				highAvailabilityServices,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3ee321f..1a77a98 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -23,10 +23,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
@@ -1203,22 +1203,12 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	//  Helper methods
 	// ------------------------------------------------------------------------
 
-	@VisibleForTesting
-	public static Collection<ResourceProfile> updateTaskManagerConfigAndCreateWorkerSlotProfiles(
-			Configuration config, long totalMemoryMB, int numSlots) {
-
-		final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB);
-		final long processMemoryBytes = (totalMemoryMB - cutoffMB) << 20; // megabytes to bytes
-		final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(config, processMemoryBytes);
-
-		updateFlinkConfForManagedMemory(config, managedMemoryBytes);
+	public static Collection<ResourceProfile> createWorkerSlotProfiles(Configuration config) {
+		final int numSlots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
+		final long managedMemoryBytes = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes();
 
 		final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
 		return Collections.nCopies(numSlots, resourceProfile);
 	}
-
-	static void updateFlinkConfForManagedMemory(Configuration conf, long managedMemorySize) {
-		conf.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemorySize + "b");
-	}
 }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
new file mode 100644
index 0000000..7a65336
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+
+/**
+ * Utils for ResourceManager.
+ */
+public class ResourceManagerUtil {
+
+	public static Configuration getResourceManagerConfiguration(Configuration flinkConfig) {
+		final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
+		final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfig, taskManagerMemoryMB);
+		final long processMemoryBytes = (taskManagerMemoryMB - cutoffMB) << 20; // megabytes to bytes
+		final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(flinkConfig, processMemoryBytes);
+
+		final Configuration resourceManagerConfig = new Configuration(flinkConfig);
+		resourceManagerConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryBytes + "b");
+
+		return resourceManagerConfig;
+	}
+
+	private ResourceManagerUtil() {
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 95e43fc..a9ce427 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -20,7 +20,11 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.HardwareDescription;
@@ -38,6 +42,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -46,7 +51,6 @@ import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -60,6 +64,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -151,12 +156,12 @@ public class ResourceManagerTest extends TestLogger {
 
 		TaskManagerInfo taskManagerInfo = taskManagerInfoFuture.get();
 
-		Assert.assertEquals(taskManagerId, taskManagerInfo.getResourceId());
-		Assert.assertEquals(hardwareDescription, taskManagerInfo.getHardwareDescription());
-		Assert.assertEquals(taskExecutorGateway.getAddress(), taskManagerInfo.getAddress());
-		Assert.assertEquals(dataPort, taskManagerInfo.getDataPort());
-		Assert.assertEquals(0, taskManagerInfo.getNumberSlots());
-		Assert.assertEquals(0, taskManagerInfo.getNumberAvailableSlots());
+		assertEquals(taskManagerId, taskManagerInfo.getResourceId());
+		assertEquals(hardwareDescription, taskManagerInfo.getHardwareDescription());
+		assertEquals(taskExecutorGateway.getAddress(), taskManagerInfo.getAddress());
+		assertEquals(dataPort, taskManagerInfo.getDataPort());
+		assertEquals(0, taskManagerInfo.getNumberSlots());
+		assertEquals(0, taskManagerInfo.getNumberAvailableSlots());
 	}
 
 	private void registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, ResourceID taskExecutorId, String taskExecutorAddress) throws Exception {
@@ -270,4 +275,24 @@ public class ResourceManagerTest extends TestLogger {
 
 		return resourceManager;
 	}
+
+	/**
+	 * Tests that RM and TM create the same slot resource profiles.
+	 */
+	@Test
+	public void testCreateWorkerSlotProfiles() {
+		final Configuration config = new Configuration();
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "100m");
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 5);
+
+		final ResourceProfile rmCalculatedResourceProfile =
+			ResourceManager.createWorkerSlotProfiles(config).iterator().next();
+
+		final ResourceProfile tmCalculatedResourceProfile =
+			TaskManagerServices.computeSlotResourceProfile(
+				config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
+				MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes());
+
+		assertEquals(rmCalculatedResourceProfile, tmCalculatedResourceProfile);
+	}
 }
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 86b0052..63ff2b2 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -25,8 +25,10 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.RestClient;
@@ -37,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ResourceManagerUtil;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -187,6 +190,10 @@ public class YarnConfigurationITCase extends YarnTestBase {
 					assertThat(
 						(double) taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / (double) expectedHeadSize,
 						is(closeTo(1.0, 0.15)));
+
+					final int expectedManagedMemoryMB = calculateManagedMemorySizeMB(configuration);
+
+					assertThat((int) (taskManagerInfo.getHardwareDescription().getSizeOfManagedMemory() >> 20), is(expectedManagedMemoryMB));
 				} finally {
 					restClient.shutdown(TIMEOUT);
 					clusterClient.shutdown();
@@ -208,4 +215,9 @@ public class YarnConfigurationITCase extends YarnTestBase {
 			return taskManagerInfo.getNumberSlots() > 0;
 		}
 	}
+
+	private static int calculateManagedMemorySizeMB(Configuration configuration) {
+		Configuration resourceManagerConfig = ResourceManagerUtil.getResourceManagerConfiguration(configuration);
+		return MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 1bb29cb..e9a1f2d 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -158,7 +158,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			clusterInformation,
 			fatalErrorHandler,
 			jobManagerMetricGroup);
-		this.flinkConfig  = new Configuration(flinkConfig); // copy, because we alter the config
+		this.flinkConfig  = flinkConfig;
 		this.yarnConfig = new YarnConfiguration();
 		this.env = env;
 		this.workerNodeMap = new ConcurrentHashMap<>();
@@ -184,7 +184,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
 		this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
 
-		this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots);
+		this.slotsPerWorker = createWorkerSlotProfiles(flinkConfig);
 		setFailUnfulfillableRequest(true);
 	}
 
@@ -323,11 +323,6 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		return resource;
 	}
 
-	@VisibleForTesting
-	Collection<ResourceProfile> getSlotsPerWorker() {
-		return slotsPerWorker;
-	}
-
 	@Override
 	public boolean stopWorker(final YarnWorkerNode workerNode) {
 		final Container container = workerNode.getContainer();
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 19b2f67..df7d85b 100755
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -21,10 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -51,7 +48,6 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.RunnableWithException;
@@ -484,47 +480,4 @@ public class YarnResourceManagerTest extends TestLogger {
 			});
 		}};
 	}
-
-	/**
-	 * Tests that RM and TM calculate same slot resource profile.
-	 */
-	@Test
-	public void testCreateSlotsPerWorker() throws Exception {
-		testCreateSlotsPerWorker(flinkConfig, Resource.newInstance(500, 100));
-
-		Configuration config1 = new Configuration();
-		config1.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 5);
-		testCreateSlotsPerWorker(config1, Resource.newInstance(1000, 10));
-
-		Configuration config2 = new Configuration();
-		config2.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "789m");
-		testCreateSlotsPerWorker(config2,  Resource.newInstance(800, 50));
-
-		Configuration config3 = new Configuration();
-		config3.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "300m");
-		config3.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
-		testCreateSlotsPerWorker(config3,  Resource.newInstance(2000, 60));
-
-		Configuration config4 = new Configuration();
-		config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "10m");
-		config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "10m");
-		config4.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
-		testCreateSlotsPerWorker(config4,  Resource.newInstance(1000, 1));
-	}
-
-	private void testCreateSlotsPerWorker(Configuration config, Resource resource) throws  Exception {
-		new Context(config) {{
-			runTest(() -> {
-
-				ResourceProfile rmCalculatedResourceProfile = resourceManager.getSlotsPerWorker().iterator().next();
-
-				ResourceProfile tmCalculatedResourceProfile =
-					TaskManagerServices.computeSlotResourceProfile(
-						config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
-						MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes());
-
-				assertEquals(rmCalculatedResourceProfile, tmCalculatedResourceProfile);
-			});
-		}};
-	}
 }