Posted to commits@storm.apache.org by et...@apache.org on 2020/02/12 14:54:32 UTC

[storm] branch 2.1.x-branch updated: [STORM-3579] use the topo conf for thrift client in Worker code

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/2.1.x-branch by this push:
     new 90f6383  [STORM-3579]  use the topo conf for thrift client in Worker code
90f6383 is described below

commit 90f6383cc126bf8607efbe8b66dbe8d1ed7ae21a
Author: Ethan Li <et...@gmail.com>
AuthorDate: Fri Feb 7 15:55:00 2020 -0600

    [STORM-3579]  use the topo conf for thrift client in Worker code
---
 .../jvm/org/apache/storm/daemon/worker/Worker.java | 48 +++++++++++++---------
 1 file changed, 28 insertions(+), 20 deletions(-)
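
Before the diff itself, a brief sketch of the pattern this commit introduces may be useful: when no supervisor client supplier is passed in, the Worker constructor now builds one itself from the topology configuration (loaded via ConfigUtils.readSupervisorStormConf and ConfigUtils.overrideLoginConfigWithSystemProperty) rather than the daemon conf, so the Thrift client picks up topology-level settings. The minimal Java sketch below only illustrates that null-supplier fallback; the types WorkerSketch and ThriftClient are hypothetical stand-ins, not the real storm-client API shown in the diff.

    import java.util.Map;
    import java.util.function.Supplier;

    // Minimal sketch of the fallback pattern from STORM-3579 (hypothetical types,
    // not the actual Worker/SupervisorClient code): if the caller supplies no
    // client factory, build one from the *topology* conf instead of the daemon conf.
    public class WorkerSketch {

        private final Map<String, Object> conf;          // daemon-level configuration
        private final Map<String, Object> topologyConf;   // per-topology configuration
        private final Supplier<ThriftClient> clientSupplier;

        public WorkerSketch(Map<String, Object> conf,
                            Map<String, Object> topologyConf,
                            Supplier<ThriftClient> clientSupplier) {
            this.conf = conf;
            this.topologyConf = topologyConf;
            if (clientSupplier == null) {
                // Fallback: connect with the topology conf so topology-level
                // Thrift/login settings are honored when creating the client.
                this.clientSupplier = () -> ThriftClient.connect(this.topologyConf);
            } else {
                this.clientSupplier = clientSupplier;
            }
        }

        /** Placeholder standing in for SupervisorIfaceFactory / SupervisorClient. */
        interface ThriftClient {
            static ThriftClient connect(Map<String, Object> conf) {
                // A real implementation would open a Thrift connection using 'conf'.
                return new ThriftClient() { };
            }
        }
    }

The same choice shows up in main(): it now calls the shorter constructor (supplier omitted), and the fallback inside the constructor uses topologyConf where the old inline supplier used conf.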

diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 23e577e..b4e4c80 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -47,7 +47,6 @@ import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LogConfig;
-import org.apache.storm.generated.Supervisor;
 import org.apache.storm.generated.SupervisorWorkerHeartbeat;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IContext;
@@ -75,6 +74,7 @@ public class Worker implements Shutdownable, DaemonCommon {
     private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
     private static final Pattern BLOB_VERSION_EXTRACTION = Pattern.compile(".*\\.([0-9]+)$");
     private final Map<String, Object> conf;
+    private final Map<String, Object> topologyConf;
     private final IContext context;
     private final String topologyId;
     private final String assignmentId;
@@ -93,7 +93,6 @@ public class Worker implements Shutdownable, DaemonCommon {
     private Collection<IAutoCredentials> autoCreds;
     private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;
 
-
     /**
      * TODO: should worker even take the topologyId as input? this should be deducible from cluster state (by searching through assignments)
      * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency.
@@ -106,9 +105,9 @@ public class Worker implements Shutdownable, DaemonCommon {
      * @param port           - port on which the worker runs
      * @param workerId       - worker id
      */
-
     public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
-                  int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier) {
+                  int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier)
+        throws IOException {
         this.conf = conf;
         this.context = context;
         this.topologyId = topologyId;
@@ -118,7 +117,25 @@ public class Worker implements Shutdownable, DaemonCommon {
         this.workerId = workerId;
         this.logConfigManager = new LogConfigManager();
         this.metricRegistry = new StormMetricRegistry();
-        this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+
+        this.topologyConf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
+        if (supervisorIfaceSupplier == null) {
+            this.supervisorIfaceSupplier = () -> {
+                try {
+                    return SupervisorClient.getConfiguredClient(topologyConf, Utils.hostname(), supervisorPort);
+                } catch (UnknownHostException e) {
+                    throw Utils.wrapInRuntime(e);
+                }
+            };
+        } else {
+            this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+        }
+    }
+
+    public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
+                  int supervisorPort, int port, String workerId) throws IOException {
+        this(conf, context, topologyId, assignmentId, supervisorPort, port, workerId, null);
     }
 
     public static void main(String[] args) throws Exception {
@@ -132,15 +149,7 @@ public class Worker implements Shutdownable, DaemonCommon {
         Utils.setupDefaultUncaughtExceptionHandler();
         StormCommon.validateDistributedMode(conf);
         int supervisorPortInt = Integer.parseInt(supervisorPort);
-        Supplier<SupervisorIfaceFactory> supervisorIfaceSuppler = () -> {
-            try {
-                return SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPortInt);
-            } catch (UnknownHostException e) {
-                throw Utils.wrapInRuntime(e);
-            }
-        };
-        Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt,
-                                   Integer.parseInt(portStr), workerId, supervisorIfaceSuppler);
+        Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
         worker.start();
         int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
         LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
@@ -161,8 +170,7 @@ public class Worker implements Shutdownable, DaemonCommon {
             FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
                                         Charset.forName("UTF-8"));
         }
-        final Map<String, Object> topologyConf =
-            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
         ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
         IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
         IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
@@ -178,12 +186,12 @@ public class Worker implements Shutdownable, DaemonCommon {
         subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds);
 
         Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
-            () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials)
+            () -> loadWorker(stateStorage, stormClusterState, initCreds, initialCredentials)
         );
 
     }
 
-    private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
+    private Object loadWorker(IStateStorage stateStorage, IStormClusterState stormClusterState,
                               Map<String, String> initCreds, Credentials initialCredentials)
         throws Exception {
         workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
@@ -221,7 +229,7 @@ public class Worker implements Shutdownable, DaemonCommon {
 
         List<Executor> execs = new ArrayList<>();
         for (List<Long> e : workerState.getLocalExecutors()) {
-            if (ConfigUtils.isLocalMode(topologyConf)) {
+            if (ConfigUtils.isLocalMode(conf)) {
                 Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
                 execs.add(executor);
                 for (int i = 0; i < executor.getTaskIds().size(); ++i) {
@@ -443,7 +451,7 @@ public class Worker implements Shutdownable, DaemonCommon {
         } catch (Exception tr1) {
             //If any error/exception thrown, report directly to nimbus.
             LOG.warn("Exception when send heartbeat to local supervisor", tr1.getMessage());
-            try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)) {
+            try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(topologyConf)) {
                 nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
             } catch (Exception tr2) {
                 //if any error/exception thrown, just ignore.