You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2019/11/19 15:01:23 UTC

[storm] branch master updated: STORM-3538 Add Meter for sendSupervisorAssignments exception

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eec7fd  STORM-3538 Add Meter for sendSupervisorAssignments exception
     new 5786bb0  Merge pull request #3167 from dandsager1/STORM-3538
8eec7fd is described below

commit 8eec7fd3d721b78c1e8a522c78433a8898dfd4c2
Author: david <da...@verizonmedia.com>
AuthorDate: Fri Nov 15 09:00:40 2019 -0600

    STORM-3538 Add Meter for sendSupervisorAssignments exception
---
 .../src/jvm/org/apache/storm/Constants.java        |  2 ++
 .../org/apache/storm/daemon/nimbus/Nimbus.java     | 19 +++++++++++------
 .../apache/storm/metric/StormMetricsRegistry.java  |  4 ++++
 .../nimbus/AssignmentDistributionService.java      | 24 +++++++++++++++-------
 4 files changed, 36 insertions(+), 13 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index 57af8d1..7a1c518 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -55,5 +55,7 @@ public class Constants {
     public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb";
     public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb";
     public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
+
+    public static final String NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS = "nimbus:num-send-assignment-exceptions";
 }
     
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index a3b7575..b56a623 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -268,6 +268,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private final Meter shutdownCalls;
     private final Meter processWorkerMetricsCalls;
     private final Meter mkAssignmentsErrors;
+    private final Meter sendAssignmentExceptions;   // used in AssignmentDistributionService.java
 
     //Timer
     private final Timer fileUploadDuration;
@@ -305,7 +306,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         IStormClusterState state = nimbus.getStormClusterState();
         Assignment oldAssignment = state.assignmentInfo(topoId, null);
         state.removeStorm(topoId);
-        notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer());
+        notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer(), nimbus.getMetricsRegistry());
         nimbus.heartbeatsCache.removeTopo(topoId);
         nimbus.getIdToExecutors().getAndUpdate(new Dissoc<>(topoId));
         return null;
@@ -517,6 +518,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         this.shutdownCalls = metricsRegistry.registerMeter("nimbus:num-shutdown-calls");
         this.processWorkerMetricsCalls = metricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
         this.mkAssignmentsErrors = metricsRegistry.registerMeter("nimbus:mkAssignments-Errors");
+        this.sendAssignmentExceptions = metricsRegistry.registerMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS);
         this.fileUploadDuration = metricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
         this.schedulingDuration = metricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
         this.numAddedExecPerScheduling = metricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
@@ -1578,7 +1580,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
      */
     private static void notifySupervisorsAssignments(Map<String, Assignment> assignments,
                                                      AssignmentDistributionService service, Map<String, String> nodeHost,
-                                                     Map<String, SupervisorDetails> supervisorDetails) {
+                                                     Map<String, SupervisorDetails> supervisorDetails,
+                                                     StormMetricsRegistry metricsRegistry) {
         for (Map.Entry<String, String> nodeEntry : nodeHost.entrySet()) {
             try {
                 String nodeId = nodeEntry.getKey();
@@ -1586,7 +1589,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 supervisorAssignments.set_storm_assignment(assignmentsForNode(assignments, nodeEntry.getKey()));
                 SupervisorDetails details = supervisorDetails.get(nodeId);
                 Integer serverPort = details != null ? details.getServerPort() : null;
-                service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments);
+                service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments, metricsRegistry);
             } catch (Throwable tr1) {
                 //just skip when any error happens wait for next round assignments reassign
                 LOG.error("Exception when add assignments distribution task for node {}", nodeEntry.getKey());
@@ -1595,10 +1598,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
 
     private static void notifySupervisorsAsKilled(IStormClusterState clusterState, Assignment oldAss,
-                                                  AssignmentDistributionService service) {
+                                                  AssignmentDistributionService service, StormMetricsRegistry metricsRegistry) {
         Map<String, String> nodeHost = assignmentChangedNodes(oldAss, null);
         notifySupervisorsAssignments(clusterState.assignmentsInfo(), service, nodeHost,
-                                     basicSupervisorDetailsMap(clusterState));
+                                     basicSupervisorDetailsMap(clusterState), metricsRegistry);
     }
 
     @VisibleForTesting
@@ -1654,6 +1657,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return assignmentsDistributer;
     }
 
+    private StormMetricsRegistry getMetricsRegistry() {
+        return metricsRegistry;
+    }
+
     @VisibleForTesting
     public HeartbeatCache getHeartbeatsCache() {
         return heartbeatsCache;
@@ -2520,7 +2527,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
             }
             notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,
-                    basicSupervisorDetailsMap);
+                                        basicSupervisorDetailsMap, getMetricsRegistry());
 
             Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
             for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index cc98804..5db347e 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -68,6 +68,10 @@ public class StormMetricsRegistry {
         registry.removeMatching((name, metric) -> nameToMetric.containsKey(name));
     }
 
+    public Meter getMeter(String meterName) {
+        return registry.getMeters().get(meterName);
+    }
+
     public void startMetricsReporters(Map<String, Object> daemonConf) {
         reporters = MetricsUtils.getPreparableReporters(daemonConf);
         for (PreparableReporter reporter : reporters) {
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
index 4f84997..4eb1bb4 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -21,9 +21,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Constants;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.SupervisorClient;
@@ -146,7 +149,8 @@ public class AssignmentDistributionService implements Closeable {
      * @param serverPort node thrift server port.
      * @param assignments the {@link org.apache.storm.generated.SupervisorAssignments}
      */
-    public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+    public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments,
+                                      StormMetricsRegistry metricsRegistry) {
         try {
             //For some reasons, we can not get supervisor port info, eg: supervisor shutdown,
             //Just skip for this scheduling round.
@@ -155,7 +159,8 @@ public class AssignmentDistributionService implements Closeable {
                 return;
             }
 
-            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS);
+            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort,
+                                                assignments, metricsRegistry), 5L, TimeUnit.SECONDS);
             if (!success) {
                 LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node);
             }
@@ -211,17 +216,20 @@ public class AssignmentDistributionService implements Closeable {
         private String host;
         private Integer serverPort;
         private SupervisorAssignments assignments;
+        private StormMetricsRegistry metricsRegistry;
 
-        private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+        private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments,
+                                StormMetricsRegistry metricsRegistry) {
             this.node = node;
             this.host = host;
             this.serverPort = serverPort;
             this.assignments = assignments;
+            this.metricsRegistry = metricsRegistry;
         }
 
         public static NodeAssignments getInstance(String node, String host, Integer serverPort,
-                                                  SupervisorAssignments assignments) {
-            return new NodeAssignments(node, host, serverPort, assignments);
+                                                  SupervisorAssignments assignments, StormMetricsRegistry metricsRegistry) {
+            return new NodeAssignments(node, host, serverPort, assignments, metricsRegistry);
         }
 
         //supervisor assignment id/supervisor id
@@ -241,6 +249,9 @@ public class AssignmentDistributionService implements Closeable {
             return this.assignments;
         }
 
+        public StormMetricsRegistry getMetricsRegistry() {
+            return metricsRegistry;
+        }
     }
 
     /**
@@ -289,14 +300,13 @@ public class AssignmentDistributionService implements Closeable {
                     try {
                         client.getIface().sendSupervisorAssignments(assignments.getAssignments());
                     } catch (Exception e) {
-                        //just ignore the exception.
+                        assignments.getMetricsRegistry().getMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS).mark();
                         LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
                     }
                 } catch (Throwable e) {
                     //just ignore any error/exception.
                     LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage());
                 }
-
             }
         }
     }