You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2019/12/17 19:55:25 UTC

[storm] branch master updated: [STORM-3551] Add Custom WorkerResource Equality method to Slot

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 68a74d0  [STORM-3551] Add Custom WorkerResource Equality method to Slot
     new 357df98  Merge pull request #3177 from kishorvpatil/storm3551
68a74d0 is described below

commit 68a74d04e809c4fff106d2e733c9a8c45322f8cb
Author: Kishor Patil <kp...@verizonmedia.com>
AuthorDate: Fri Dec 6 16:42:20 2019 -0500

    [STORM-3551] Add Custom WorkerResource Equality method to Slot
---
 .../org/apache/storm/daemon/supervisor/Slot.java   | 87 ++++++++++++++++++++--
 .../apache/storm/daemon/supervisor/SlotTest.java   | 65 +++++++++++++++-
 2 files changed, 144 insertions(+), 8 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index df419b9..85e5f9a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -12,8 +12,10 @@
 
 package org.apache.storm.daemon.supervisor;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -36,13 +38,13 @@ import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileAction;
 import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
 import org.apache.storm.localizer.AsyncLocalizer;
 import org.apache.storm.localizer.BlobChangingCallback;
 import org.apache.storm.localizer.GoodToGo;
 import org.apache.storm.localizer.LocallyCachedBlob;
 import org.apache.storm.metricstore.WorkerMetricsProcessor;
 import org.apache.storm.scheduler.ISupervisor;
-import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.utils.EnumUtil;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.ObjectReader;
@@ -174,9 +176,80 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     }
 
     /**
-     * Decide the equivalence of two local assignments, ignoring the order of executors
-     * This is different from #equal method.
-     * @param first Local assignment A
+     * Compares two WorkerResources value by value, treating any resource that is absent as 0.0.
+     *
+     * @param first  WorkerResources A
+     * @param second WorkerResources B
+     * @return True if A and B are equivalent, treating the absent resources as 0.0; false if either one is null
+     */
+    @VisibleForTesting
+    static boolean customWorkerResourcesEquality(WorkerResources first, WorkerResources second) {
+        // A missing side is never equivalent; checking both here avoids an NPE on second below.
+        if (first == null || second == null) {
+            return false;
+        }
+        // Fast path: same instance, or already equal according to equals().
+        if (first == second || first.equals(second)) {
+            return true;
+        }
+
+        // equals() said "different", so fall back to comparing each value through its getter.
+        if (first.get_cpu() != second.get_cpu()) {
+            return false;
+        }
+        if (first.get_mem_on_heap() != second.get_mem_on_heap()) {
+            return false;
+        }
+        if (first.get_mem_off_heap() != second.get_mem_off_heap()) {
+            return false;
+        }
+        if (first.get_shared_mem_off_heap() != second.get_shared_mem_off_heap()) {
+            return false;
+        }
+        if (first.get_shared_mem_on_heap() != second.get_shared_mem_on_heap()) {
+            return false;
+        }
+        if (!customResourceMapEquality(first.get_resources(), second.get_resources())) {
+            return false;
+        }
+        if (!customResourceMapEquality(first.get_shared_resources(), second.get_shared_resources())) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Compares two resource maps, treating a null map, a missing key, or a null value as 0.0.
+     *
+     * @param firstMap  Resource Map A
+     * @param secondMap Resource Map B
+     * @return True if A and B are equivalent, treating the absent resources as 0.0
+     */
+    private static boolean customResourceMapEquality(Map<String, Double> firstMap, Map<String, Double> secondMap) {
+        if (firstMap == null && secondMap == null) {
+            return true;
+        }
+        Map<String, Double> left = firstMap == null ? Collections.<String, Double>emptyMap() : firstMap;
+        Map<String, Double> right = secondMap == null ? Collections.<String, Double>emptyMap() : secondMap;
+
+        // Walk the union of keys so an entry present on only one side is compared against 0.0.
+        Set<String> keys = new HashSet<>(left.keySet());
+        keys.addAll(right.keySet());
+        for (String key : keys) {
+            // get() instead of getOrDefault(): the latter returns a mapped null as-is, which would NPE when unboxed.
+            Double leftValue = left.get(key);
+            Double rightValue = right.get(key);
+            if ((leftValue == null ? 0.0 : leftValue) != (rightValue == null ? 0.0 : rightValue)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Decide the equivalence of two local assignments, ignoring the order of executors. This is different from the #equals method.
+     *
+     * @param first  Local assignment A
      * @param second Local assignment B
      * @return True if A and B are equivalent, ignoring the order of the executors
      */
@@ -196,9 +269,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
                         return true;
                     }
                     if (firstHasResources && secondHasResources) {
-                        if (first.get_resources().equals(second.get_resources())) {
-                            return true;
-                        }
+                        WorkerResources firstResources = first.get_resources();
+                        WorkerResources secondResources = second.get_resources();
+                        return customWorkerResourcesEquality(firstResources, secondResources);
                     }
                 }
             }
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index 9dfe2d7..1cfea8a 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -15,10 +15,13 @@ package org.apache.storm.daemon.supervisor;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -77,6 +80,14 @@ public class SlotTest {
         return resources;
     }
 
+    static WorkerResources mkWorkerResources(Double cpu, Double mem_on_heap, Double mem_off_heap, Map<String, Double> resources) {
+        WorkerResources workerResources = mkWorkerResources(cpu, mem_on_heap, mem_off_heap);
+        if (resources != null) {
+            workerResources.set_resources(resources);
+        }
+        return workerResources;
+    }
+
     static LSWorkerHeartbeat mkWorkerHB(String id, int port, List<ExecutorInfo> exec, Integer timeSecs) {
         LSWorkerHeartbeat ret = new LSWorkerHeartbeat();
         ret.set_topology_id(id);
@@ -108,12 +119,64 @@ public class SlotTest {
     }
 
     @Test
-    public void testEquivilant() {
+    public void testWorkerResourceEquality() {
+        // No resource map supplied vs. an empty resource map.
+        WorkerResources resourcesRNull = mkWorkerResources(100.0, 100.0, 100.0, null);
+        WorkerResources resourcesREmpty = mkWorkerResources(100.0, 100.0, 100.0, Maps.newHashMap());
+        assertTrue(Slot.customWorkerResourcesEquality(resourcesRNull, resourcesREmpty));
+
+        // An explicit 0.0 entry vs. the entry being absent.
+        Map<String, Double> resources = new HashMap<>();
+        resources.put("network.resource.units", 0.0);
+        WorkerResources resourcesRNetwork = mkWorkerResources(100.0, 100.0, 100.0, resources);
+        assertTrue(Slot.customWorkerResourcesEquality(resourcesREmpty, resourcesRNetwork));
+
+        // A non-zero entry vs. the entry being absent.
+        Map<String, Double> resourcesNetwork = new HashMap<>();
+        resourcesNetwork.put("network.resource.units", 50.0);
+        WorkerResources resourcesRNetworkNonZero = mkWorkerResources(100.0, 100.0, 100.0, resourcesNetwork);
+        assertFalse(Slot.customWorkerResourcesEquality(resourcesREmpty, resourcesRNetworkNonZero));
+
+        // Two distinct maps holding the same non-zero value.
+        Map<String, Double> resourcesNetworkOne = new HashMap<>();
+        resourcesNetworkOne.put("network.resource.units", 50.0);
+        WorkerResources resourcesRNetworkOne = mkWorkerResources(100.0, 100.0, 100.0, resourcesNetworkOne);
+        assertTrue(Slot.customWorkerResourcesEquality(resourcesRNetworkOne, resourcesRNetworkNonZero));
+
+        // Same key, different non-zero values.
+        Map<String, Double> resourcesNetworkTwo = new HashMap<>();
+        resourcesNetworkTwo.put("network.resource.units", 100.0);
+        WorkerResources resourcesRNetworkTwo = mkWorkerResources(100.0, 100.0, 100.0, resourcesNetworkTwo);
+        assertFalse(Slot.customWorkerResourcesEquality(resourcesRNetworkOne, resourcesRNetworkTwo));
+
+        // cpu passed as null vs. cpu passed as 0.0.
+        WorkerResources resourcesCpuNull = mkWorkerResources(null, 100.0, 100.0);
+        WorkerResources resourcesCPUZero = mkWorkerResources(0.0, 100.0, 100.0);
+        assertTrue(Slot.customWorkerResourcesEquality(resourcesCpuNull, resourcesCPUZero));
+
+        // On-heap memory passed as null vs. 0.0.
+        WorkerResources resourcesOnHeapMemNull = mkWorkerResources(100.0, null, 100.0);
+        WorkerResources resourcesOnHeapMemZero = mkWorkerResources(100.0, 0.0, 100.0);
+        assertTrue(Slot.customWorkerResourcesEquality(resourcesOnHeapMemNull, resourcesOnHeapMemZero));
+
+        // Off-heap memory passed as null vs. 0.0.
+        WorkerResources resourcesOffHeapMemNull = mkWorkerResources(100.0, 100.0, null);
+        WorkerResources resourcesOffHeapMemZero = mkWorkerResources(100.0, 100.0, 0.0);
+        assertTrue(Slot.customWorkerResourcesEquality(resourcesOffHeapMemNull, resourcesOffHeapMemZero));
+    }
+
+    @Test
+    public void testEquivalent() {
         LocalAssignment a = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 100.0, 100.0));
         LocalAssignment aResized = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 200.0, 100.0));
         LocalAssignment b = mkLocalAssignment("B", mkExecutorInfoList(1, 2, 3, 4, 5, 6), mkWorkerResources(100.0, 100.0, 100.0));
         LocalAssignment bReordered = mkLocalAssignment("B", mkExecutorInfoList(6, 5, 4, 3, 2, 1), mkWorkerResources(100.0, 100.0, 100.0));
 
+        LocalAssignment c = mkLocalAssignment("C", mkExecutorInfoList(188, 261),mkWorkerResources(400.0,10000.0,0.0));
+
+        WorkerResources workerResources = mkWorkerResources(400.0, 10000.0, 0.0);
+        Map<String, Double> additionalResources = workerResources.get_resources();
+        if( additionalResources == null) additionalResources = new HashMap<>();
+        additionalResources.put("network.resource.units", 0.0);
+
+        workerResources.set_resources(additionalResources);
+        LocalAssignment cReordered = mkLocalAssignment("C", mkExecutorInfoList(188, 261), workerResources);
+
+        assertTrue(Slot.equivalent(c,cReordered));
         assertTrue(Slot.equivalent(null, null));
         assertTrue(Slot.equivalent(a, a));
         assertTrue(Slot.equivalent(b, bReordered));