You are viewing a plain-text version of this content; the hyperlink to the canonical version did not survive the plain-text conversion.
Posted to commits@storm.apache.org by go...@apache.org on 2020/04/24 15:21:16 UTC

[storm] branch master updated: STORM-3259: Fixes reading of NUMA supervisor assignments

This is an automated email from the ASF dual-hosted git repository.

govind pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 5daf0cc  STORM-3259: Fixes reading of NUMA supervisor assignments
     new a2905c8  Merge pull request #3257 from govind-menon/STORM-3259-Containers
5daf0cc is described below

commit 5daf0cce0ddbbb7c6ceee38c5cfec9ba3a858517
Author: Govind Menon <go...@gmail.com>
AuthorDate: Thu Apr 23 13:07:20 2020 -0500

    STORM-3259: Fixes reading of NUMA supervisor assignments
---
 .../storm/daemon/supervisor/ReadClusterState.java  |  4 +-
 .../supervisor/timer/SynchronizeAssignments.java   | 57 +++++++++++++++++-----
 2 files changed, 48 insertions(+), 13 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 7337927..503b44a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -247,7 +247,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
         Map<NodeInfo, WorkerResources> nodeInfoWorkerResourcesMap = assignment.get_worker_resources();
         if (nodeInfoWorkerResourcesMap != null) {
             for (Map.Entry<NodeInfo, WorkerResources> entry : nodeInfoWorkerResourcesMap.entrySet()) {
-                if (entry.getKey().get_node().equals(assignmentId)) {
+                if (entry.getKey().get_node().startsWith(assignmentId)) {
                     Set<Long> ports = entry.getKey().get_port();
                     for (Long port : ports) {
                         slotsResources.put(port, entry.getValue());
@@ -267,7 +267,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
         Map<List<Long>, NodeInfo> executorNodePort = assignment.get_executor_node_port();
         if (executorNodePort != null) {
             for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) {
-                if (entry.getValue().get_node().equals(assignmentId)) {
+                if (entry.getValue().get_node().startsWith(assignmentId)) {
                     for (Long port : entry.getValue().get_port()) {
                         LocalAssignment localAssignment = portTasks.get(port.intValue());
                         if (localAssignment == null) {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
index 3960ce9..62f73f2 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
@@ -12,12 +12,19 @@
 
 package org.apache.storm.daemon.supervisor.timer;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+
+import org.apache.storm.ServerConstants;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.supervisor.ReadClusterState;
 import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
 import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.SupervisorAssignments;
 import org.apache.storm.thrift.TException;
 import org.apache.storm.utils.ConfigUtils;
@@ -49,14 +56,21 @@ public class SynchronizeAssignments implements Runnable {
         this.readClusterState = readClusterState;
     }
 
-    private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) {
-        if (null == assignments) {
+    private static void assignedAssignmentsToLocal(IStormClusterState clusterState,
+                                                   List<SupervisorAssignments> supervisorAssignments) {
+        if (null == supervisorAssignments || supervisorAssignments.isEmpty()) {
             //unknown error, just skip
             return;
         }
         Map<String, byte[]> serAssignments = new HashMap<>();
-        for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) {
-            serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue()));
+        for (SupervisorAssignments supervisorAssignment : supervisorAssignments) {
+            if (supervisorAssignment == null) {
+                //unknown error, just skip
+                continue;
+            }
+            for (Map.Entry<String, Assignment> entry : supervisorAssignment.get_storm_assignment().entrySet()) {
+                serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue()));
+            }
         }
         clusterState.syncRemoteAssignments(serAssignments);
     }
@@ -67,7 +81,7 @@ public class SynchronizeAssignments implements Runnable {
         if (null == assignments) {
             getAssignmentsFromMaster(this.supervisor.getConf(), this.supervisor.getStormClusterState(), this.supervisor.getAssignmentId());
         } else {
-            assignedAssignmentsToLocal(this.supervisor.getStormClusterState(), assignments);
+            assignedAssignmentsToLocal(this.supervisor.getStormClusterState(), Collections.singletonList(assignments));
         }
         this.readClusterState.run();
     }
@@ -81,7 +95,7 @@ public class SynchronizeAssignments implements Runnable {
         while (!success) {
             try (NimbusClient master = NimbusClient.getConfiguredClient(supervisor.getConf())) {
                 SupervisorAssignments assignments = master.getClient().getSupervisorAssignments(supervisor.getAssignmentId());
-                assignedAssignmentsToLocal(supervisor.getStormClusterState(), assignments);
+                assignedAssignmentsToLocal(supervisor.getStormClusterState(), Collections.singletonList(assignments));
                 success = true;
             } catch (Exception t) {
                 // just ignore the exception
@@ -99,6 +113,24 @@ public class SynchronizeAssignments implements Runnable {
 
     }
 
+    /**
+     * Fetches the assignments of every NUMA sub-supervisor of {@code node} (ids
+     * {@code node + NUMA_ID_SEPARATOR + numaId}, one per key of {@link SupervisorUtils#getNumaMap}), then of {@code node}.
+     * @param nimbus nimbus interface to query
+     * @param node base supervisor assignment id
+     * @return the results in query order (NUMA ids first, base id last), unfiltered
+     */
+    public List<SupervisorAssignments> getAllAssignmentsFromNumaSupervisors(Nimbus.Iface nimbus, String node) throws TException {
+        List<SupervisorAssignments> supervisorAssignmentsList = new ArrayList<>();
+        Map<String, Object> validatedNumaMap = SupervisorUtils.getNumaMap(supervisor.getConf());
+        for (String numaId : validatedNumaMap.keySet()) {
+            supervisorAssignmentsList.add(
+                    nimbus.getSupervisorAssignments(node + ServerConstants.NUMA_ID_SEPARATOR + numaId));
+        }
+        supervisorAssignmentsList.add(nimbus.getSupervisorAssignments(node));
+        return supervisorAssignmentsList;
+    }
+
     /**
      * Used by {@link Supervisor} to fetch assignments when start up.
      * @param conf config
@@ -108,16 +140,19 @@ public class SynchronizeAssignments implements Runnable {
     public void getAssignmentsFromMaster(Map conf, IStormClusterState clusterState, String node) {
         if (ConfigUtils.isLocalMode(conf)) {
             try {
-                SupervisorAssignments assignments = this.supervisor.getLocalNimbus().getSupervisorAssignments(node);
-                assignedAssignmentsToLocal(clusterState, assignments);
+                List<SupervisorAssignments> supervisorAssignmentsList =
+                        getAllAssignmentsFromNumaSupervisors(
+                                this.supervisor.getLocalNimbus(), node
+                        );
+                assignedAssignmentsToLocal(clusterState, supervisorAssignmentsList);
             } catch (TException e) {
                 LOG.error("Get assignments from local master exception", e);
             }
         } else {
             try (NimbusClient master = NimbusClient.getConfiguredClient(conf)) {
-                SupervisorAssignments assignments = master.getClient().getSupervisorAssignments(node);
-                LOG.debug("Sync an assignments from master, will start to sync with assignments: {}", assignments);
-                assignedAssignmentsToLocal(clusterState, assignments);
+                List<SupervisorAssignments> supervisorAssignmentsList = getAllAssignmentsFromNumaSupervisors(master.getClient(), node);
+                LOG.debug("Sync an assignments from master, will start to sync with assignments: {}", supervisorAssignmentsList);
+                assignedAssignmentsToLocal(clusterState, supervisorAssignmentsList);
             } catch (Exception t) {
                 LOG.error("Get assignments from master exception", t);
             }