You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this copy.
Posted to commits@storm.apache.org by ag...@apache.org on 2019/11/19 15:01:23 UTC
[storm] branch master updated: STORM-3538 Add Meter for sendSupervisorAssignments exception
This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 8eec7fd STORM-3538 Add Meter for sendSupervisorAssignments exception
new 5786bb0 Merge pull request #3167 from dandsager1/STORM-3538
8eec7fd is described below
commit 8eec7fd3d721b78c1e8a522c78433a8898dfd4c2
Author: david <da...@verizonmedia.com>
AuthorDate: Fri Nov 15 09:00:40 2019 -0600
STORM-3538 Add Meter for sendSupervisorAssignments exception
---
.../src/jvm/org/apache/storm/Constants.java | 2 ++
.../org/apache/storm/daemon/nimbus/Nimbus.java | 19 +++++++++++------
.../apache/storm/metric/StormMetricsRegistry.java | 4 ++++
.../nimbus/AssignmentDistributionService.java | 24 +++++++++++++++-------
4 files changed, 36 insertions(+), 13 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index 57af8d1..7a1c518 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -55,5 +55,7 @@ public class Constants {
public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb";
public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb";
public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
+
+ public static final String NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS = "nimbus:num-send-assignment-exceptions";
}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index a3b7575..b56a623 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -268,6 +268,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private final Meter shutdownCalls;
private final Meter processWorkerMetricsCalls;
private final Meter mkAssignmentsErrors;
+ private final Meter sendAssignmentExceptions; // used in AssignmentDistributionService.java
//Timer
private final Timer fileUploadDuration;
@@ -305,7 +306,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
IStormClusterState state = nimbus.getStormClusterState();
Assignment oldAssignment = state.assignmentInfo(topoId, null);
state.removeStorm(topoId);
- notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer());
+ notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer(), nimbus.getMetricsRegistry());
nimbus.heartbeatsCache.removeTopo(topoId);
nimbus.getIdToExecutors().getAndUpdate(new Dissoc<>(topoId));
return null;
@@ -517,6 +518,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
this.shutdownCalls = metricsRegistry.registerMeter("nimbus:num-shutdown-calls");
this.processWorkerMetricsCalls = metricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
this.mkAssignmentsErrors = metricsRegistry.registerMeter("nimbus:mkAssignments-Errors");
+ this.sendAssignmentExceptions = metricsRegistry.registerMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS);
this.fileUploadDuration = metricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
this.schedulingDuration = metricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
this.numAddedExecPerScheduling = metricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
@@ -1578,7 +1580,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
*/
private static void notifySupervisorsAssignments(Map<String, Assignment> assignments,
AssignmentDistributionService service, Map<String, String> nodeHost,
- Map<String, SupervisorDetails> supervisorDetails) {
+ Map<String, SupervisorDetails> supervisorDetails,
+ StormMetricsRegistry metricsRegistry) {
for (Map.Entry<String, String> nodeEntry : nodeHost.entrySet()) {
try {
String nodeId = nodeEntry.getKey();
@@ -1586,7 +1589,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
supervisorAssignments.set_storm_assignment(assignmentsForNode(assignments, nodeEntry.getKey()));
SupervisorDetails details = supervisorDetails.get(nodeId);
Integer serverPort = details != null ? details.getServerPort() : null;
- service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments);
+ service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments, metricsRegistry);
} catch (Throwable tr1) {
//just skip when any error happens wait for next round assignments reassign
LOG.error("Exception when add assignments distribution task for node {}", nodeEntry.getKey());
@@ -1595,10 +1598,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
private static void notifySupervisorsAsKilled(IStormClusterState clusterState, Assignment oldAss,
- AssignmentDistributionService service) {
+ AssignmentDistributionService service, StormMetricsRegistry metricsRegistry) {
Map<String, String> nodeHost = assignmentChangedNodes(oldAss, null);
notifySupervisorsAssignments(clusterState.assignmentsInfo(), service, nodeHost,
- basicSupervisorDetailsMap(clusterState));
+ basicSupervisorDetailsMap(clusterState), metricsRegistry);
}
@VisibleForTesting
@@ -1654,6 +1657,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return assignmentsDistributer;
}
+ private StormMetricsRegistry getMetricsRegistry() {
+ return metricsRegistry;
+ }
+
@VisibleForTesting
public HeartbeatCache getHeartbeatsCache() {
return heartbeatsCache;
@@ -2520,7 +2527,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
}
notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,
- basicSupervisorDetailsMap);
+ basicSupervisorDetailsMap, getMetricsRegistry());
Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index cc98804..5db347e 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -68,6 +68,10 @@ public class StormMetricsRegistry {
registry.removeMatching((name, metric) -> nameToMetric.containsKey(name));
}
+ public Meter getMeter(String meterName) {
+ return registry.getMeters().get(meterName);
+ }
+
public void startMetricsReporters(Map<String, Object> daemonConf) {
reporters = MetricsUtils.getPreparableReporters(daemonConf);
for (PreparableReporter reporter : reporters) {
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
index 4f84997..4eb1bb4 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -21,9 +21,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Constants;
import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.SupervisorClient;
@@ -146,7 +149,8 @@ public class AssignmentDistributionService implements Closeable {
* @param serverPort node thrift server port.
* @param assignments the {@link org.apache.storm.generated.SupervisorAssignments}
*/
- public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+ public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments,
+ StormMetricsRegistry metricsRegistry) {
try {
//For some reasons, we can not get supervisor port info, eg: supervisor shutdown,
//Just skip for this scheduling round.
@@ -155,7 +159,8 @@ public class AssignmentDistributionService implements Closeable {
return;
}
- boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS);
+ boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort,
+ assignments, metricsRegistry), 5L, TimeUnit.SECONDS);
if (!success) {
LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node);
}
@@ -211,17 +216,20 @@ public class AssignmentDistributionService implements Closeable {
private String host;
private Integer serverPort;
private SupervisorAssignments assignments;
+ private StormMetricsRegistry metricsRegistry;
- private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+ private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments,
+ StormMetricsRegistry metricsRegistry) {
this.node = node;
this.host = host;
this.serverPort = serverPort;
this.assignments = assignments;
+ this.metricsRegistry = metricsRegistry;
}
public static NodeAssignments getInstance(String node, String host, Integer serverPort,
- SupervisorAssignments assignments) {
- return new NodeAssignments(node, host, serverPort, assignments);
+ SupervisorAssignments assignments, StormMetricsRegistry metricsRegistry) {
+ return new NodeAssignments(node, host, serverPort, assignments, metricsRegistry);
}
//supervisor assignment id/supervisor id
@@ -241,6 +249,9 @@ public class AssignmentDistributionService implements Closeable {
return this.assignments;
}
+ public StormMetricsRegistry getMetricsRegistry() {
+ return metricsRegistry;
+ }
}
/**
@@ -289,14 +300,13 @@ public class AssignmentDistributionService implements Closeable {
try {
client.getIface().sendSupervisorAssignments(assignments.getAssignments());
} catch (Exception e) {
- //just ignore the exception.
+ assignments.getMetricsRegistry().getMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS).mark();
LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
}
} catch (Throwable e) {
//just ignore any error/exception.
LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage());
}
-
}
}
}