You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/08 13:34:03 UTC

[flink] branch master updated (3bcd47f -> ddda636)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3bcd47f  [FLINK-12968][table-common] Add an utility for finding a common type from a set of types
     new a8c0fb5  [hotfix][runtime] Compare native memory in ResourceProfile#equals().
     new c8b93b3  [FLINK-12812][runtime] Set resource profiles for task slots
     new 5a05693  [hotfix] [runtime] Improve slot profile computation in TaskManagerServices
     new 92fe83f  [hotfix] [runtime] Split heap/managed memory computation into smaller reusable methods
     new 44ff144  [FLINK-12812] [runtime] (follow-up) Fix bytes/megabytes mixup in managed memory for slot resource profile
     new 494e12d  [FLINK-12812] [runtime] (follow-up) Deduplicate managed memory configuration logic between ResourceManager and TaskManagerServices
     new 21d7503  [FLINK-12812] [runtime] (follow-up) Consolidate profile/memory configuration logic in ResourceManagers.
     new bfd4543  [FLINK-12812] [runtime] (follow-up) Test refers to slot profile computed in ResourceManager.
     new 662c82a  [hotfix] [core] Simplify access to default values TM/JM memory sizes.
     new ddda636  [hotfix] [yarn] Various minor code style fixes in YarnResourceManagerTest

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/api/common/operators/ResourceSpec.java   |  33 +++++-
 .../flink/configuration/ConfigurationUtils.java    |   4 +-
 ...TaskManagerHeapSizeCalculationJavaBashTest.java |  23 ++--
 .../clusterframework/MesosResourceManager.java     |   6 +-
 .../clusterframework/types/ResourceProfile.java    |  33 +++++-
 .../runtime/resourcemanager/ResourceManager.java   |  22 +++-
 .../runtime/taskexecutor/TaskManagerServices.java  | 121 ++++++++++++++-------
 .../ContaineredTaskManagerParametersTest.java      |  22 +++-
 .../types/ResourceProfileTest.java                 |  27 ++++-
 .../NettyShuffleEnvironmentConfigurationTest.java  |   2 +-
 .../org/apache/flink/yarn/YarnResourceManager.java |   9 +-
 .../apache/flink/yarn/YarnResourceManagerTest.java |  71 ++++++++++--
 12 files changed, 277 insertions(+), 96 deletions(-)
 mode change 100644 => 100755 flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
 mode change 100644 => 100755 flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
 mode change 100644 => 100755 flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 mode change 100644 => 100755 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 mode change 100644 => 100755 flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 mode change 100644 => 100755 flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
 mode change 100644 => 100755 flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
 mode change 100644 => 100755 flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 mode change 100644 => 100755 flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java


[flink] 01/10: [hotfix][runtime] Compare native memory in ResourceProfile#equals().

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8c0fb5d455bd2dd9911a9559d546561c8b86024
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu Jul 4 16:09:31 2019 +0800

    [hotfix][runtime] Compare native memory in ResourceProfile#equals().
---
 .../runtime/clusterframework/types/ResourceProfile.java   |  1 +
 .../clusterframework/types/ResourceProfileTest.java       | 15 +++++++++++++++
 2 files changed, 16 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index e0922c1..8137d9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -284,6 +284,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			return this.cpuCores == that.cpuCores &&
 					this.heapMemoryInMB == that.heapMemoryInMB &&
 					this.directMemoryInMB == that.directMemoryInMB &&
+					this.nativeMemoryInMB == that.nativeMemoryInMB &&
 					this.networkMemoryInMB == that.networkMemoryInMB &&
 					Objects.equals(extendedResources, that.extendedResources);
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index 6f54d7f..3a42c33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -98,6 +98,21 @@ public class ResourceProfileTest {
 				setGPUResource(2.2).
 				build();
 		assertTrue(ResourceProfile.fromResourceSpec(rs3, 100).equals(ResourceProfile.fromResourceSpec(rs5, 100)));
+
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp2 = new ResourceProfile(1.1, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp3 = new ResourceProfile(1.0, 110, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp4 = new ResourceProfile(1.0, 100, 110, 100, 100, Collections.emptyMap());
+		ResourceProfile rp5 = new ResourceProfile(1.0, 100, 100, 110, 100, Collections.emptyMap());
+		ResourceProfile rp6 = new ResourceProfile(1.0, 100, 100, 100, 110, Collections.emptyMap());
+		ResourceProfile rp7 = new ResourceProfile(1.0, 100, 100, 100, 100, Collections.emptyMap());
+
+		assertFalse(rp1.equals(rp2));
+		assertFalse(rp1.equals(rp3));
+		assertFalse(rp1.equals(rp4));
+		assertFalse(rp1.equals(rp5));
+		assertFalse(rp1.equals(rp6));
+		assertTrue(rp1.equals(rp7));
 	}
 
 	@Test


[flink] 02/10: [FLINK-12812][runtime] Set resource profiles for task slots

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c8b93b354d6d72ef3cacc9b0b9adcdb1df26e625
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu Jul 4 17:10:30 2019 +0800

    [FLINK-12812][runtime] Set resource profiles for task slots
    
      - Add managed memory in ResourceSpec and ResourceProfile.
      - Set resource profiles for task slots when starting TaskManagers.
      - Set resource profiles for pending task manager slots.
---
 .../flink/api/common/operators/ResourceSpec.java   | 33 ++++++++++--
 .../clusterframework/MesosResourceManager.java     |  7 ++-
 .../clusterframework/types/ResourceProfile.java    | 32 +++++++++---
 .../runtime/resourcemanager/ResourceManager.java   | 32 +++++++++++-
 .../runtime/taskexecutor/TaskManagerServices.java  | 20 +++++++-
 .../types/ResourceProfileTest.java                 | 28 +++++-----
 .../org/apache/flink/yarn/YarnResourceManager.java |  6 ++-
 .../apache/flink/yarn/YarnResourceManagerTest.java | 59 +++++++++++++++++++++-
 8 files changed, 185 insertions(+), 32 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 7bc2948..5b1b7b1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -43,6 +43,7 @@ import java.util.Objects;
  *     <li>Direct Memory Size</li>
  *     <li>Native Memory Size</li>
  *     <li>State Size</li>
+ *     <li>Managed Memory Size</li>
  *     <li>Extended resources</li>
  * </ol>
  */
@@ -51,7 +52,7 @@ public class ResourceSpec implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0);
+	public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0, 0);
 
 	public static final String GPU_NAME = "GPU";
 
@@ -70,6 +71,9 @@ public class ResourceSpec implements Serializable {
 	/** How many state size in mb are used. */
 	private final int stateSizeInMB;
 
+	/** The required amount of managed memory (in MB). */
+	private final int managedMemoryInMB;
+
 	private final Map<String, Resource> extendedResources = new HashMap<>(1);
 
 	/**
@@ -80,6 +84,7 @@ public class ResourceSpec implements Serializable {
 	 * @param directMemoryInMB The size of the java nio direct memory, in megabytes.
 	 * @param nativeMemoryInMB The size of the native memory, in megabytes.
 	 * @param stateSizeInMB The state size for storing in checkpoint.
+	 * @param managedMemoryInMB The size of managed memory, in megabytes.
 	 * @param extendedResources The extended resources, associated with the resource manager used
 	 */
 	protected ResourceSpec(
@@ -88,12 +93,14 @@ public class ResourceSpec implements Serializable {
 			int directMemoryInMB,
 			int nativeMemoryInMB,
 			int stateSizeInMB,
+			int managedMemoryInMB,
 			Resource... extendedResources) {
 		this.cpuCores = cpuCores;
 		this.heapMemoryInMB = heapMemoryInMB;
 		this.directMemoryInMB = directMemoryInMB;
 		this.nativeMemoryInMB = nativeMemoryInMB;
 		this.stateSizeInMB = stateSizeInMB;
+		this.managedMemoryInMB = managedMemoryInMB;
 		for (Resource resource : extendedResources) {
 			if (resource != null) {
 				this.extendedResources.put(resource.getName(), resource);
@@ -114,7 +121,8 @@ public class ResourceSpec implements Serializable {
 				this.heapMemoryInMB + other.heapMemoryInMB,
 				this.directMemoryInMB + other.directMemoryInMB,
 				this.nativeMemoryInMB + other.nativeMemoryInMB,
-				this.stateSizeInMB + other.stateSizeInMB);
+				this.stateSizeInMB + other.stateSizeInMB,
+				this.managedMemoryInMB + other.managedMemoryInMB);
 		target.extendedResources.putAll(extendedResources);
 		for (Resource resource : other.extendedResources.values()) {
 			target.extendedResources.merge(resource.getName(), resource, (v1, v2) -> v1.merge(v2));
@@ -142,6 +150,10 @@ public class ResourceSpec implements Serializable {
 		return this.stateSizeInMB;
 	}
 
+	public int getManagedMemory() {
+		return this.managedMemoryInMB;
+	}
+
 	public double getGPUResource() {
 		Resource gpuResource = extendedResources.get(GPU_NAME);
 		if (gpuResource != null) {
@@ -162,7 +174,7 @@ public class ResourceSpec implements Serializable {
 	 */
 	public boolean isValid() {
 		if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
-				this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) {
+				this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0 && managedMemoryInMB >= 0) {
 			for (Resource resource : extendedResources.values()) {
 				if (resource.getValue() < 0) {
 					return false;
@@ -187,7 +199,8 @@ public class ResourceSpec implements Serializable {
 		int cmp3 = Integer.compare(this.directMemoryInMB, other.directMemoryInMB);
 		int cmp4 = Integer.compare(this.nativeMemoryInMB, other.nativeMemoryInMB);
 		int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB);
-		if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) {
+		int cmp6 = Integer.compare(this.managedMemoryInMB, other.managedMemoryInMB);
+		if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0 && cmp6 <= 0) {
 			for (Resource resource : extendedResources.values()) {
 				if (!other.extendedResources.containsKey(resource.getName()) ||
 					other.extendedResources.get(resource.getName()).getResourceAggregateType() != resource.getResourceAggregateType() ||
@@ -211,6 +224,7 @@ public class ResourceSpec implements Serializable {
 					this.directMemoryInMB == that.directMemoryInMB &&
 					this.nativeMemoryInMB == that.nativeMemoryInMB &&
 					this.stateSizeInMB == that.stateSizeInMB &&
+					this.managedMemoryInMB == that.managedMemoryInMB &&
 					Objects.equals(this.extendedResources, that.extendedResources);
 		} else {
 			return false;
@@ -225,6 +239,7 @@ public class ResourceSpec implements Serializable {
 		result = 31 * result + directMemoryInMB;
 		result = 31 * result + nativeMemoryInMB;
 		result = 31 * result + stateSizeInMB;
+		result = 31 * result + managedMemoryInMB;
 		result = 31 * result + extendedResources.hashCode();
 		return result;
 	}
@@ -240,7 +255,8 @@ public class ResourceSpec implements Serializable {
 				", heapMemoryInMB=" + heapMemoryInMB +
 				", directMemoryInMB=" + directMemoryInMB +
 				", nativeMemoryInMB=" + nativeMemoryInMB +
-				", stateSizeInMB=" + stateSizeInMB + extend +
+				", stateSizeInMB=" + stateSizeInMB +
+				", managedMemoryInMB=" + managedMemoryInMB + extend +
 				'}';
 	}
 
@@ -258,6 +274,7 @@ public class ResourceSpec implements Serializable {
 		private int directMemoryInMB;
 		private int nativeMemoryInMB;
 		private int stateSizeInMB;
+		private int managedMemoryInMB;
 		private GPUResource gpuResource;
 
 		public Builder setCpuCores(double cpuCores) {
@@ -285,6 +302,11 @@ public class ResourceSpec implements Serializable {
 			return this;
 		}
 
+		public Builder setManagedMemoryInMB(int managedMemory) {
+			this.managedMemoryInMB = managedMemory;
+			return this;
+		}
+
 		public Builder setGPUResource(double gpus) {
 			this.gpuResource = new GPUResource(gpus);
 			return this;
@@ -297,6 +319,7 @@ public class ResourceSpec implements Serializable {
 				directMemoryInMB,
 				nativeMemoryInMB,
 				stateSizeInMB,
+				managedMemoryInMB,
 				gpuResource);
 		}
 	}
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 04cf7cc..3be156e 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
@@ -198,7 +199,11 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		this.workersBeingReturned = new HashMap<>(8);
 
 		final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters();
-		this.slotsPerWorker = createSlotsPerWorker(containeredTaskManagerParameters.numSlots());
+		this.slotsPerWorker = createSlotsPerWorker(flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots());
+
+		// set the exact managed memory size into configuration, to make sure TMs will derive the same size
+		final int managedMemoryPerWorkerMB = this.slotsPerWorker.iterator().next().getManagedMemoryInMB() * slotsPerWorker.size();
+		this.flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryPerWorkerMB + "m");
 	}
 
 	protected ActorRef createSelfActor() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 8137d9f..21eb9b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -51,7 +51,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, -1);
 
 	/** ResourceProfile which matches any other ResourceProfile. */
-	public static final ResourceProfile ANY = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
+	public static final ResourceProfile ANY = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
 
 	// ------------------------------------------------------------------------
 
@@ -70,6 +70,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	/** Memory used for the task in the slot to communicate with its upstreams. Set by job master. */
 	private final int networkMemoryInMB;
 
+	/** The required amount of managed memory (in MB). */
+	private final int managedMemoryInMB;
+
 	/** A extensible field for user specified resources from {@link ResourceSpec}. */
 	private final Map<String, Resource> extendedResources = new HashMap<>(1);
 
@@ -83,6 +86,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @param directMemoryInMB The size of the direct memory, in megabytes.
 	 * @param nativeMemoryInMB The size of the native memory, in megabytes.
 	 * @param networkMemoryInMB The size of the memory for input and output, in megabytes.
+	 * @param managedMemoryInMB The size of managed memory, in megabytes.
 	 * @param extendedResources The extended resources such as GPU and FPGA
 	 */
 	public ResourceProfile(
@@ -91,12 +95,14 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			int directMemoryInMB,
 			int nativeMemoryInMB,
 			int networkMemoryInMB,
+			int managedMemoryInMB,
 			Map<String, Resource> extendedResources) {
 		this.cpuCores = cpuCores;
 		this.heapMemoryInMB = heapMemoryInMB;
 		this.directMemoryInMB = directMemoryInMB;
 		this.nativeMemoryInMB = nativeMemoryInMB;
 		this.networkMemoryInMB = networkMemoryInMB;
+		this.managedMemoryInMB = managedMemoryInMB;
 		if (extendedResources != null) {
 			this.extendedResources.putAll(extendedResources);
 		}
@@ -109,7 +115,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @param heapMemoryInMB The size of the heap memory, in megabytes.
 	 */
 	public ResourceProfile(double cpuCores, int heapMemoryInMB) {
-		this(cpuCores, heapMemoryInMB, 0, 0, 0, Collections.emptyMap());
+		this(cpuCores, heapMemoryInMB, 0, 0, 0, 0, Collections.emptyMap());
 	}
 
 	/**
@@ -123,6 +129,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 				other.directMemoryInMB,
 				other.nativeMemoryInMB,
 				other.networkMemoryInMB,
+				other.managedMemoryInMB,
 				other.extendedResources);
 	}
 
@@ -173,12 +180,20 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	}
 
 	/**
+	 * Get the managed memory needed in MB.
+	 * @return The managed memory in MB.
+	 */
+	public int getManagedMemoryInMB() {
+		return managedMemoryInMB;
+	}
+
+	/**
 	 * Get the total memory needed in MB.
 	 *
 	 * @return The total memory in MB
 	 */
 	public int getMemoryInMB() {
-		return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + networkMemoryInMB;
+		return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + networkMemoryInMB + managedMemoryInMB;
 	}
 
 	/**
@@ -187,7 +202,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @return The operator memory in MB
 	 */
 	public int getOperatorsMemoryInMB() {
-		return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
+		return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + managedMemoryInMB;
 	}
 
 	/**
@@ -215,7 +230,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 				heapMemoryInMB >= required.getHeapMemoryInMB() &&
 				directMemoryInMB >= required.getDirectMemoryInMB() &&
 				nativeMemoryInMB >= required.getNativeMemoryInMB() &&
-				networkMemoryInMB >= required.getNetworkMemoryInMB()) {
+				networkMemoryInMB >= required.getNetworkMemoryInMB() &&
+				managedMemoryInMB >= required.getManagedMemoryInMB()) {
 			for (Map.Entry<String, Resource> resource : required.extendedResources.entrySet()) {
 				if (!extendedResources.containsKey(resource.getKey()) ||
 						!extendedResources.get(resource.getKey()).getResourceAggregateType().equals(resource.getValue().getResourceAggregateType()) ||
@@ -270,6 +286,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 		result = 31 * result + directMemoryInMB;
 		result = 31 * result + nativeMemoryInMB;
 		result = 31 * result + networkMemoryInMB;
+		result = 31 * result + managedMemoryInMB;
 		result = 31 * result + extendedResources.hashCode();
 		return result;
 	}
@@ -286,6 +303,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 					this.directMemoryInMB == that.directMemoryInMB &&
 					this.nativeMemoryInMB == that.nativeMemoryInMB &&
 					this.networkMemoryInMB == that.networkMemoryInMB &&
+					this.managedMemoryInMB == that.managedMemoryInMB &&
 					Objects.equals(extendedResources, that.extendedResources);
 		}
 		return false;
@@ -302,7 +320,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			", heapMemoryInMB=" + heapMemoryInMB +
 			", directMemoryInMB=" + directMemoryInMB +
 			", nativeMemoryInMB=" + nativeMemoryInMB +
-			", networkMemoryInMB=" + networkMemoryInMB + resources +
+			", networkMemoryInMB=" + networkMemoryInMB +
+			", managedMemoryInMB=" + managedMemoryInMB + resources +
 			'}';
 	}
 
@@ -315,6 +334,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 				resourceSpec.getDirectMemory(),
 				resourceSpec.getNativeMemory(),
 				networkMemory,
+				resourceSpec.getManagedMemory(),
 				copiedExtendedResources);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 0ed9247..59e4c39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -22,8 +22,12 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
@@ -64,6 +68,7 @@ import org.apache.flink.runtime.taskexecutor.FileType;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -81,6 +86,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -1191,8 +1197,30 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	//  Helper methods
 	// ------------------------------------------------------------------------
 
-	protected static Collection<ResourceProfile> createSlotsPerWorker(int numSlots) {
-		return Collections.nCopies(numSlots, ResourceProfile.ANY);
+	@VisibleForTesting
+	public static Collection<ResourceProfile> createSlotsPerWorker(Configuration config, long totalMemoryMB, int numSlots) {
+		final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB);
+
+		final long networkMB = NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(
+			(totalMemoryMB - cutoffMB) << 20, // megabytes to bytes
+			config) >> 20; // bytes to megabytes
+
+		final long managedMB;
+		if (config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
+			managedMB = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
+		} else {
+			managedMB = (long) ((totalMemoryMB - cutoffMB - networkMB) * config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION));
+		}
+
+		ResourceProfile resourceProfile = new ResourceProfile(
+			Double.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			(int) (managedMB / numSlots),
+			Collections.emptyMap());
+		return Collections.nCopies(numSlots, resourceProfile);
 	}
 }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 8de72f1..dfcd5e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -257,13 +258,15 @@ public class TaskManagerServices {
 
 		// this call has to happen strictly after the network stack has been initialized
 		final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration);
+		final long managedMemorySize = memoryManager.getMemorySize();
 
 		final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
 
-		final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
+		final int numOfSlots = taskManagerServicesConfiguration.getNumberOfSlots();
+		final List<ResourceProfile> resourceProfiles = new ArrayList<>(numOfSlots);
 
 		for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
-			resourceProfiles.add(ResourceProfile.ANY);
+			resourceProfiles.add(computeSlotResourceProfile(numOfSlots, managedMemorySize));
 		}
 
 		final TimerService<AllocationID> timerService = new TimerService<>(
@@ -507,4 +510,17 @@ public class TaskManagerServices {
 			}
 		}
 	}
+
+	@VisibleForTesting
+	public static ResourceProfile computeSlotResourceProfile(int numOfSlots, long managedMemorySize) {
+		int managedMemoryPerSlot = (int) (managedMemorySize / numOfSlots);
+		return new ResourceProfile(
+			Double.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			managedMemoryPerSlot,
+			Collections.emptyMap());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index 3a42c33..a6c5f58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -31,10 +31,10 @@ public class ResourceProfileTest {
 
 	@Test
 	public void testMatchRequirement() throws Exception {
-		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, Collections.emptyMap());
-		ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, Collections.emptyMap());
-		ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, Collections.emptyMap());
-		ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, Collections.emptyMap());
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, 0, Collections.emptyMap());
+		ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, 0, Collections.emptyMap());
+		ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, 0, Collections.emptyMap());
+		ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, 0, Collections.emptyMap());
 
 		assertFalse(rp1.isMatching(rp2));
 		assertTrue(rp2.isMatching(rp1));
@@ -50,7 +50,7 @@ public class ResourceProfileTest {
 		assertTrue(rp4.isMatching(rp3));
 		assertTrue(rp4.isMatching(rp4));
 
-		ResourceProfile rp5 = new ResourceProfile(2.0, 100, 100, 100, 100, null);
+		ResourceProfile rp5 = new ResourceProfile(2.0, 100, 100, 100, 100, 100, null);
 		assertFalse(rp4.isMatching(rp5));
 
 		ResourceSpec rs1 = ResourceSpec.newBuilder().
@@ -99,20 +99,22 @@ public class ResourceProfileTest {
 				build();
 		assertTrue(ResourceProfile.fromResourceSpec(rs3, 100).equals(ResourceProfile.fromResourceSpec(rs5, 100)));
 
-		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 100, Collections.emptyMap());
-		ResourceProfile rp2 = new ResourceProfile(1.1, 100, 100, 100, 100, Collections.emptyMap());
-		ResourceProfile rp3 = new ResourceProfile(1.0, 110, 100, 100, 100, Collections.emptyMap());
-		ResourceProfile rp4 = new ResourceProfile(1.0, 100, 110, 100, 100, Collections.emptyMap());
-		ResourceProfile rp5 = new ResourceProfile(1.0, 100, 100, 110, 100, Collections.emptyMap());
-		ResourceProfile rp6 = new ResourceProfile(1.0, 100, 100, 100, 110, Collections.emptyMap());
-		ResourceProfile rp7 = new ResourceProfile(1.0, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp2 = new ResourceProfile(1.1, 100, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp3 = new ResourceProfile(1.0, 110, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp4 = new ResourceProfile(1.0, 100, 110, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp5 = new ResourceProfile(1.0, 100, 100, 110, 100, 100, Collections.emptyMap());
+		ResourceProfile rp6 = new ResourceProfile(1.0, 100, 100, 100, 110, 100, Collections.emptyMap());
+		ResourceProfile rp7 = new ResourceProfile(1.0, 100, 100, 100, 100, 110, Collections.emptyMap());
+		ResourceProfile rp8 = new ResourceProfile(1.0, 100, 100, 100, 100, 100, Collections.emptyMap());
 
 		assertFalse(rp1.equals(rp2));
 		assertFalse(rp1.equals(rp3));
 		assertFalse(rp1.equals(rp4));
 		assertFalse(rp1.equals(rp5));
 		assertFalse(rp1.equals(rp6));
-		assertTrue(rp1.equals(rp7));
+		assertFalse(rp1.equals(rp7));
+		assertTrue(rp1.equals(rp8));
 	}
 
 	@Test
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 65baab5..a1321ea 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -184,7 +184,11 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
 		this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
 
-		this.slotsPerWorker = createSlotsPerWorker(numberOfTaskSlots);
+		this.slotsPerWorker = createSlotsPerWorker(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots);
+
+		// set the exact managed memory size into configuration, to make sure TMs will derive the same size
+		final int managedMemoryPerWorkerMB = this.slotsPerWorker.iterator().next().getManagedMemoryInMB() * slotsPerWorker.size();
+		this.flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryPerWorkerMB + "m");
 	}
 
 	protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index eb75b66..5d10a10 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -21,7 +21,10 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -42,6 +45,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
@@ -53,6 +57,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
@@ -261,6 +266,10 @@ public class YarnResourceManagerTest extends TestLogger {
 		 * Create mock RM dependencies.
 		 */
 		Context() throws Exception {
+			this(flinkConfig);
+		}
+
+		Context(Configuration configuration) throws  Exception {
 			rpcService = new TestingRpcService();
 			rmServices = new MockResourceManagerRuntimeServices();
 
@@ -271,7 +280,7 @@ public class YarnResourceManagerTest extends TestLogger {
 							rpcService,
 							RM_ADDRESS,
 							rmResourceID,
-							flinkConfig,
+							configuration,
 							env,
 							rmServices.highAvailabilityServices,
 							rmServices.heartbeatServices,
@@ -419,7 +428,7 @@ public class YarnResourceManagerTest extends TestLogger {
 				final SlotReport slotReport = new SlotReport(
 					new SlotStatus(
 						new SlotID(taskManagerResourceId, 1),
-						new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap())));
+						new ResourceProfile(10, 1, 1, 1, 0, 0, Collections.emptyMap())));
 
 				CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
 					.registerTaskExecutor(
@@ -522,4 +531,50 @@ public class YarnResourceManagerTest extends TestLogger {
 			});
 		}};
 	}
+
+	/**
+	 * Tests that RM and TM calculate same slot resource profile.
+	 * @throws Exception
+	 */
+	@Test
+	public void testCreateSlotsPerWorker() throws Exception {
+		testCreateSlotsPerWorker(flinkConfig, Resource.newInstance(500, 100));
+
+		Configuration config1 = new Configuration();
+		config1.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 5);
+		testCreateSlotsPerWorker(config1, Resource.newInstance(1000, 10));
+
+		Configuration config2 = new Configuration();
+		config2.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "789m");
+		testCreateSlotsPerWorker(config2,  Resource.newInstance(800, 50));
+
+		Configuration config3 = new Configuration();
+		config3.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "300m");
+		config3.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
+		testCreateSlotsPerWorker(config3,  Resource.newInstance(2000, 60));
+
+		Configuration config4 = new Configuration();
+		config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "10m");
+		config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "10m");
+		config4.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
+		testCreateSlotsPerWorker(config4,  Resource.newInstance(1000, 1));
+	}
+
+	private void testCreateSlotsPerWorker(Configuration config, Resource resource) throws  Exception {
+		new Context(config) {{
+			runTest(() -> {
+
+				ResourceProfile rmCalculatedResourceProfile =
+					ResourceManager.createSlotsPerWorker(config, resource.getMemory(), config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
+						.iterator().next();
+
+				ResourceProfile tmCalculatedResourceProfile =
+					TaskManagerServices.computeSlotResourceProfile(
+						config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
+						MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes());
+
+				assertTrue(rmCalculatedResourceProfile.equals(tmCalculatedResourceProfile));
+			});
+		}};
+	}
 }


[flink] 06/10: [FLINK-12812] [runtime] (follow-up) Deduplicate managed memory configuration logic between ResourceManager and TaskManagerServices

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 494e12d57d4cc780749bfabe9a6bc7fec9c6887d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Jul 7 18:41:18 2019 +0200

    [FLINK-12812] [runtime] (follow-up) Deduplicate managed memory configuration logic between ResourceManager and TaskManagerServices
---
 .../runtime/resourcemanager/ResourceManager.java   | 26 ++++------------------
 .../runtime/taskexecutor/TaskManagerServices.java  |  1 -
 .../apache/flink/yarn/YarnResourceManagerTest.java |  7 +++---
 3 files changed, 8 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
old mode 100644
new mode 100755
index 59e4c39..4dfe5bc
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -68,7 +66,7 @@ import org.apache.flink.runtime.taskexecutor.FileType;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -86,7 +84,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -1200,26 +1197,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	@VisibleForTesting
 	public static Collection<ResourceProfile> createSlotsPerWorker(Configuration config, long totalMemoryMB, int numSlots) {
 		final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB);
+		final long processMemoryBytes = (totalMemoryMB - cutoffMB) << 20; // megabytes to bytes
+		final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(config, processMemoryBytes);
 
-		final long networkMB = NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(
-			(totalMemoryMB - cutoffMB) << 20, // megabytes to bytes
-			config) >> 20; // bytes to megabytes
+		final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
 
-		final long managedMB;
-		if (config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
-			managedMB = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
-		} else {
-			managedMB = (long) ((totalMemoryMB - cutoffMB - networkMB) * config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION));
-		}
-
-		ResourceProfile resourceProfile = new ResourceProfile(
-			Double.MAX_VALUE,
-			Integer.MAX_VALUE,
-			Integer.MAX_VALUE,
-			Integer.MAX_VALUE,
-			Integer.MAX_VALUE,
-			(int) (managedMB / numSlots),
-			Collections.emptyMap());
 		return Collections.nCopies(numSlots, resourceProfile);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index d4e27e0..3db847dd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -525,7 +525,6 @@ public class TaskManagerServices {
 		}
 	}
 
-	@VisibleForTesting
 	public static ResourceProfile computeSlotResourceProfile(int numOfSlots, long managedMemorySize) {
 		int managedMemoryPerSlotMB = (int) bytesToMegabytes(managedMemorySize / numOfSlots);
 		return new ResourceProfile(
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 5d10a10..2b504c0 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -565,13 +565,14 @@ public class YarnResourceManagerTest extends TestLogger {
 			runTest(() -> {
 
 				ResourceProfile rmCalculatedResourceProfile =
-					ResourceManager.createSlotsPerWorker(config, resource.getMemory(), config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
-						.iterator().next();
+					ResourceManager.updateTaskManagerConfigAndCreateWorkerSlotProfiles(
+						config, resource.getMemory(), config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
+								.iterator().next();
 
 				ResourceProfile tmCalculatedResourceProfile =
 					TaskManagerServices.computeSlotResourceProfile(
 						config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
-						MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes());
+						MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes());
 
 				assertTrue(rmCalculatedResourceProfile.equals(tmCalculatedResourceProfile));
 			});


[flink] 07/10: [FLINK-12812] [runtime] (follow-up) Consolidate profile/memory configuration logic in ResourceManagers.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21d7503098ea1b01191f4d6c3fffb1aac4b0d45a
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Jul 7 19:12:33 2019 +0200

    [FLINK-12812] [runtime] (follow-up) Consolidate profile/memory configuration logic in ResourceManagers.
---
 .../mesos/runtime/clusterframework/MesosResourceManager.java | 11 ++++-------
 .../flink/runtime/resourcemanager/ResourceManager.java       | 12 ++++++++++--
 .../main/java/org/apache/flink/yarn/YarnResourceManager.java |  8 ++------
 3 files changed, 16 insertions(+), 15 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
old mode 100644
new mode 100755
index 3be156e..e4ef99a
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -20,7 +20,6 @@ package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
@@ -185,7 +184,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		this.mesosServices = Preconditions.checkNotNull(mesosServices);
 		this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
 
-		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+		// copy the config, because we might change it for the TaskManagers
+		this.flinkConfig = new Configuration(Preconditions.checkNotNull(flinkConfig));
 		this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
 
 		this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer());
@@ -199,11 +199,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		this.workersBeingReturned = new HashMap<>(8);
 
 		final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters();
-		this.slotsPerWorker = createSlotsPerWorker(flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots());
-
-		// set the exact managed memory size into configuration, to make sure TMs will derive the same size
-		final int managedMemoryPerWorkerMB = this.slotsPerWorker.iterator().next().getManagedMemoryInMB() * slotsPerWorker.size();
-		this.flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryPerWorkerMB + "m");
+		this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(
+			flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots());
 	}
 
 	protected ActorRef createSelfActor() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 4dfe5bc..b845d1d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -1195,14 +1196,21 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	// ------------------------------------------------------------------------
 
 	@VisibleForTesting
-	public static Collection<ResourceProfile> createSlotsPerWorker(Configuration config, long totalMemoryMB, int numSlots) {
+	public static Collection<ResourceProfile> updateTaskManagerConfigAndCreateWorkerSlotProfiles(
+			Configuration config, long totalMemoryMB, int numSlots) {
+
 		final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB);
 		final long processMemoryBytes = (totalMemoryMB - cutoffMB) << 20; // megabytes to bytes
 		final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(config, processMemoryBytes);
 
-		final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
+		updateFlinkConfForManagedMemory(config, managedMemoryBytes);
 
+		final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
 		return Collections.nCopies(numSlots, resourceProfile);
 	}
+
+	static void updateFlinkConfForManagedMemory(Configuration conf, long managedMemorySize) {
+		conf.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemorySize + "b");
+	}
 }
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
old mode 100644
new mode 100755
index a1321ea..6ebf669
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -158,7 +158,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			clusterInformation,
 			fatalErrorHandler,
 			jobManagerMetricGroup);
-		this.flinkConfig  = flinkConfig;
+		this.flinkConfig  = new Configuration(flinkConfig); // copy, because we alter the config
 		this.yarnConfig = new YarnConfiguration();
 		this.env = env;
 		this.workerNodeMap = new ConcurrentHashMap<>();
@@ -184,11 +184,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
 		this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
 
-		this.slotsPerWorker = createSlotsPerWorker(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots);
-
-		// set the exact managed memory size into configuration, to make sure TMs will derive the same size
-		final int managedMemoryPerWorkerMB = this.slotsPerWorker.iterator().next().getManagedMemoryInMB() * slotsPerWorker.size();
-		this.flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryPerWorkerMB + "m");
+		this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots);
 	}
 
 	protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(


[flink] 10/10: [hotfix] [yarn] Various minor code style fixes in YarnResourceManagerTest

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ddda6369a52ed2530432362a9813bf66dbc40f3e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Jul 7 17:08:27 2019 +0200

    [hotfix] [yarn] Various minor code style fixes in YarnResourceManagerTest
---
 .../apache/flink/yarn/YarnResourceManagerTest.java    | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
old mode 100644
new mode 100755
index 3e95431..df3fc7f
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -110,9 +110,9 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -251,13 +251,13 @@ public class YarnResourceManagerTest extends TestLogger {
 		// domain objects for test purposes
 		final ResourceProfile resourceProfile1 = ResourceProfile.UNKNOWN;
 
-		public ContainerId task = ContainerId.newInstance(
-				ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1);
 		public String taskHost = "host1";
 
 		public NMClient mockNMClient = mock(NMClient.class);
-		public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient =
-				mock(AMRMClientAsync.class);
+
+		@SuppressWarnings("unchecked")
+		public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient = mock(AMRMClientAsync.class);
+
 		public JobManagerMetricGroup mockJMMetricGroup =
 				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup();
 
@@ -306,8 +306,6 @@ public class YarnResourceManagerTest extends TestLogger {
 			private final JobLeaderIdService jobLeaderIdService;
 			private final SlotManager slotManager;
 
-			private UUID rmLeaderSessionId;
-
 			MockResourceManagerRuntimeServices() throws Exception {
 				highAvailabilityServices = new TestingHighAvailabilityServices();
 				rmLeaderElectionService = new TestingLeaderElectionService();
@@ -327,7 +325,7 @@ public class YarnResourceManagerTest extends TestLogger {
 			}
 
 			void grantLeadership() throws Exception {
-				rmLeaderSessionId = UUID.randomUUID();
+				UUID rmLeaderSessionId = UUID.randomUUID();
 				rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
 			}
 		}
@@ -533,7 +531,6 @@ public class YarnResourceManagerTest extends TestLogger {
 
 	/**
 	 * Tests that RM and TM calculate same slot resource profile.
-	 * @throws Exception
 	 */
 	@Test
 	public void testCreateSlotsPerWorker() throws Exception {
@@ -570,7 +567,7 @@ public class YarnResourceManagerTest extends TestLogger {
 						config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
 						MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes());
 
-				assertTrue(rmCalculatedResourceProfile.equals(tmCalculatedResourceProfile));
+				assertEquals(rmCalculatedResourceProfile, tmCalculatedResourceProfile);
 			});
 		}};
 	}


[flink] 08/10: [FLINK-12812] [runtime] (follow-up) Test refers to slot profile computed in ResourceManager.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bfd45436e05cccfd57f2fe0df094f242da618922
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Jul 7 21:27:50 2019 +0200

    [FLINK-12812] [runtime] (follow-up) Test refers to slot profile computed in ResourceManager.
    
    This helps catch bugs like a previous one, in which the test re-computed the managed memory and passed the
    value in MEGABYTES, whereas the original call in the ResourceManager passed it in BYTES.
---
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java    | 5 +++++
 .../test/java/org/apache/flink/yarn/YarnResourceManagerTest.java    | 6 +-----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6ebf669..f5e6c99 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -323,6 +323,11 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		return resource;
 	}
 
+	@VisibleForTesting
+	Collection<ResourceProfile> getSlotsPerWorker() {
+		return slotsPerWorker;
+	}
+
 	@Override
 	public boolean stopWorker(final YarnWorkerNode workerNode) {
 		final Container container = workerNode.getContainer();
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 2b504c0..3e95431 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
@@ -564,10 +563,7 @@ public class YarnResourceManagerTest extends TestLogger {
 		new Context(config) {{
 			runTest(() -> {
 
-				ResourceProfile rmCalculatedResourceProfile =
-					ResourceManager.updateTaskManagerConfigAndCreateWorkerSlotProfiles(
-						config, resource.getMemory(), config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
-								.iterator().next();
+				ResourceProfile rmCalculatedResourceProfile = resourceManager.getSlotsPerWorker().iterator().next();
 
 				ResourceProfile tmCalculatedResourceProfile =
 					TaskManagerServices.computeSlotResourceProfile(


[flink] 09/10: [hotfix] [core] Simplify access to default values TM/JM memory sizes.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 662c82ac13131eb27280446298938ba6acaf3825
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Jul 7 17:37:14 2019 +0200

    [hotfix] [core] Simplify access to default values TM/JM memory sizes.
---
 .../main/java/org/apache/flink/configuration/ConfigurationUtils.java  | 4 ++--
 .../apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java | 2 +-
 .../taskexecutor/NettyShuffleEnvironmentConfigurationTest.java        | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
old mode 100644
new mode 100755
index 7b717bd..7f63353
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -52,7 +52,7 @@ public class ConfigurationUtils {
 			return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
 		} else {
 			//use default value
-			return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
+			return MemorySize.parse(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.defaultValue());
 		}
 	}
 
@@ -71,7 +71,7 @@ public class ConfigurationUtils {
 			return MemorySize.parse(configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) + "m");
 		} else {
 			//use default value
-			return MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY));
+			return MemorySize.parse(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.defaultValue());
 		}
 	}
 
diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
old mode 100644
new mode 100755
index 3e9ec37..9d93180
--- a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
@@ -164,7 +164,7 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 		config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));
 
 		if (managedMemSizeMB == 0) {
-			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
+			config.removeConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE);
 		} else {
 			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemSizeMB + "m");
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
old mode 100644
new mode 100755
index 1aa4817..84a08c1
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
@@ -247,7 +247,7 @@ public class NettyShuffleEnvironmentConfigurationTest extends TestLogger {
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "10m"); // 10MB
 		assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
 
-		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0"); // use fraction of given memory
+		config.removeConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE); // use fraction of given memory
 		config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
 		assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
 	}


[flink] 03/10: [hotfix] [runtime] Improve slot profile computation in TaskManagerServices

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5a056935778d8ae9d9502a2f3833d7fc72b77d22
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Jul 7 18:46:38 2019 +0200

    [hotfix] [runtime] Improve slot profile computation in TaskManagerServices
    
    In particular, this avoids repeating the same computation for each slot.
---
 .../apache/flink/runtime/taskexecutor/TaskManagerServices.java    | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index dfcd5e2..ee00dde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -50,7 +50,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
@@ -263,11 +262,8 @@ public class TaskManagerServices {
 		final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
 
 		final int numOfSlots = taskManagerServicesConfiguration.getNumberOfSlots();
-		final List<ResourceProfile> resourceProfiles = new ArrayList<>(numOfSlots);
-
-		for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
-			resourceProfiles.add(computeSlotResourceProfile(numOfSlots, managedMemorySize));
-		}
+		final List<ResourceProfile> resourceProfiles =
+			Collections.nCopies(numOfSlots, computeSlotResourceProfile(numOfSlots, managedMemorySize));
 
 		final TimerService<AllocationID> timerService = new TimerService<>(
 			new ScheduledThreadPoolExecutor(1),


[flink] 04/10: [hotfix] [runtime] Split heap/managed memory computation into smaller reusable methods

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 92fe83ffa88659d310a18666a57d1562fcba8fcb
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Jul 7 18:34:47 2019 +0200

    [hotfix] [runtime] Split heap/managed memory computation into smaller reusable methods
---
 ...TaskManagerHeapSizeCalculationJavaBashTest.java | 21 ++---
 .../runtime/taskexecutor/TaskManagerServices.java  | 98 ++++++++++++++--------
 .../ContaineredTaskManagerParametersTest.java      | 22 +++--
 3 files changed, 88 insertions(+), 53 deletions(-)

diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
index 87cf194..3e9ec37 100644
--- a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
@@ -279,18 +279,15 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 			String.valueOf(config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION))};
 		String scriptOutput = executeScript(command);
 
-		long absoluteTolerance = (long) (javaHeapSizeMB * tolerance);
-		if (absoluteTolerance < 1) {
-			assertEquals("Different heap sizes with configuration: " + config.toString(),
-				String.valueOf(javaHeapSizeMB), scriptOutput);
-		} else {
-			Long scriptHeapSizeMB = Long.valueOf(scriptOutput);
-			assertThat(
-				"Different heap sizes (Java: " + javaHeapSizeMB + ", Script: " + scriptHeapSizeMB +
-					") with configuration: " + config.toString(), scriptHeapSizeMB,
-				allOf(greaterThanOrEqualTo(javaHeapSizeMB - absoluteTolerance),
-					lessThanOrEqualTo(javaHeapSizeMB + absoluteTolerance)));
-		}
+		// we need a tolerance of at least one, to compensate for MB/byte conversion rounding errors
+		long absoluteTolerance = Math.max(1L, (long) (javaHeapSizeMB * tolerance));
+
+		Long scriptHeapSizeMB = Long.valueOf(scriptOutput);
+		assertThat(
+			"Different heap sizes (Java: " + javaHeapSizeMB + ", Script: " + scriptHeapSizeMB +
+				") with configuration: " + config.toString(), scriptHeapSizeMB,
+			allOf(greaterThanOrEqualTo(javaHeapSizeMB - absoluteTolerance),
+				lessThanOrEqualTo(javaHeapSizeMB + absoluteTolerance)));
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
old mode 100644
new mode 100755
index ee00dde..e211fc0
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -423,50 +423,68 @@ public class TaskManagerServices {
 	public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
 		Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
 
-		// subtract the Java memory used for network buffers (always off-heap)
-		final long networkBufMB = NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(
-			totalJavaMemorySizeMB << 20, // megabytes to bytes
-			config) >> 20; // bytes to megabytes
-		final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;
-
-		// split the available Java memory between heap and off-heap
-
-		final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);
-
-		final long heapSizeMB;
-		if (useOffHeap) {
-
-			long offHeapSize;
-			String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
-			if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
-				try {
-					offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
-				} catch (IllegalArgumentException e) {
-					throw new IllegalConfigurationException(
-						"Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
-				}
-			} else {
-				offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
-			}
+		// all values below here are in bytes
 
-			if (offHeapSize <= 0) {
-				// calculate off-heap section via fraction
-				double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
-				offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
-			}
+		final long totalProcessMemory = megabytesToBytes(totalJavaMemorySizeMB);
+		final long networkReservedMemory = getReservedNetworkMemory(config, totalProcessMemory);
+		final long heapAndManagedMemory = totalProcessMemory - networkReservedMemory;
+
+		if (config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
+			final long managedMemorySize = getManagedMemoryFromHeapAndManaged(config, heapAndManagedMemory);
 
-			ConfigurationParserUtils.checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
+			ConfigurationParserUtils.checkConfigParameter(managedMemorySize < heapAndManagedMemory, managedMemorySize,
 				TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
-					"Managed memory size too large for " + networkBufMB +
+					"Managed memory size too large for " + (networkReservedMemory >> 20) +
 						" MB network buffer memory and a total of " + totalJavaMemorySizeMB +
 						" MB JVM memory");
 
-			heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
-		} else {
-			heapSizeMB = remainingJavaMemorySizeMB;
+			return bytesToMegabytes(heapAndManagedMemory - managedMemorySize);
+		}
+		else {
+			return bytesToMegabytes(heapAndManagedMemory);
+		}
+	}
+
+	/**
+	 * Gets the size of managed memory from the JVM process size, which at that point includes
+	 * network buffer memory, managed memory, and non-flink-managed heap memory.
+	 * All values are in bytes.
+	 */
+	public static long getManagedMemoryFromProcessMemory(Configuration config, long totalProcessMemory) {
+		final long heapAndManagedMemory = totalProcessMemory - getReservedNetworkMemory(config, totalProcessMemory);
+		return getManagedMemoryFromHeapAndManaged(config, heapAndManagedMemory);
+	}
+
+	/**
+	 * Gets the size of managed memory from the heap size after subtracting network buffer memory.
+	 * All values are in bytes.
+	 */
+	public static long getManagedMemoryFromHeapAndManaged(Configuration config, long heapAndManagedMemory) {
+		if (config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
+			// take the configured absolute value
+			final String sizeValue = config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+			try {
+				return MemorySize.parse(sizeValue, MEGA_BYTES).getBytes();
+			}
+			catch (IllegalArgumentException e) {
+				throw new IllegalConfigurationException(
+					"Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
+			}
+		}
+		else {
+			// calculate managed memory size via fraction
+			final float fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+			return (long) (fraction * heapAndManagedMemory);
 		}
+	}
 
-		return heapSizeMB;
+	/**
+	 * Gets the amount of memory reserved for networking, given the total JVM memory.
+	 * All values are in bytes.
+	 */
+	public static long getReservedNetworkMemory(Configuration config, long totalProcessMemory) {
+		// subtract the Java memory used for network buffers (always off-heap)
+		return NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(totalProcessMemory, config);
 	}
 
 	/**
@@ -519,4 +537,12 @@ public class TaskManagerServices {
 			managedMemoryPerSlot,
 			Collections.emptyMap());
 	}
+
+	private static long bytesToMegabytes(long bytes) {
+		return bytes >> 20;
+	}
+
+	private static long megabytesToBytes(long megabytes) {
+		return megabytes << 20;
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
old mode 100644
new mode 100755
index c49f8e6..a1f4cad
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
@@ -21,13 +21,16 @@ package org.apache.flink.runtime.clusterframework;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_OFF_HEAP;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -52,10 +55,19 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
 			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
 
 		long cutoff = Math.max((long) (CONTAINER_MEMORY * memoryCutoffRatio), minCutoff);
-		final long networkBufMB = NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(
-			(CONTAINER_MEMORY - cutoff) << 20, // megabytes to bytes
-			conf) >> 20; // bytes to megabytes
-		assertEquals(networkBufMB + cutoff, params.taskManagerDirectMemoryLimitMB());
+		final long networkBufMB = TaskManagerServices.getReservedNetworkMemory(
+			conf,
+			(CONTAINER_MEMORY - cutoff) << 20 // megabytes to bytes
+		) >> 20; // bytes to megabytes
+
+		// this is unfortunately necessary due to rounding errors that happen due to back and
+		// forth conversion between bytes and megabytes
+		// ideally all logic calculates precisely with bytes and we use coarser units only un
+		// user-facing configuration and parametrization classes
+		assertThat(networkBufMB + cutoff,
+			anyOf(
+				equalTo(params.taskManagerDirectMemoryLimitMB()),
+				equalTo(params.taskManagerDirectMemoryLimitMB() - 1)));
 	}
 
 	/**


[flink] 05/10: [FLINK-12812] [runtime] (follow-up) Fix bytes/megabytes mixup in managed memory for slot resource profile

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 44ff1443fc37c91a60b53687df167406859610aa
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Jul 7 21:18:31 2019 +0200

    [FLINK-12812] [runtime] (follow-up) Fix bytes/megabytes mixup in managed memory for slot resource profile
---
 .../org/apache/flink/runtime/taskexecutor/TaskManagerServices.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e211fc0..d4e27e0 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -527,14 +527,14 @@ public class TaskManagerServices {
 
 	@VisibleForTesting
 	public static ResourceProfile computeSlotResourceProfile(int numOfSlots, long managedMemorySize) {
-		int managedMemoryPerSlot = (int) (managedMemorySize / numOfSlots);
+		int managedMemoryPerSlotMB = (int) bytesToMegabytes(managedMemorySize / numOfSlots);
 		return new ResourceProfile(
 			Double.MAX_VALUE,
 			Integer.MAX_VALUE,
 			Integer.MAX_VALUE,
 			Integer.MAX_VALUE,
 			Integer.MAX_VALUE,
-			managedMemoryPerSlot,
+			managedMemoryPerSlotMB,
 			Collections.emptyMap());
 	}