You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/18 13:35:56 UTC
[flink] 02/03: [FLINK-14869][core] Discard zero-valued extended
resources in ResourceSpec
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 727b961fb58f1430d9b334673dc2bed691aa9c73
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu Feb 18 13:34:14 2021 +0800
[FLINK-14869][core] Discard zero-valued extended resources in ResourceSpec
---
.../flink/api/common/operators/ResourceSpec.java | 47 +++++++++-------------
.../api/common/operators/ResourceSpecTest.java | 15 +++++++
2 files changed, 35 insertions(+), 27 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 0023627..fbb9d55 100755
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -27,10 +27,12 @@ import org.apache.flink.configuration.MemorySize;
import javax.annotation.Nullable;
import java.io.Serializable;
-import java.math.BigDecimal;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -84,7 +86,7 @@ public final class ResourceSpec implements Serializable {
@Nullable // can be null only for UNKNOWN
private final MemorySize managedMemory;
- private final Map<String, Resource> extendedResources = new HashMap<>(1);
+ private final Map<String, Resource> extendedResources;
private ResourceSpec(
final Resource cpuCores,
@@ -101,11 +103,10 @@ public final class ResourceSpec implements Serializable {
this.taskOffHeapMemory = checkNotNull(taskOffHeapMemory);
this.managedMemory = checkNotNull(managedMemory);
- for (Resource resource : extendedResources) {
- if (resource != null) {
- this.extendedResources.put(resource.getName(), resource);
- }
- }
+ this.extendedResources =
+ Arrays.stream(extendedResources)
+ .filter(resource -> !checkNotNull(resource).isZero())
+ .collect(Collectors.toMap(Resource::getName, Function.identity()));
}
/** Creates a new ResourceSpec with all fields unknown. */
@@ -114,6 +115,7 @@ public final class ResourceSpec implements Serializable {
this.taskHeapMemory = null;
this.taskOffHeapMemory = null;
this.managedMemory = null;
+ this.extendedResources = new HashMap<>();
}
/**
@@ -160,27 +162,18 @@ public final class ResourceSpec implements Serializable {
other.lessThanOrEqual(this),
"Cannot subtract a larger ResourceSpec from this one.");
- final ResourceSpec target =
- new ResourceSpec(
- this.cpuCores.subtract(other.cpuCores),
- this.taskHeapMemory.subtract(other.taskHeapMemory),
- this.taskOffHeapMemory.subtract(other.taskOffHeapMemory),
- this.managedMemory.subtract(other.managedMemory));
-
- target.extendedResources.putAll(extendedResources);
-
+ Map<String, Resource> resultExtendedResources = new HashMap<>(extendedResources);
for (Resource resource : other.extendedResources.values()) {
- target.extendedResources.merge(
- resource.getName(),
- resource,
- (v1, v2) -> {
- final Resource subtracted = v1.subtract(v2);
- return subtracted.getValue().compareTo(BigDecimal.ZERO) == 0
- ? null
- : subtracted;
- });
+ resultExtendedResources.merge(
+ resource.getName(), resource, (v1, v2) -> v1.subtract(v2));
}
- return target;
+
+ return new ResourceSpec(
+ this.cpuCores.subtract(other.cpuCores),
+ this.taskHeapMemory.subtract(other.taskHeapMemory),
+ this.taskOffHeapMemory.subtract(other.taskOffHeapMemory),
+ this.managedMemory.subtract(other.managedMemory),
+ resultExtendedResources.values().toArray(new Resource[0]));
}
public Resource getCpuCores() {
@@ -334,7 +327,7 @@ public final class ResourceSpec implements Serializable {
private MemorySize taskHeapMemory;
private MemorySize taskOffHeapMemory = MemorySize.ZERO;
private MemorySize managedMemory = MemorySize.ZERO;
- private GPUResource gpuResource;
+ private GPUResource gpuResource = new GPUResource(0.0);
private Builder(CPUResource cpuCores, MemorySize taskHeapMemory) {
this.cpuCores = cpuCores;
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
index 30b8f25..f757808 100755
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
@@ -205,4 +205,19 @@ public class ResourceSpecTest extends TestLogger {
final ResourceSpec subtracted = rs1.subtract(rs2);
assertEquals(ResourceSpec.UNKNOWN, subtracted);
}
+
+ @Test
+ public void testZeroExtendedResourceFromConstructor() {
+ final ResourceSpec resourceSpec =
+ ResourceSpec.newBuilder(1.0, 100).setGPUResource(0.0).build();
+ assertEquals(resourceSpec.getExtendedResources().size(), 0);
+ }
+
+ @Test
+ public void testZeroExtendedResourceFromSubtract() {
+ final ResourceSpec resourceSpec =
+ ResourceSpec.newBuilder(1.0, 100).setGPUResource(1.0).build();
+
+ assertEquals(resourceSpec.subtract(resourceSpec).getExtendedResources().size(), 0);
+ }
}