You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2019/12/17 19:55:25 UTC
[storm] branch master updated: [STORM-3551] Add Custom
WorkerResource Equality method to Slot
This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 68a74d0 [STORM-3551] Add Custom WorkerResource Equality method to Slot
new 357df98 Merge pull request #3177 from kishorvpatil/storm3551
68a74d0 is described below
commit 68a74d04e809c4fff106d2e733c9a8c45322f8cb
Author: Kishor Patil <kp...@verizonmedia.com>
AuthorDate: Fri Dec 6 16:42:20 2019 -0500
[STORM-3551] Add Custom WorkerResource Equality method to Slot
---
.../org/apache/storm/daemon/supervisor/Slot.java | 87 ++++++++++++++++++++--
.../apache/storm/daemon/supervisor/SlotTest.java | 65 +++++++++++++++-
2 files changed, 144 insertions(+), 8 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index df419b9..85e5f9a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -12,8 +12,10 @@
package org.apache.storm.daemon.supervisor;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -36,13 +38,13 @@ import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.localizer.BlobChangingCallback;
import org.apache.storm.localizer.GoodToGo;
import org.apache.storm.localizer.LocallyCachedBlob;
import org.apache.storm.metricstore.WorkerMetricsProcessor;
import org.apache.storm.scheduler.ISupervisor;
-import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.utils.EnumUtil;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
@@ -174,9 +176,80 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
}
/**
- * Decide the equivalence of two local assignments, ignoring the order of executors
- * This is different from #equal method.
- * @param first Local assignment A
+ * This method compares WorkerResources while considering any resources are NULL to be 0.0
+ *
+ * @param first WorkerResources A
+ * @param second WorkerResources B
+ * @return True if A and B are equivalent, treating the absent resources as 0.0
+ */
+ @VisibleForTesting
+ static boolean customWorkerResourcesEquality(WorkerResources first, WorkerResources second) {
+ if (first == null) {
+ return false;
+ }
+ if (first == second) {
+ return true;
+ }
+ if (first.equals(second)) {
+ return true;
+ }
+
+ if (first.get_cpu() != second.get_cpu()) {
+ return false;
+ }
+ if (first.get_mem_on_heap() != second.get_mem_on_heap()) {
+ return false;
+ }
+ if (first.get_mem_off_heap() != second.get_mem_off_heap()) {
+ return false;
+ }
+ if (first.get_shared_mem_off_heap() != second.get_shared_mem_off_heap()) {
+ return false;
+ }
+ if (first.get_shared_mem_on_heap() != second.get_shared_mem_on_heap()) {
+ return false;
+ }
+ if (!customResourceMapEquality(first.get_resources(), second.get_resources())) {
+ return false;
+ }
+ if (!customResourceMapEquality(first.get_shared_resources(), second.get_shared_resources())) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * This method compares Resource Maps while considering any resources are NULL to be 0.0
+ *
+ * @param firstMap Resource Map A
+ * @param secondMap Resource Map B
+ * @return True if A and B are equivalent, treating the absent resources as 0.0
+ */
+ private static boolean customResourceMapEquality(Map<String, Double> firstMap, Map<String, Double> secondMap) {
+ if (firstMap == null && secondMap == null) {
+ return true;
+ }
+ if (firstMap == null) {
+ firstMap = new HashMap<>();
+ }
+ if (secondMap == null) {
+ secondMap = new HashMap<>();
+ }
+
+ Set<String> keys = new HashSet<>(firstMap.keySet());
+ keys.addAll(secondMap.keySet());
+ for (String key : keys) {
+ if (firstMap.getOrDefault(key, 0.0).doubleValue() != secondMap.getOrDefault(key, 0.0).doubleValue()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Decide the equivalence of two local assignments, ignoring the order of executors This is different from #equal method.
+ *
+ * @param first Local assignment A
* @param second Local assignment B
* @return True if A and B are equivalent, ignoring the order of the executors
*/
@@ -196,9 +269,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
return true;
}
if (firstHasResources && secondHasResources) {
- if (first.get_resources().equals(second.get_resources())) {
- return true;
- }
+ WorkerResources firstResources = first.get_resources();
+ WorkerResources secondResources = second.get_resources();
+ return customWorkerResourcesEquality(firstResources, secondResources);
}
}
}
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index 9dfe2d7..1cfea8a 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -15,10 +15,13 @@ package org.apache.storm.daemon.supervisor;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
+import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -77,6 +80,14 @@ public class SlotTest {
return resources;
}
+ static WorkerResources mkWorkerResources(Double cpu, Double mem_on_heap, Double mem_off_heap, Map<String, Double> resources) {
+ WorkerResources workerResources = mkWorkerResources(cpu, mem_on_heap, mem_off_heap);
+ if (resources != null) {
+ workerResources.set_resources(resources);
+ }
+ return workerResources;
+ }
+
static LSWorkerHeartbeat mkWorkerHB(String id, int port, List<ExecutorInfo> exec, Integer timeSecs) {
LSWorkerHeartbeat ret = new LSWorkerHeartbeat();
ret.set_topology_id(id);
@@ -108,12 +119,64 @@ public class SlotTest {
}
@Test
- public void testEquivilant() {
+ public void testWorkerResourceEquality() {
+ WorkerResources resourcesRNull = mkWorkerResources(100.0, 100.0, 100.0, null);
+ WorkerResources resourcesREmpty = mkWorkerResources(100.0, 100.0, 100.0, Maps.newHashMap());
+ assertTrue(Slot.customWorkerResourcesEquality(resourcesRNull,resourcesREmpty));
+
+ Map resources = new HashMap<String, Double>();
+ resources.put("network.resource.units", 0.0);
+ WorkerResources resourcesRNetwork = mkWorkerResources(100.0, 100.0, 100.0,resources);
+ assertTrue(Slot.customWorkerResourcesEquality(resourcesREmpty, resourcesRNetwork));
+
+
+ Map resourcesNetwork = new HashMap<String, Double>();
+ resourcesNetwork.put("network.resource.units", 50.0);
+ WorkerResources resourcesRNetworkNonZero = mkWorkerResources(100.0, 100.0, 100.0,resourcesNetwork);
+ assertFalse(Slot.customWorkerResourcesEquality(resourcesREmpty, resourcesRNetworkNonZero));
+
+ Map resourcesNetworkOne = new HashMap<String, Double>();
+ resourcesNetworkOne.put("network.resource.units", 50.0);
+ WorkerResources resourcesRNetworkOne = mkWorkerResources(100.0, 100.0, 100.0,resourcesNetworkOne);
+ assertTrue(Slot.customWorkerResourcesEquality(resourcesRNetworkOne, resourcesRNetworkNonZero));
+
+ Map resourcesNetworkTwo = new HashMap<String, Double>();
+ resourcesNetworkTwo.put("network.resource.units", 100.0);
+ WorkerResources resourcesRNetworkTwo = mkWorkerResources(100.0, 100.0, 100.0,resourcesNetworkTwo);
+ assertFalse(Slot.customWorkerResourcesEquality(resourcesRNetworkOne, resourcesRNetworkTwo));
+
+ WorkerResources resourcesCpuNull = mkWorkerResources(null, 100.0,100.0);
+ WorkerResources resourcesCPUZero = mkWorkerResources(0.0, 100.0,100.0);
+ assertTrue(Slot.customWorkerResourcesEquality(resourcesCpuNull, resourcesCPUZero));
+
+ WorkerResources resourcesOnHeapMemNull = mkWorkerResources(100.0, null,100.0);
+ WorkerResources resourcesOnHeapMemZero = mkWorkerResources(100.0, 0.0,100.0);
+ assertTrue(Slot.customWorkerResourcesEquality(resourcesOnHeapMemNull, resourcesOnHeapMemZero));
+
+ WorkerResources resourcesOffHeapMemNull = mkWorkerResources(100.0, 100.0,null);
+ WorkerResources resourcesOffHeapMemZero = mkWorkerResources(100.0, 100.0,0.0);
+ assertTrue(Slot.customWorkerResourcesEquality(resourcesOffHeapMemNull, resourcesOffHeapMemZero));
+
+ }
+
+ @Test
+ public void testEquivalent() {
LocalAssignment a = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 100.0, 100.0));
LocalAssignment aResized = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 200.0, 100.0));
LocalAssignment b = mkLocalAssignment("B", mkExecutorInfoList(1, 2, 3, 4, 5, 6), mkWorkerResources(100.0, 100.0, 100.0));
LocalAssignment bReordered = mkLocalAssignment("B", mkExecutorInfoList(6, 5, 4, 3, 2, 1), mkWorkerResources(100.0, 100.0, 100.0));
+ LocalAssignment c = mkLocalAssignment("C", mkExecutorInfoList(188, 261),mkWorkerResources(400.0,10000.0,0.0));
+
+ WorkerResources workerResources = mkWorkerResources(400.0, 10000.0, 0.0);
+ Map<String, Double> additionalResources = workerResources.get_resources();
+ if( additionalResources == null) additionalResources = new HashMap<>();
+ additionalResources.put("network.resource.units", 0.0);
+
+ workerResources.set_resources(additionalResources);
+ LocalAssignment cReordered = mkLocalAssignment("C", mkExecutorInfoList(188, 261), workerResources);
+
+ assertTrue(Slot.equivalent(c,cReordered));
assertTrue(Slot.equivalent(null, null));
assertTrue(Slot.equivalent(a, a));
assertTrue(Slot.equivalent(b, bReordered));