You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/12 19:05:29 UTC

[flink] 02/10: [FLINK-12765][coordinator] Change the default resource spec to UNKNOWN

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6857a4660eb84c07425594d4fb36df57761aa1a
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Wed Jul 10 15:16:16 2019 +0800

    [FLINK-12765][coordinator] Change the default resource spec to UNKNOWN
---
 .../flink/api/common/operators/ResourceSpec.java   | 29 +++++++++++++++++++++-
 .../clusterframework/types/ResourceProfile.java    |  4 +++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 5b1b7b1..4ce5911 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -29,6 +29,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Describe the different resource factors of the operator with UDF.
  *
@@ -52,7 +54,9 @@ public class ResourceSpec implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0, 0);
+	public static final ResourceSpec UNKNOWN = new ResourceSpec();
+
+	public static final ResourceSpec DEFAULT = UNKNOWN;
 
 	public static final String GPU_NAME = "GPU";
 
@@ -95,6 +99,13 @@ public class ResourceSpec implements Serializable {
 			int stateSizeInMB,
 			int managedMemoryInMB,
 			Resource... extendedResources) {
+		checkArgument(cpuCores >= 0, "The cpu cores of the resource spec should not be negative.");
+		checkArgument(heapMemoryInMB >= 0, "The heap memory of the resource spec should not be negative");
+		checkArgument(directMemoryInMB >= 0, "The direct memory of the resource spec should not be negative");
+		checkArgument(nativeMemoryInMB >= 0, "The native memory of the resource spec should not be negative");
+		checkArgument(stateSizeInMB >= 0, "The state size of the resource spec should not be negative");
+		checkArgument(managedMemoryInMB >= 0, "The managed memory of the resource spec should not be negative");
+
 		this.cpuCores = cpuCores;
 		this.heapMemoryInMB = heapMemoryInMB;
 		this.directMemoryInMB = directMemoryInMB;
@@ -109,6 +120,18 @@ public class ResourceSpec implements Serializable {
 	}
 
 	/**
+	 * Creates a new ResourceSpec with all fields unknown.
+	 */
+	private ResourceSpec() {
+		this.cpuCores = -1;
+		this.heapMemoryInMB = -1;
+		this.directMemoryInMB = -1;
+		this.nativeMemoryInMB = -1;
+		this.stateSizeInMB = -1;
+		this.managedMemoryInMB = -1;
+	}
+
+	/**
 	 * Used by system internally to merge the other resources of chained operators
 	 * when generating the job graph or merge the resource consumed by state backend.
 	 *
@@ -116,6 +139,10 @@ public class ResourceSpec implements Serializable {
 	 * @return The new resource with merged values.
 	 */
 	public ResourceSpec merge(ResourceSpec other) {
+		if (this == UNKNOWN || other == UNKNOWN) {
+			return UNKNOWN;
+		}
+
 		ResourceSpec target = new ResourceSpec(
 				Math.max(this.cpuCores, other.cpuCores),
 				this.heapMemoryInMB + other.heapMemoryInMB,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 0a02cd0..840825a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -445,6 +445,10 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	}
 
 	public static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
+		if (resourceSpec == ResourceSpec.UNKNOWN) {
+			return UNKNOWN;
+		}
+
 		Map<String, Resource> copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources());
 
 		return new ResourceProfile(