You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/30 11:52:19 UTC
[flink] 02/02: [FLINK-13241][yarn/mesos] Fix
Yarn/MesosResourceManager setting managed memory size into wrong
configuration instance.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b4697f7e41acfb6b76bb7a67d9d19bc577a023c5
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sat Jul 13 08:30:15 2019 +0800
[FLINK-13241][yarn/mesos] Fix Yarn/MesosResourceManager setting managed memory size into wrong configuration instance.
[FLINK-13241][yarn][test] Update YarnResourceManagerTest#testCreateSlotsPerWorker to compute tmCalculatedResourceProfile based on the RM altered configuration.
[FLINK-13241][yarn][test] Update YarnConfigurationITCase to verify that TMs are started with correct managed memory size.
[FLINK-13241][runtime] Calculate and set the managed memory size outside of ResourceManager.
[FLINK-13241][runtime/yarn][test] Move YarnResourceManagerTest#testCreateSlotsPerWorker to ResourceManagerTest#testCreateWorkerSlotProfiles, and update it to verify slot profile calculation with determinate managed memory size.
[FLINK-13241][runtime] Move getResourceManagerConfiguration from ResourceManagerFactory to ResourceManagerUtil.
This closes #9105.
---
.../clusterframework/MesosResourceManager.java | 8 +---
...tDispatcherResourceManagerComponentFactory.java | 4 +-
.../runtime/resourcemanager/ResourceManager.java | 18 ++-------
.../flink/runtime/util/ResourceManagerUtil.java | 46 +++++++++++++++++++++
.../resourcemanager/ResourceManagerTest.java | 39 ++++++++++++++----
.../apache/flink/yarn/YarnConfigurationITCase.java | 12 ++++++
.../org/apache/flink/yarn/YarnResourceManager.java | 9 +----
.../apache/flink/yarn/YarnResourceManagerTest.java | 47 ----------------------
8 files changed, 101 insertions(+), 82 deletions(-)
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index ad0ed58..0bb8d41 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -42,7 +42,6 @@ import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -184,8 +183,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
this.mesosServices = Preconditions.checkNotNull(mesosServices);
this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
- // copy the config, because we might change it for the TaskManagers
- this.flinkConfig = new Configuration(Preconditions.checkNotNull(flinkConfig));
+ this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
@@ -198,9 +196,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
this.workersInLaunch = new HashMap<>(8);
this.workersBeingReturned = new HashMap<>(8);
- final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters();
- this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(
- flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots());
+ this.slotsPerWorker = createWorkerSlotProfiles(flinkConfig);
setFailUnfulfillableRequest(true);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 163b039..8b2d9c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.ResourceManagerUtil;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -167,8 +168,9 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend
hostname,
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+ Configuration resourceManagerConfig = ResourceManagerUtil.getResourceManagerConfiguration(configuration);
resourceManager = resourceManagerFactory.createResourceManager(
- configuration,
+ resourceManagerConfig,
ResourceID.generate(),
rpcService,
highAvailabilityServices,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3ee321f..1a77a98 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -23,10 +23,10 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
@@ -1203,22 +1203,12 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
// Helper methods
// ------------------------------------------------------------------------
- @VisibleForTesting
- public static Collection<ResourceProfile> updateTaskManagerConfigAndCreateWorkerSlotProfiles(
- Configuration config, long totalMemoryMB, int numSlots) {
-
- final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB);
- final long processMemoryBytes = (totalMemoryMB - cutoffMB) << 20; // megabytes to bytes
- final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(config, processMemoryBytes);
-
- updateFlinkConfForManagedMemory(config, managedMemoryBytes);
+ public static Collection<ResourceProfile> createWorkerSlotProfiles(Configuration config) {
+ final int numSlots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
+ final long managedMemoryBytes = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes();
final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
return Collections.nCopies(numSlots, resourceProfile);
}
-
- static void updateFlinkConfForManagedMemory(Configuration conf, long managedMemorySize) {
- conf.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemorySize + "b");
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
new file mode 100644
index 0000000..7a65336
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+
+/**
+ * Utils for ResourceManager.
+ */
+public class ResourceManagerUtil {
+
+ public static Configuration getResourceManagerConfiguration(Configuration flinkConfig) {
+ final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
+ final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfig, taskManagerMemoryMB);
+ final long processMemoryBytes = (taskManagerMemoryMB - cutoffMB) << 20; // megabytes to bytes
+ final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(flinkConfig, processMemoryBytes);
+
+ final Configuration resourceManagerConfig = new Configuration(flinkConfig);
+ resourceManagerConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryBytes + "b");
+
+ return resourceManagerConfig;
+ }
+
+ private ResourceManagerUtil() {
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 95e43fc..a9ce427 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -20,7 +20,11 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
@@ -38,6 +42,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -46,7 +51,6 @@ import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -60,6 +64,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/**
@@ -151,12 +156,12 @@ public class ResourceManagerTest extends TestLogger {
TaskManagerInfo taskManagerInfo = taskManagerInfoFuture.get();
- Assert.assertEquals(taskManagerId, taskManagerInfo.getResourceId());
- Assert.assertEquals(hardwareDescription, taskManagerInfo.getHardwareDescription());
- Assert.assertEquals(taskExecutorGateway.getAddress(), taskManagerInfo.getAddress());
- Assert.assertEquals(dataPort, taskManagerInfo.getDataPort());
- Assert.assertEquals(0, taskManagerInfo.getNumberSlots());
- Assert.assertEquals(0, taskManagerInfo.getNumberAvailableSlots());
+ assertEquals(taskManagerId, taskManagerInfo.getResourceId());
+ assertEquals(hardwareDescription, taskManagerInfo.getHardwareDescription());
+ assertEquals(taskExecutorGateway.getAddress(), taskManagerInfo.getAddress());
+ assertEquals(dataPort, taskManagerInfo.getDataPort());
+ assertEquals(0, taskManagerInfo.getNumberSlots());
+ assertEquals(0, taskManagerInfo.getNumberAvailableSlots());
}
private void registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, ResourceID taskExecutorId, String taskExecutorAddress) throws Exception {
@@ -270,4 +275,24 @@ public class ResourceManagerTest extends TestLogger {
return resourceManager;
}
+
+ /**
+ * Tests that RM and TM create the same slot resource profiles.
+ */
+ @Test
+ public void testCreateWorkerSlotProfiles() {
+ final Configuration config = new Configuration();
+ config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "100m");
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 5);
+
+ final ResourceProfile rmCalculatedResourceProfile =
+ ResourceManager.createWorkerSlotProfiles(config).iterator().next();
+
+ final ResourceProfile tmCalculatedResourceProfile =
+ TaskManagerServices.computeSlotResourceProfile(
+ config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
+ MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes());
+
+ assertEquals(rmCalculatedResourceProfile, tmCalculatedResourceProfile);
+ }
}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 86b0052..63ff2b2 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -25,8 +25,10 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.RestClient;
@@ -37,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ResourceManagerUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -187,6 +190,10 @@ public class YarnConfigurationITCase extends YarnTestBase {
assertThat(
(double) taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / (double) expectedHeadSize,
is(closeTo(1.0, 0.15)));
+
+ final int expectedManagedMemoryMB = calculateManagedMemorySizeMB(configuration);
+
+ assertThat((int) (taskManagerInfo.getHardwareDescription().getSizeOfManagedMemory() >> 20), is(expectedManagedMemoryMB));
} finally {
restClient.shutdown(TIMEOUT);
clusterClient.shutdown();
@@ -208,4 +215,9 @@ public class YarnConfigurationITCase extends YarnTestBase {
return taskManagerInfo.getNumberSlots() > 0;
}
}
+
+ private static int calculateManagedMemorySizeMB(Configuration configuration) {
+ Configuration resourceManagerConfig = ResourceManagerUtil.getResourceManagerConfiguration(configuration);
+ return MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+ }
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 1bb29cb..e9a1f2d 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -158,7 +158,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
clusterInformation,
fatalErrorHandler,
jobManagerMetricGroup);
- this.flinkConfig = new Configuration(flinkConfig); // copy, because we alter the config
+ this.flinkConfig = flinkConfig;
this.yarnConfig = new YarnConfiguration();
this.env = env;
this.workerNodeMap = new ConcurrentHashMap<>();
@@ -184,7 +184,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
- this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots);
+ this.slotsPerWorker = createWorkerSlotProfiles(flinkConfig);
setFailUnfulfillableRequest(true);
}
@@ -323,11 +323,6 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
return resource;
}
- @VisibleForTesting
- Collection<ResourceProfile> getSlotsPerWorker() {
- return slotsPerWorker;
- }
-
@Override
public boolean stopWorker(final YarnWorkerNode workerNode) {
final Container container = workerNode.getContainer();
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 19b2f67..df7d85b 100755
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -21,10 +21,7 @@ package org.apache.flink.yarn;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -51,7 +48,6 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
@@ -484,47 +480,4 @@ public class YarnResourceManagerTest extends TestLogger {
});
}};
}
-
- /**
- * Tests that RM and TM calculate same slot resource profile.
- */
- @Test
- public void testCreateSlotsPerWorker() throws Exception {
- testCreateSlotsPerWorker(flinkConfig, Resource.newInstance(500, 100));
-
- Configuration config1 = new Configuration();
- config1.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 5);
- testCreateSlotsPerWorker(config1, Resource.newInstance(1000, 10));
-
- Configuration config2 = new Configuration();
- config2.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "789m");
- testCreateSlotsPerWorker(config2, Resource.newInstance(800, 50));
-
- Configuration config3 = new Configuration();
- config3.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "300m");
- config3.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
- testCreateSlotsPerWorker(config3, Resource.newInstance(2000, 60));
-
- Configuration config4 = new Configuration();
- config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "10m");
- config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "10m");
- config4.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
- testCreateSlotsPerWorker(config4, Resource.newInstance(1000, 1));
- }
-
- private void testCreateSlotsPerWorker(Configuration config, Resource resource) throws Exception {
- new Context(config) {{
- runTest(() -> {
-
- ResourceProfile rmCalculatedResourceProfile = resourceManager.getSlotsPerWorker().iterator().next();
-
- ResourceProfile tmCalculatedResourceProfile =
- TaskManagerServices.computeSlotResourceProfile(
- config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
- MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes());
-
- assertEquals(rmCalculatedResourceProfile, tmCalculatedResourceProfile);
- });
- }};
- }
}