You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/18 13:35:56 UTC

[flink] 02/03: [FLINK-14869][core] Discard zero-valued extended resources in ResourceSpec

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 727b961fb58f1430d9b334673dc2bed691aa9c73
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu Feb 18 13:34:14 2021 +0800

    [FLINK-14869][core] Discard zero-valued extended resources in ResourceSpec
---
 .../flink/api/common/operators/ResourceSpec.java   | 47 +++++++++-------------
 .../api/common/operators/ResourceSpecTest.java     | 15 +++++++
 2 files changed, 35 insertions(+), 27 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 0023627..fbb9d55 100755
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -27,10 +27,12 @@ import org.apache.flink.configuration.MemorySize;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
-import java.math.BigDecimal;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -84,7 +86,7 @@ public final class ResourceSpec implements Serializable {
     @Nullable // can be null only for UNKNOWN
     private final MemorySize managedMemory;
 
-    private final Map<String, Resource> extendedResources = new HashMap<>(1);
+    private final Map<String, Resource> extendedResources;
 
     private ResourceSpec(
             final Resource cpuCores,
@@ -101,11 +103,10 @@ public final class ResourceSpec implements Serializable {
         this.taskOffHeapMemory = checkNotNull(taskOffHeapMemory);
         this.managedMemory = checkNotNull(managedMemory);
 
-        for (Resource resource : extendedResources) {
-            if (resource != null) {
-                this.extendedResources.put(resource.getName(), resource);
-            }
-        }
+        this.extendedResources =
+                Arrays.stream(extendedResources)
+                        .filter(resource -> !checkNotNull(resource).isZero())
+                        .collect(Collectors.toMap(Resource::getName, Function.identity()));
     }
 
     /** Creates a new ResourceSpec with all fields unknown. */
@@ -114,6 +115,7 @@ public final class ResourceSpec implements Serializable {
         this.taskHeapMemory = null;
         this.taskOffHeapMemory = null;
         this.managedMemory = null;
+        this.extendedResources = new HashMap<>();
     }
 
     /**
@@ -160,27 +162,18 @@ public final class ResourceSpec implements Serializable {
                 other.lessThanOrEqual(this),
                 "Cannot subtract a larger ResourceSpec from this one.");
 
-        final ResourceSpec target =
-                new ResourceSpec(
-                        this.cpuCores.subtract(other.cpuCores),
-                        this.taskHeapMemory.subtract(other.taskHeapMemory),
-                        this.taskOffHeapMemory.subtract(other.taskOffHeapMemory),
-                        this.managedMemory.subtract(other.managedMemory));
-
-        target.extendedResources.putAll(extendedResources);
-
+        Map<String, Resource> resultExtendedResources = new HashMap<>(extendedResources);
         for (Resource resource : other.extendedResources.values()) {
-            target.extendedResources.merge(
-                    resource.getName(),
-                    resource,
-                    (v1, v2) -> {
-                        final Resource subtracted = v1.subtract(v2);
-                        return subtracted.getValue().compareTo(BigDecimal.ZERO) == 0
-                                ? null
-                                : subtracted;
-                    });
+            resultExtendedResources.merge(
+                    resource.getName(), resource, (v1, v2) -> v1.subtract(v2));
         }
-        return target;
+
+        return new ResourceSpec(
+                this.cpuCores.subtract(other.cpuCores),
+                this.taskHeapMemory.subtract(other.taskHeapMemory),
+                this.taskOffHeapMemory.subtract(other.taskOffHeapMemory),
+                this.managedMemory.subtract(other.managedMemory),
+                resultExtendedResources.values().toArray(new Resource[0]));
     }
 
     public Resource getCpuCores() {
@@ -334,7 +327,7 @@ public final class ResourceSpec implements Serializable {
         private MemorySize taskHeapMemory;
         private MemorySize taskOffHeapMemory = MemorySize.ZERO;
         private MemorySize managedMemory = MemorySize.ZERO;
-        private GPUResource gpuResource;
+        private GPUResource gpuResource = new GPUResource(0.0);
 
         private Builder(CPUResource cpuCores, MemorySize taskHeapMemory) {
             this.cpuCores = cpuCores;
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
index 30b8f25..f757808 100755
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
@@ -205,4 +205,19 @@ public class ResourceSpecTest extends TestLogger {
         final ResourceSpec subtracted = rs1.subtract(rs2);
         assertEquals(ResourceSpec.UNKNOWN, subtracted);
     }
+
+    @Test
+    public void testZeroExtendedResourceFromConstructor() {
+        final ResourceSpec resourceSpec =
+                ResourceSpec.newBuilder(1.0, 100).setGPUResource(0.0).build();
+        assertEquals(0, resourceSpec.getExtendedResources().size()); // zero GPU is discarded
+    }
+
+    @Test
+    public void testZeroExtendedResourceFromSubtract() {
+        final ResourceSpec resourceSpec =
+                ResourceSpec.newBuilder(1.0, 100).setGPUResource(1.0).build();
+        // Subtracting a spec from itself yields a zero GPU amount, which must be discarded.
+        assertEquals(0, resourceSpec.subtract(resourceSpec).getExtendedResources().size());
+    }
 }