You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/12 19:05:29 UTC
[flink] 02/10: [FLINK-12765][coordinator] Change the default resource spec to UNKNOWN
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f6857a4660eb84c07425594d4fb36df57761aa1a
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Wed Jul 10 15:16:16 2019 +0800
[FLINK-12765][coordinator] Change the default resource spec to UNKNOWN
---
.../flink/api/common/operators/ResourceSpec.java | 29 +++++++++++++++++++++-
.../clusterframework/types/ResourceProfile.java | 4 +++
2 files changed, 32 insertions(+), 1 deletion(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 5b1b7b1..4ce5911 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -29,6 +29,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* Describe the different resource factors of the operator with UDF.
*
@@ -52,7 +54,9 @@ public class ResourceSpec implements Serializable {
private static final long serialVersionUID = 1L;
- public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0, 0);
+ public static final ResourceSpec UNKNOWN = new ResourceSpec();
+
+ public static final ResourceSpec DEFAULT = UNKNOWN;
public static final String GPU_NAME = "GPU";
@@ -95,6 +99,13 @@ public class ResourceSpec implements Serializable {
int stateSizeInMB,
int managedMemoryInMB,
Resource... extendedResources) {
+ checkArgument(cpuCores >= 0, "The cpu cores of the resource spec should not be negative.");
+ checkArgument(heapMemoryInMB >= 0, "The heap memory of the resource spec should not be negative");
+ checkArgument(directMemoryInMB >= 0, "The direct memory of the resource spec should not be negative");
+ checkArgument(nativeMemoryInMB >= 0, "The native memory of the resource spec should not be negative");
+ checkArgument(stateSizeInMB >= 0, "The state size of the resource spec should not be negative");
+ checkArgument(managedMemoryInMB >= 0, "The managed memory of the resource spec should not be negative");
+
this.cpuCores = cpuCores;
this.heapMemoryInMB = heapMemoryInMB;
this.directMemoryInMB = directMemoryInMB;
@@ -109,6 +120,18 @@ public class ResourceSpec implements Serializable {
}
/**
+ * Creates a new ResourceSpec with all fields unknown.
+ */
+ private ResourceSpec() {
+ this.cpuCores = -1;
+ this.heapMemoryInMB = -1;
+ this.directMemoryInMB = -1;
+ this.nativeMemoryInMB = -1;
+ this.stateSizeInMB = -1;
+ this.managedMemoryInMB = -1;
+ }
+
+ /**
* Used by system internally to merge the other resources of chained operators
* when generating the job graph or merge the resource consumed by state backend.
*
@@ -116,6 +139,10 @@ public class ResourceSpec implements Serializable {
* @return The new resource with merged values.
*/
public ResourceSpec merge(ResourceSpec other) {
+ if (this == UNKNOWN || other == UNKNOWN) {
+ return UNKNOWN;
+ }
+
ResourceSpec target = new ResourceSpec(
Math.max(this.cpuCores, other.cpuCores),
this.heapMemoryInMB + other.heapMemoryInMB,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 0a02cd0..840825a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -445,6 +445,10 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
}
public static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
+ if (resourceSpec == ResourceSpec.UNKNOWN) {
+ return UNKNOWN;
+ }
+
Map<String, Resource> copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources());
return new ResourceProfile(