You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/11/07 21:56:10 UTC

[flink] 19/21: Fix yarn cut off

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f24495c0df56473c850605d9e34baf2f7464be1
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Wed Nov 6 09:59:32 2019 +0100

    Fix yarn cut off
---
 .../client/deployment/ClusterSpecification.java    |   6 +-
 .../clusterframework/TaskExecutorResourceSpec.java |   4 +
 .../TaskExecutorResourceUtils.java                 |   4 +-
 .../ActiveResourceManagerFactory.java              |  23 +---
 .../ActiveResourceManagerFactoryTest.java          |  97 --------------
 .../flink/yarn/CliFrontendRunWithYarnTest.java     |   3 +-
 .../apache/flink/yarn/YarnConfigurationITCase.java |  28 +---
 .../flink/yarn/YarnClusterClientFactory.java       |   6 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  18 +--
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |   2 +-
 .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 142 +++++++--------------
 11 files changed, 73 insertions(+), 260 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index 72975d8..0d8d105 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.deployment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 
 /**
  * Description of the cluster to start by the {@link ClusterDescriptor}.
@@ -68,7 +69,10 @@ public final class ClusterSpecification {
 		int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
 		int jobManagerMemoryMb = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
-		int taskManagerMemoryMb = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
+		int taskManagerMemoryMb = TaskExecutorResourceUtils
+			.resourceSpecFromConfig(configuration)
+			.getTotalProcessMemorySize()
+			.getMebiBytes();
 
 		return new ClusterSpecificationBuilder()
 			.setMasterMemoryMB(jobManagerMemoryMb)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
index d6cbe5b..d73e7b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
@@ -156,6 +156,10 @@ public class TaskExecutorResourceSpec implements java.io.Serializable {
 		return getTotalFlinkMemorySize().add(jvmMetaspaceSize).add(jvmOverheadSize);
 	}
 
+	public MemorySize getHeapSize() {
+		return frameworkHeapSize.add(taskHeapSize).add(onHeapManagedMemorySize);
+	}
+
 	@Override
 	public String toString() {
 		return "TaskExecutorResourceSpec {"
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 4b649e4..9c69b62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -46,9 +46,7 @@ public class TaskExecutorResourceUtils {
 	// ------------------------------------------------------------------------
 
 	public static String generateJvmParametersStr(final TaskExecutorResourceSpec taskExecutorResourceSpec) {
-		final MemorySize jvmHeapSize = taskExecutorResourceSpec.getFrameworkHeapSize()
-			.add(taskExecutorResourceSpec.getTaskHeapSize())
-			.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+		final MemorySize jvmHeapSize = taskExecutorResourceSpec.getHeapSize();
 		final MemorySize jvmDirectSize = taskExecutorResourceSpec.getTaskOffHeapSize()
 			.add(taskExecutorResourceSpec.getShuffleMemSize());
 		final MemorySize jvmMetaspaceSize = taskExecutorResourceSpec.getJvmMetaspaceSize();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
index d292e5a..7444e23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
@@ -19,9 +19,6 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -30,17 +27,11 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 
 import javax.annotation.Nullable;
 
 /**
  * Resource manager factory which creates active {@link ResourceManager} implementations.
- *
- * <p>The default implementation will call {@link #createActiveResourceManagerConfiguration}
- * to create a new configuration which is configured with active resource manager relevant
- * configuration options.
- *
  * @param <T> type of the {@link ResourceIDRetrievable}
  */
 public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievable> implements ResourceManagerFactory<T> {
@@ -57,7 +48,7 @@ public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievab
 			@Nullable String webInterfaceUrl,
 			ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
 		return createActiveResourceManager(
-			createActiveResourceManagerConfiguration(configuration),
+			configuration,
 			resourceId,
 			rpcService,
 			highAvailabilityServices,
@@ -68,18 +59,6 @@ public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievab
 			resourceManagerMetricGroup);
 	}
 
-	public static Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
-		final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(originalConfiguration).getMebiBytes();
-		final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(originalConfiguration, taskManagerMemoryMB);
-		final long processMemoryBytes = (taskManagerMemoryMB - cutoffMB) << 20; // megabytes to bytes
-		final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(originalConfiguration, processMemoryBytes);
-
-		final Configuration resourceManagerConfig = new Configuration(originalConfiguration);
-		resourceManagerConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, managedMemoryBytes + "b");
-
-		return resourceManagerConfig;
-	}
-
 	protected abstract ResourceManager<T> createActiveResourceManager(
 		Configuration configuration,
 		ResourceID resourceId,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
deleted file mode 100644
index ff61e65..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-/**
- * Tests for the {@link ActiveResourceManagerFactory}.
- */
-public class ActiveResourceManagerFactoryTest extends TestLogger {
-
-	/**
-	 * Test which ensures that the {@link ActiveResourceManagerFactory} sets the correct managed
-	 * memory when creating a resource manager.
-	 */
-	@Test
-	public void createResourceManager_WithDefaultConfiguration_ShouldSetManagedMemory() throws Exception {
-		final Configuration configuration = new Configuration();
-
-		final TestingActiveResourceManagerFactory resourceManagerFactory = new TestingActiveResourceManagerFactory();
-
-		final TestingRpcService rpcService = new TestingRpcService();
-
-		try {
-			final ResourceManager<ResourceID> ignored = resourceManagerFactory.createResourceManager(
-				configuration,
-				ResourceID.generate(),
-				rpcService,
-				new TestingHighAvailabilityServices(),
-				new TestingHeartbeatServices(),
-				new TestingFatalErrorHandler(),
-				new ClusterInformation("foobar", 1234),
-				null,
-				UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
-		} finally {
-			RpcUtils.terminateRpcService(rpcService, Time.seconds(10L));
-		}
-	}
-
-	private static final class TestingActiveResourceManagerFactory extends ActiveResourceManagerFactory<ResourceID> {
-
-		@Override
-		protected ResourceManager<ResourceID> createActiveResourceManager(
-				Configuration configuration,
-				ResourceID resourceId,
-				RpcService rpcService,
-				HighAvailabilityServices highAvailabilityServices,
-				HeartbeatServices heartbeatServices,
-				FatalErrorHandler fatalErrorHandler,
-				ClusterInformation clusterInformation,
-				@Nullable String webInterfaceUrl,
-				ResourceManagerMetricGroup resourceManagerMetricGroup) {
-			assertThat(configuration.contains(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE), is(true));
-
-			return null;
-		}
-	}
-}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
index 4e7ece3..4c83115 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -38,6 +38,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
+import static org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
 
@@ -66,7 +67,7 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 	public void testRun() throws Exception {
 		String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath();
 
-		Configuration configuration = new Configuration();
+		Configuration configuration = adjustMemoryConfigurationForLocalExecution(new Configuration());
 		configuration.setString(JobManagerOptions.ADDRESS, "localhost");
 		configuration.setInteger(JobManagerOptions.PORT, 8081);
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index dbad597..36f90e4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -24,13 +24,11 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -84,8 +82,11 @@ public class YarnConfigurationITCase extends YarnTestBase {
 			final YarnClient yarnClient = getYarnClient();
 			final Configuration configuration = new Configuration(flinkConfiguration);
 
+			final TaskExecutorResourceSpec spec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
 			final int masterMemory = 64;
-			final int taskManagerMemory = 512;
+			final int taskManagerMemory = spec.getTotalProcessMemorySize().getMebiBytes();
+			final long expectedHeapBytes = spec.getHeapSize().getBytes();
+			final int expectedManagedMemoryMB = spec.getManagedMemorySize().getMebiBytes();
 			final int slotsPerTaskManager = 3;
 
 			// disable heap cutoff min
@@ -175,23 +176,13 @@ public class YarnConfigurationITCase extends YarnTestBase {
 
 					assertThat(taskManagerInfo.getNumberSlots(), is(slotsPerTaskManager));
 
-					final ContaineredTaskManagerParameters containeredTaskManagerParameters = ContaineredTaskManagerParameters.create(
-						configuration,
-						null,
-						taskManagerMemory,
-						slotsPerTaskManager);
-
-					final long expectedHeadSize = containeredTaskManagerParameters.taskManagerHeapSizeMB() << 20L;
-
 					// We compare here physical memory assigned to a container with the heap memory that we should pass to
 					// jvm as Xmx parameter. Those value might differ significantly due to system page size or jvm
 					// implementation therefore we use 15% threshold here.
 					assertThat(
-						(double) taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / (double) expectedHeadSize,
+						(double) taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / expectedHeapBytes,
 						is(closeTo(1.0, 0.15)));
 
-					final int expectedManagedMemoryMB = calculateManagedMemorySizeMB(configuration);
-
 					assertThat((int) (taskManagerInfo.getHardwareDescription().getSizeOfManagedMemory() >> 20), is(expectedManagedMemoryMB));
 				} finally {
 					restClient.shutdown(TIMEOUT);
@@ -214,9 +205,4 @@ public class YarnConfigurationITCase extends YarnTestBase {
 			return taskManagerInfo.getNumberSlots() > 0;
 		}
 	}
-
-	private static int calculateManagedMemorySizeMB(Configuration configuration) {
-		Configuration resourceManagerConfig = ActiveResourceManagerFactory.createActiveResourceManagerConfiguration(configuration);
-		return MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE)).getMebiBytes();
-	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index aa138a7..6ea7d63 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -70,7 +71,10 @@ public class YarnClusterClientFactory implements ClusterClientFactory<Applicatio
 		final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
 
 		// Task Managers memory
-		final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
+		final int taskManagerMemoryMB = TaskExecutorResourceUtils
+			.resourceSpecFromConfig(configuration)
+			.getTotalProcessMemorySize()
+			.getMebiBytes();
 
 		int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 74207ff..14005f4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -40,11 +40,10 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.plugin.PluginConfig;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
@@ -458,16 +457,11 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 	 * @throws FlinkException if the cluster cannot be started with the provided {@link ClusterSpecification}
 	 */
 	private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException {
+		flinkConfiguration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, clusterSpecification.getTaskManagerMemoryMB() + "m");
 		try {
-			final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
-			// We do the validation by calling the calculation methods here
-			// Internally these methods will check whether the cluster can be started with the provided
-			// ClusterSpecification and the configured memory requirements
-			final long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
-			TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
+			TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfiguration);
 		} catch (IllegalArgumentException iae) {
-			throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided " +
-					"cluster specification. Please increase the memory of the cluster.", iae);
+			throw new FlinkException("Inconsistent cluster specification.", iae);
 		}
 	}
 
@@ -834,10 +828,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 				TaskManagerOptions.NUM_TASK_SLOTS,
 				clusterSpecification.getSlotsPerTaskManager());
 
-		configuration.setString(
-				TaskManagerOptions.TOTAL_PROCESS_MEMORY,
-				clusterSpecification.getTaskManagerMemoryMB() + "m");
-
 		// Upload the flink configuration
 		// write out configuration file
 		File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 0efa610..0e7df5b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -374,7 +374,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 			if (!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {
 				tmMemoryVal += "m";
 			}
-			effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, tmMemoryVal);
+			effectiveConfiguration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, tmMemoryVal);
 		}
 
 		if (commandLine.hasOption(slots.getOpt())) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index a357126..9376533 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -107,11 +107,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		String[] params =
 			new String[] {"-ys", "3"};
 
-		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCliWithTmTotalMemory(1024);
 
 		final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
 
@@ -129,11 +125,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		String[] params =
 			new String[] {"-yd"};
 
-		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli();
 
 		final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
 
@@ -151,11 +143,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 		String[] params = new String[] {"-yz", zkNamespaceCliInput};
 
-		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli();
 
 		CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
 
@@ -172,11 +160,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 		String[] params = new String[] {"-ynl", nodeLabelCliInput };
 
-		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli();
 
 		CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
 
@@ -198,11 +182,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final Configuration configuration = new Configuration();
 		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
 
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true);
 
@@ -225,21 +205,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final Configuration configuration = new Configuration();
 		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
 
-		new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		createFlinkYarnSessionCli(configuration);
 	}
 
 	@Test
 	public void testResumeFromYarnID() throws Exception {
-		final Configuration configuration = new Configuration();
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
 
@@ -252,12 +223,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 	@Test
 	public void testResumeFromYarnIDZookeeperNamespace() throws Exception {
-		final Configuration configuration = new Configuration();
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
 
@@ -273,12 +239,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 	@Test
 	public void testResumeFromYarnIDZookeeperNamespaceOverride() throws Exception {
-		final Configuration configuration = new Configuration();
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 
 		final String overrideZkNamespace = "my_cluster";
 
@@ -300,11 +261,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final Configuration configuration = new Configuration();
 		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
 
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID_2.toString() }, true);
 
 		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
@@ -326,15 +283,11 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final int slotsPerTaskManager = 30;
 
 		configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m");
-		configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory + "m");
+		configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, taskManagerMemory + "m");
 		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);
 
 		final String[] args = {"-yjm", String.valueOf(jobManagerMemory) + "m", "-ytm", String.valueOf(taskManagerMemory) + "m", "-ys", String.valueOf(slotsPerTaskManager)};
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
 
 		CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
@@ -357,16 +310,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final int jobManagerMemory = 1337;
 		configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m");
 		final int taskManagerMemory = 7331;
-		configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory + "m");
+		configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, taskManagerMemory + "m");
 		final int slotsPerTaskManager = 42;
 		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);
 
 		final String[] args = {};
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
 
 		CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
@@ -385,11 +334,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 	@Test
 	public void testHeapMemoryPropertyWithoutUnit() throws Exception {
 		final String[] args = new String[] { "-yjm", "1024", "-ytm", "2048" };
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
@@ -407,11 +352,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 	@Test
 	public void testHeapMemoryPropertyWithUnitMB() throws Exception {
 		final String[] args = new String[] { "-yjm", "1024m", "-ytm", "2048m" };
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
 		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
@@ -428,11 +369,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 	@Test
 	public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception {
 		final String[] args = new String[] { "-yjm", "1g", "-ytm", "2g" };
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
 		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
@@ -452,11 +389,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
 		configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
 
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			configuration,
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration);
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
 
@@ -469,15 +402,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 	}
 
 	/**
-	 * Tests the specifying heap memory with config default value for job manager and task manager.
+	 * Tests specifying the job manager heap memory with the config default value for job manager and task manager.
 	 */
 	@Test
-	public void testHeapMemoryPropertyWithConfigDefaultValue() throws Exception {
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+	public void testJobManagerHeapMemoryPropertyWithConfigDefaultValue() throws Exception {
+		int totalMemomory = 1024;
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCliWithTmTotalMemory(totalMemomory);
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
 
@@ -485,18 +415,14 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
 		final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
 
-		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
-		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(1024));
+		assertThat(clusterSpecification.getMasterMemoryMB(), is(totalMemomory));
+		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(totalMemomory));
 	}
 
 	@Test
 	public void testMultipleYarnShipOptions() throws Exception {
 		final String[] args = new String[]{"run", "--yarnship", tmp.newFolder().getAbsolutePath(), "--yarnship", tmp.newFolder().getAbsolutePath()};
-		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
-			new Configuration(),
-			tmp.getRoot().getAbsolutePath(),
-			"y",
-			"yarn");
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
@@ -527,4 +453,22 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 		return tmpFolder.getAbsoluteFile();
 	}
+
+	private FlinkYarnSessionCli createFlinkYarnSessionCli() throws FlinkException {
+		return createFlinkYarnSessionCli(new Configuration());
+	}
+
+	private FlinkYarnSessionCli createFlinkYarnSessionCliWithTmTotalMemory(int totalMemomory) throws FlinkException {
+		Configuration configuration = new Configuration();
+		configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemomory + "m");
+		return createFlinkYarnSessionCli(configuration);
+	}
+
+	private FlinkYarnSessionCli createFlinkYarnSessionCli(Configuration configuration) throws FlinkException {
+		return new FlinkYarnSessionCli(
+			configuration,
+			tmp.getRoot().getAbsolutePath(),
+			"y",
+			"yarn");
+	}
 }