You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by go...@apache.org on 2020/04/24 15:21:16 UTC
[storm] branch master updated: STORM-3259: Fixes reading of NUMA supervisor assignments
This is an automated email from the ASF dual-hosted git repository.
govind pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 5daf0cc STORM-3259: Fixes reading of NUMA supervisor assignments
new a2905c8 Merge pull request #3257 from govind-menon/STORM-3259-Containers
5daf0cc is described below
commit 5daf0cce0ddbbb7c6ceee38c5cfec9ba3a858517
Author: Govind Menon <go...@gmail.com>
AuthorDate: Thu Apr 23 13:07:20 2020 -0500
STORM-3259: Fixes reading of NUMA supervisor assignments
---
.../storm/daemon/supervisor/ReadClusterState.java | 4 +-
.../supervisor/timer/SynchronizeAssignments.java | 57 +++++++++++++++++-----
2 files changed, 48 insertions(+), 13 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 7337927..503b44a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -247,7 +247,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
Map<NodeInfo, WorkerResources> nodeInfoWorkerResourcesMap = assignment.get_worker_resources();
if (nodeInfoWorkerResourcesMap != null) {
for (Map.Entry<NodeInfo, WorkerResources> entry : nodeInfoWorkerResourcesMap.entrySet()) {
- if (entry.getKey().get_node().equals(assignmentId)) {
+ if (entry.getKey().get_node().startsWith(assignmentId)) {
Set<Long> ports = entry.getKey().get_port();
for (Long port : ports) {
slotsResources.put(port, entry.getValue());
@@ -267,7 +267,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
Map<List<Long>, NodeInfo> executorNodePort = assignment.get_executor_node_port();
if (executorNodePort != null) {
for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) {
- if (entry.getValue().get_node().equals(assignmentId)) {
+ if (entry.getValue().get_node().startsWith(assignmentId)) {
for (Long port : entry.getValue().get_port()) {
LocalAssignment localAssignment = portTasks.get(port.intValue());
if (localAssignment == null) {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
index 3960ce9..62f73f2 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
@@ -12,12 +12,19 @@
package org.apache.storm.daemon.supervisor.timer;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+
+import org.apache.storm.ServerConstants;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.supervisor.ReadClusterState;
import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.SupervisorAssignments;
import org.apache.storm.thrift.TException;
import org.apache.storm.utils.ConfigUtils;
@@ -49,14 +56,21 @@ public class SynchronizeAssignments implements Runnable {
this.readClusterState = readClusterState;
}
- private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) {
- if (null == assignments) {
+ private static void assignedAssignmentsToLocal(IStormClusterState clusterState,
+ List<SupervisorAssignments> supervisorAssignments) {
+ if (null == supervisorAssignments || supervisorAssignments.isEmpty()) {
//unknown error, just skip
return;
}
Map<String, byte[]> serAssignments = new HashMap<>();
- for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) {
- serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue()));
+ for (SupervisorAssignments supervisorAssignment : supervisorAssignments) {
+ if (supervisorAssignment == null) {
+ //unknown error, just skip
+ continue;
+ }
+ for (Map.Entry<String, Assignment> entry : supervisorAssignment.get_storm_assignment().entrySet()) {
+ serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue()));
+ }
}
clusterState.syncRemoteAssignments(serAssignments);
}
@@ -67,7 +81,7 @@ public class SynchronizeAssignments implements Runnable {
if (null == assignments) {
getAssignmentsFromMaster(this.supervisor.getConf(), this.supervisor.getStormClusterState(), this.supervisor.getAssignmentId());
} else {
- assignedAssignmentsToLocal(this.supervisor.getStormClusterState(), assignments);
+ assignedAssignmentsToLocal(this.supervisor.getStormClusterState(), Collections.singletonList(assignments));
}
this.readClusterState.run();
}
@@ -81,7 +95,7 @@ public class SynchronizeAssignments implements Runnable {
while (!success) {
try (NimbusClient master = NimbusClient.getConfiguredClient(supervisor.getConf())) {
SupervisorAssignments assignments = master.getClient().getSupervisorAssignments(supervisor.getAssignmentId());
- assignedAssignmentsToLocal(supervisor.getStormClusterState(), assignments);
+ assignedAssignmentsToLocal(supervisor.getStormClusterState(), Collections.singletonList(assignments));
success = true;
} catch (Exception t) {
// just ignore the exception
@@ -99,6 +113,24 @@ public class SynchronizeAssignments implements Runnable {
}
+ public List<SupervisorAssignments> getAllAssignmentsFromNumaSupervisors(
+ Nimbus.Iface nimbus, String node
+ ) throws TException {
+ List<SupervisorAssignments> supervisorAssignmentsList = new ArrayList();
+ Map<String, Object> validatedNumaMap = SupervisorUtils.getNumaMap(supervisor.getConf());
+ for (Map.Entry<String, Object> numaEntry : validatedNumaMap.entrySet()) {
+ String numaId = numaEntry.getKey();
+ SupervisorAssignments assignments = nimbus.getSupervisorAssignments(
+ node + ServerConstants.NUMA_ID_SEPARATOR + numaId
+ );
+ supervisorAssignmentsList.add(assignments);
+ }
+ SupervisorAssignments assignments = nimbus.getSupervisorAssignments(node);
+ supervisorAssignmentsList.add(assignments);
+
+ return supervisorAssignmentsList;
+ }
+
/**
* Used by {@link Supervisor} to fetch assignments when start up.
* @param conf config
@@ -108,16 +140,19 @@ public class SynchronizeAssignments implements Runnable {
public void getAssignmentsFromMaster(Map conf, IStormClusterState clusterState, String node) {
if (ConfigUtils.isLocalMode(conf)) {
try {
- SupervisorAssignments assignments = this.supervisor.getLocalNimbus().getSupervisorAssignments(node);
- assignedAssignmentsToLocal(clusterState, assignments);
+ List<SupervisorAssignments> supervisorAssignmentsList =
+ getAllAssignmentsFromNumaSupervisors(
+ this.supervisor.getLocalNimbus(), node
+ );
+ assignedAssignmentsToLocal(clusterState, supervisorAssignmentsList);
} catch (TException e) {
LOG.error("Get assignments from local master exception", e);
}
} else {
try (NimbusClient master = NimbusClient.getConfiguredClient(conf)) {
- SupervisorAssignments assignments = master.getClient().getSupervisorAssignments(node);
- LOG.debug("Sync an assignments from master, will start to sync with assignments: {}", assignments);
- assignedAssignmentsToLocal(clusterState, assignments);
+ List<SupervisorAssignments> supervisorAssignmentsList = getAllAssignmentsFromNumaSupervisors(master.getClient(), node);
+ LOG.debug("Sync an assignments from master, will start to sync with assignments: {}", supervisorAssignmentsList);
+ assignedAssignmentsToLocal(clusterState, supervisorAssignmentsList);
} catch (Exception t) {
LOG.error("Get assignments from master exception", t);
}