You are viewing a plain text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/07/29 19:04:53 UTC

[storm] branch master updated: STORM-3737: Shared Worker Registry (#3373)

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a41049  STORM-3737: Shared Worker Registry (#3373)
6a41049 is described below

commit 6a410493e27da96b893625df13e7c75e217f6fc7
Author: Lakshman Sai <la...@gmail.com>
AuthorDate: Fri Jul 30 00:32:41 2021 +0530

    STORM-3737: Shared Worker Registry (#3373)
    
    * STORM-3737: Shared Worker Registry
    authored-by: lakshman-sai <la...@flipkart.com>
---
 storm-client/src/jvm/org/apache/storm/Constants.java            | 2 ++
 storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java | 7 ++++++-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index 6d539c6..c3cd080 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -57,5 +57,7 @@ public class Constants {
 
     public static final String NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS = "nimbus:num-send-assignment-exceptions";
     public static final String SUPERVISOR_HEALTH_CHECK_TIMEOUTS = "supervisor:health-check-timeouts";
+
+    public static final String WORKER_METRICS_REGISTRY = "worker-metrics-registry";
 }
     
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 6196893..ad7d5a7 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -12,7 +12,10 @@
 
 package org.apache.storm.daemon.worker;
 
+import static org.apache.storm.Constants.WORKER_METRICS_REGISTRY;
+
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.SharedMetricRegistries;
 import java.io.File;
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -29,6 +32,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.security.auth.Subject;
+
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.cluster.ClusterStateContext;
@@ -187,6 +191,7 @@ public class Worker implements Shutdownable, DaemonCommon {
         IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
 
         metricRegistry.start(topologyConf, port);
+        SharedMetricRegistries.add(WORKER_METRICS_REGISTRY, metricRegistry.getRegistry());
 
         Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
         Map<String, String> initCreds = new HashMap<>();
@@ -552,7 +557,7 @@ public class Worker implements Shutdownable, DaemonCommon {
             }
 
             metricRegistry.stop();
-
+            SharedMetricRegistries.remove(WORKER_METRICS_REGISTRY);
             LOG.info("Shut down worker {} {} {}", topologyId, assignmentId, port);
         } catch (Exception ex) {
             throw Utils.wrapInRuntime(ex);