You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/11/07 21:56:10 UTC
[flink] 19/21: Fix yarn cut off
This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1f24495c0df56473c850605d9e34baf2f7464be1
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Wed Nov 6 09:59:32 2019 +0100
Fix yarn cut off
---
.../client/deployment/ClusterSpecification.java | 6 +-
.../clusterframework/TaskExecutorResourceSpec.java | 4 +
.../TaskExecutorResourceUtils.java | 4 +-
.../ActiveResourceManagerFactory.java | 23 +---
.../ActiveResourceManagerFactoryTest.java | 97 --------------
.../flink/yarn/CliFrontendRunWithYarnTest.java | 3 +-
.../apache/flink/yarn/YarnConfigurationITCase.java | 28 +---
.../flink/yarn/YarnClusterClientFactory.java | 6 +-
.../apache/flink/yarn/YarnClusterDescriptor.java | 18 +--
.../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 2 +-
.../apache/flink/yarn/FlinkYarnSessionCliTest.java | 142 +++++++--------------
11 files changed, 73 insertions(+), 260 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index 72975d8..0d8d105 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.deployment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
/**
* Description of the cluster to start by the {@link ClusterDescriptor}.
@@ -68,7 +69,10 @@ public final class ClusterSpecification {
int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
int jobManagerMemoryMb = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
- int taskManagerMemoryMb = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
+ int taskManagerMemoryMb = TaskExecutorResourceUtils
+ .resourceSpecFromConfig(configuration)
+ .getTotalProcessMemorySize()
+ .getMebiBytes();
return new ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMb)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
index d6cbe5b..d73e7b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
@@ -156,6 +156,10 @@ public class TaskExecutorResourceSpec implements java.io.Serializable {
return getTotalFlinkMemorySize().add(jvmMetaspaceSize).add(jvmOverheadSize);
}
+ public MemorySize getHeapSize() {
+ return frameworkHeapSize.add(taskHeapSize).add(onHeapManagedMemorySize);
+ }
+
@Override
public String toString() {
return "TaskExecutorResourceSpec {"
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 4b649e4..9c69b62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -46,9 +46,7 @@ public class TaskExecutorResourceUtils {
// ------------------------------------------------------------------------
public static String generateJvmParametersStr(final TaskExecutorResourceSpec taskExecutorResourceSpec) {
- final MemorySize jvmHeapSize = taskExecutorResourceSpec.getFrameworkHeapSize()
- .add(taskExecutorResourceSpec.getTaskHeapSize())
- .add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+ final MemorySize jvmHeapSize = taskExecutorResourceSpec.getHeapSize();
final MemorySize jvmDirectSize = taskExecutorResourceSpec.getTaskOffHeapSize()
.add(taskExecutorResourceSpec.getShuffleMemSize());
final MemorySize jvmMetaspaceSize = taskExecutorResourceSpec.getJvmMetaspaceSize();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
index d292e5a..7444e23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
@@ -19,9 +19,6 @@
package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -30,17 +27,11 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import javax.annotation.Nullable;
/**
* Resource manager factory which creates active {@link ResourceManager} implementations.
- *
- * <p>The default implementation will call {@link #createActiveResourceManagerConfiguration}
- * to create a new configuration which is configured with active resource manager relevant
- * configuration options.
- *
* @param <T> type of the {@link ResourceIDRetrievable}
*/
public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievable> implements ResourceManagerFactory<T> {
@@ -57,7 +48,7 @@ public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievab
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
return createActiveResourceManager(
- createActiveResourceManagerConfiguration(configuration),
+ configuration,
resourceId,
rpcService,
highAvailabilityServices,
@@ -68,18 +59,6 @@ public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievab
resourceManagerMetricGroup);
}
- public static Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
- final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(originalConfiguration).getMebiBytes();
- final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(originalConfiguration, taskManagerMemoryMB);
- final long processMemoryBytes = (taskManagerMemoryMB - cutoffMB) << 20; // megabytes to bytes
- final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(originalConfiguration, processMemoryBytes);
-
- final Configuration resourceManagerConfig = new Configuration(originalConfiguration);
- resourceManagerConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, managedMemoryBytes + "b");
-
- return resourceManagerConfig;
- }
-
protected abstract ResourceManager<T> createActiveResourceManager(
Configuration configuration,
ResourceID resourceId,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
deleted file mode 100644
index ff61e65..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-/**
- * Tests for the {@link ActiveResourceManagerFactory}.
- */
-public class ActiveResourceManagerFactoryTest extends TestLogger {
-
- /**
- * Test which ensures that the {@link ActiveResourceManagerFactory} sets the correct managed
- * memory when creating a resource manager.
- */
- @Test
- public void createResourceManager_WithDefaultConfiguration_ShouldSetManagedMemory() throws Exception {
- final Configuration configuration = new Configuration();
-
- final TestingActiveResourceManagerFactory resourceManagerFactory = new TestingActiveResourceManagerFactory();
-
- final TestingRpcService rpcService = new TestingRpcService();
-
- try {
- final ResourceManager<ResourceID> ignored = resourceManagerFactory.createResourceManager(
- configuration,
- ResourceID.generate(),
- rpcService,
- new TestingHighAvailabilityServices(),
- new TestingHeartbeatServices(),
- new TestingFatalErrorHandler(),
- new ClusterInformation("foobar", 1234),
- null,
- UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
- } finally {
- RpcUtils.terminateRpcService(rpcService, Time.seconds(10L));
- }
- }
-
- private static final class TestingActiveResourceManagerFactory extends ActiveResourceManagerFactory<ResourceID> {
-
- @Override
- protected ResourceManager<ResourceID> createActiveResourceManager(
- Configuration configuration,
- ResourceID resourceId,
- RpcService rpcService,
- HighAvailabilityServices highAvailabilityServices,
- HeartbeatServices heartbeatServices,
- FatalErrorHandler fatalErrorHandler,
- ClusterInformation clusterInformation,
- @Nullable String webInterfaceUrl,
- ResourceManagerMetricGroup resourceManagerMetricGroup) {
- assertThat(configuration.contains(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE), is(true));
-
- return null;
- }
- }
-}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
index 4e7ece3..4c83115 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -38,6 +38,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
+import static org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
@@ -66,7 +67,7 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
public void testRun() throws Exception {
String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath();
- Configuration configuration = new Configuration();
+ Configuration configuration = adjustMemoryConfigurationForLocalExecution(new Configuration());
configuration.setString(JobManagerOptions.ADDRESS, "localhost");
configuration.setInteger(JobManagerOptions.PORT, 8081);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index dbad597..36f90e4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -24,13 +24,11 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -84,8 +82,11 @@ public class YarnConfigurationITCase extends YarnTestBase {
final YarnClient yarnClient = getYarnClient();
final Configuration configuration = new Configuration(flinkConfiguration);
+ final TaskExecutorResourceSpec spec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
final int masterMemory = 64;
- final int taskManagerMemory = 512;
+ final int taskManagerMemory = spec.getTotalProcessMemorySize().getMebiBytes();
+ final long expectedHeapBytes = spec.getHeapSize().getBytes();
+ final int expectedManagedMemoryMB = spec.getManagedMemorySize().getMebiBytes();
final int slotsPerTaskManager = 3;
// disable heap cutoff min
@@ -175,23 +176,13 @@ public class YarnConfigurationITCase extends YarnTestBase {
assertThat(taskManagerInfo.getNumberSlots(), is(slotsPerTaskManager));
- final ContaineredTaskManagerParameters containeredTaskManagerParameters = ContaineredTaskManagerParameters.create(
- configuration,
- null,
- taskManagerMemory,
- slotsPerTaskManager);
-
- final long expectedHeadSize = containeredTaskManagerParameters.taskManagerHeapSizeMB() << 20L;
-
// We compare here physical memory assigned to a container with the heap memory that we should pass to
// jvm as Xmx parameter. Those value might differ significantly due to system page size or jvm
// implementation therefore we use 15% threshold here.
assertThat(
- (double) taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / (double) expectedHeadSize,
+ (double) taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / expectedHeapBytes,
is(closeTo(1.0, 0.15)));
- final int expectedManagedMemoryMB = calculateManagedMemorySizeMB(configuration);
-
assertThat((int) (taskManagerInfo.getHardwareDescription().getSizeOfManagedMemory() >> 20), is(expectedManagedMemoryMB));
} finally {
restClient.shutdown(TIMEOUT);
@@ -214,9 +205,4 @@ public class YarnConfigurationITCase extends YarnTestBase {
return taskManagerInfo.getNumberSlots() > 0;
}
}
-
- private static int calculateManagedMemorySizeMB(Configuration configuration) {
- Configuration resourceManagerConfig = ActiveResourceManagerFactory.createActiveResourceManagerConfiguration(configuration);
- return MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE)).getMebiBytes();
- }
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index aa138a7..6ea7d63 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -70,7 +71,10 @@ public class YarnClusterClientFactory implements ClusterClientFactory<Applicatio
final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
// Task Managers memory
- final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
+ final int taskManagerMemoryMB = TaskExecutorResourceUtils
+ .resourceSpecFromConfig(configuration)
+ .getTotalProcessMemorySize()
+ .getMebiBytes();
int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 74207ff..14005f4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -40,11 +40,10 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.plugin.PluginConfig;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
@@ -458,16 +457,11 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
* @throws FlinkException if the cluster cannot be started with the provided {@link ClusterSpecification}
*/
private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException {
+ flinkConfiguration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, clusterSpecification.getTaskManagerMemoryMB() + "m");
try {
- final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
- // We do the validation by calling the calculation methods here
- // Internally these methods will check whether the cluster can be started with the provided
- // ClusterSpecification and the configured memory requirements
- final long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
- TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
+ TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfiguration);
} catch (IllegalArgumentException iae) {
- throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided " +
- "cluster specification. Please increase the memory of the cluster.", iae);
+ throw new FlinkException("Inconsistent cluster specification.", iae);
}
}
@@ -834,10 +828,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
TaskManagerOptions.NUM_TASK_SLOTS,
clusterSpecification.getSlotsPerTaskManager());
- configuration.setString(
- TaskManagerOptions.TOTAL_PROCESS_MEMORY,
- clusterSpecification.getTaskManagerMemoryMB() + "m");
-
// Upload the flink configuration
// write out configuration file
File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 0efa610..0e7df5b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -374,7 +374,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
if (!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {
tmMemoryVal += "m";
}
- effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, tmMemoryVal);
+ effectiveConfiguration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, tmMemoryVal);
}
if (commandLine.hasOption(slots.getOpt())) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index a357126..9376533 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -107,11 +107,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
String[] params =
new String[] {"-ys", "3"};
- FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
- new Configuration(),
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCliWithTmTotalMemory(1024);
final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
@@ -129,11 +125,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
String[] params =
new String[] {"-yd"};
- FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
- new Configuration(),
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli();
final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
@@ -151,11 +143,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
String[] params = new String[] {"-yz", zkNamespaceCliInput};
- FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
- new Configuration(),
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli();
CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
@@ -172,11 +160,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
String[] params = new String[] {"-ynl", nodeLabelCliInput };
- FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
- new Configuration(),
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli();
CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
@@ -198,11 +182,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
final Configuration configuration = new Configuration();
configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true);
@@ -225,21 +205,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
final Configuration configuration = new Configuration();
configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
- new FlinkYarnSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ createFlinkYarnSessionCli(configuration);
}
@Test
public void testResumeFromYarnID() throws Exception {
- final Configuration configuration = new Configuration();
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
@@ -252,12 +223,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
@Test
public void testResumeFromYarnIDZookeeperNamespace() throws Exception {
- final Configuration configuration = new Configuration();
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
@@ -273,12 +239,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
@Test
public void testResumeFromYarnIDZookeeperNamespaceOverride() throws Exception {
- final Configuration configuration = new Configuration();
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
final String overrideZkNamespace = "my_cluster";
@@ -300,11 +261,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
final Configuration configuration = new Configuration();
configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID_2.toString() }, true);
final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
@@ -326,15 +283,11 @@ public class FlinkYarnSessionCliTest extends TestLogger {
final int slotsPerTaskManager = 30;
configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m");
- configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory + "m");
+ configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, taskManagerMemory + "m");
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);
final String[] args = {"-yjm", String.valueOf(jobManagerMemory) + "m", "-ytm", String.valueOf(taskManagerMemory) + "m", "-ys", String.valueOf(slotsPerTaskManager)};
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
@@ -357,16 +310,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
final int jobManagerMemory = 1337;
configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m");
final int taskManagerMemory = 7331;
- configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory + "m");
+ configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, taskManagerMemory + "m");
final int slotsPerTaskManager = 42;
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);
final String[] args = {};
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
@@ -385,11 +334,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
@Test
public void testHeapMemoryPropertyWithoutUnit() throws Exception {
final String[] args = new String[] { "-yjm", "1024", "-ytm", "2048" };
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- new Configuration(),
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
@@ -407,11 +352,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
@Test
public void testHeapMemoryPropertyWithUnitMB() throws Exception {
final String[] args = new String[] { "-yjm", "1024m", "-ytm", "2048m" };
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- new Configuration(),
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
@@ -428,11 +369,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
@Test
public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception {
final String[] args = new String[] { "-yjm", "1g", "-ytm", "2g" };
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- new Configuration(),
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
@@ -452,11 +389,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
@@ -469,15 +402,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
}
/**
- * Tests the specifying heap memory with config default value for job manager and task manager.
+ * Tests specifying the job manager heap memory with the config default value for job manager and task manager.
*/
@Test
- public void testHeapMemoryPropertyWithConfigDefaultValue() throws Exception {
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- new Configuration(),
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ public void testJobManagerHeapMemoryPropertyWithConfigDefaultValue() throws Exception {
+ int totalMemomory = 1024;
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCliWithTmTotalMemory(totalMemomory);
final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
@@ -485,18 +415,14 @@ public class FlinkYarnSessionCliTest extends TestLogger {
final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
- assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
- assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(1024));
+ assertThat(clusterSpecification.getMasterMemoryMB(), is(totalMemomory));
+ assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(totalMemomory));
}
@Test
public void testMultipleYarnShipOptions() throws Exception {
final String[] args = new String[]{"run", "--yarnship", tmp.newFolder().getAbsolutePath(), "--yarnship", tmp.newFolder().getAbsolutePath()};
- final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
- new Configuration(),
- tmp.getRoot().getAbsolutePath(),
- "y",
- "yarn");
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
@@ -527,4 +453,22 @@ public class FlinkYarnSessionCliTest extends TestLogger {
return tmpFolder.getAbsoluteFile();
}
+
+ private FlinkYarnSessionCli createFlinkYarnSessionCli() throws FlinkException {
+ return createFlinkYarnSessionCli(new Configuration());
+ }
+
+ private FlinkYarnSessionCli createFlinkYarnSessionCliWithTmTotalMemory(int totalMemomory) throws FlinkException {
+ Configuration configuration = new Configuration();
+ configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemomory + "m");
+ return createFlinkYarnSessionCli(configuration);
+ }
+
+ private FlinkYarnSessionCli createFlinkYarnSessionCli(Configuration configuration) throws FlinkException {
+ return new FlinkYarnSessionCli(
+ configuration,
+ tmp.getRoot().getAbsolutePath(),
+ "y",
+ "yarn");
+ }
}