You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/02/12 14:54:32 UTC
[storm] branch 2.1.x-branch updated: [STORM-3579] use the topo conf
for thrift client in Worker code
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/2.1.x-branch by this push:
new 90f6383 [STORM-3579] use the topo conf for thrift client in Worker code
90f6383 is described below
commit 90f6383cc126bf8607efbe8b66dbe8d1ed7ae21a
Author: Ethan Li <et...@gmail.com>
AuthorDate: Fri Feb 7 15:55:00 2020 -0600
[STORM-3579] use the topo conf for thrift client in Worker code
---
.../jvm/org/apache/storm/daemon/worker/Worker.java | 48 +++++++++++++---------
1 file changed, 28 insertions(+), 20 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 23e577e..b4e4c80 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -47,7 +47,6 @@ import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LogConfig;
-import org.apache.storm.generated.Supervisor;
import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IContext;
@@ -75,6 +74,7 @@ public class Worker implements Shutdownable, DaemonCommon {
private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
private static final Pattern BLOB_VERSION_EXTRACTION = Pattern.compile(".*\\.([0-9]+)$");
private final Map<String, Object> conf;
+ private final Map<String, Object> topologyConf;
private final IContext context;
private final String topologyId;
private final String assignmentId;
@@ -93,7 +93,6 @@ public class Worker implements Shutdownable, DaemonCommon {
private Collection<IAutoCredentials> autoCreds;
private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;
-
/**
* TODO: should worker even take the topologyId as input? this should be deducible from cluster state (by searching through assignments)
* what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency.
@@ -106,9 +105,9 @@ public class Worker implements Shutdownable, DaemonCommon {
* @param port - port on which the worker runs
* @param workerId - worker id
*/
-
public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
- int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier) {
+ int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier)
+ throws IOException {
this.conf = conf;
this.context = context;
this.topologyId = topologyId;
@@ -118,7 +117,25 @@ public class Worker implements Shutdownable, DaemonCommon {
this.workerId = workerId;
this.logConfigManager = new LogConfigManager();
this.metricRegistry = new StormMetricRegistry();
- this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+
+ this.topologyConf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
+ if (supervisorIfaceSupplier == null) {
+ this.supervisorIfaceSupplier = () -> {
+ try {
+ return SupervisorClient.getConfiguredClient(topologyConf, Utils.hostname(), supervisorPort);
+ } catch (UnknownHostException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ };
+ } else {
+ this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+ }
+ }
+
+ public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
+ int supervisorPort, int port, String workerId) throws IOException {
+ this(conf, context, topologyId, assignmentId, supervisorPort, port, workerId, null);
}
public static void main(String[] args) throws Exception {
@@ -132,15 +149,7 @@ public class Worker implements Shutdownable, DaemonCommon {
Utils.setupDefaultUncaughtExceptionHandler();
StormCommon.validateDistributedMode(conf);
int supervisorPortInt = Integer.parseInt(supervisorPort);
- Supplier<SupervisorIfaceFactory> supervisorIfaceSuppler = () -> {
- try {
- return SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPortInt);
- } catch (UnknownHostException e) {
- throw Utils.wrapInRuntime(e);
- }
- };
- Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt,
- Integer.parseInt(portStr), workerId, supervisorIfaceSuppler);
+ Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
worker.start();
int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
@@ -161,8 +170,7 @@ public class Worker implements Shutdownable, DaemonCommon {
FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
Charset.forName("UTF-8"));
}
- final Map<String, Object> topologyConf =
- ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
@@ -178,12 +186,12 @@ public class Worker implements Shutdownable, DaemonCommon {
subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds);
Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
- () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials)
+ () -> loadWorker(stateStorage, stormClusterState, initCreds, initialCredentials)
);
}
- private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
+ private Object loadWorker(IStateStorage stateStorage, IStormClusterState stormClusterState,
Map<String, String> initCreds, Credentials initialCredentials)
throws Exception {
workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
@@ -221,7 +229,7 @@ public class Worker implements Shutdownable, DaemonCommon {
List<Executor> execs = new ArrayList<>();
for (List<Long> e : workerState.getLocalExecutors()) {
- if (ConfigUtils.isLocalMode(topologyConf)) {
+ if (ConfigUtils.isLocalMode(conf)) {
Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
execs.add(executor);
for (int i = 0; i < executor.getTaskIds().size(); ++i) {
@@ -443,7 +451,7 @@ public class Worker implements Shutdownable, DaemonCommon {
} catch (Exception tr1) {
//If any error/exception thrown, report directly to nimbus.
LOG.warn("Exception when send heartbeat to local supervisor", tr1.getMessage());
- try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)) {
+ try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(topologyConf)) {
nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
} catch (Exception tr2) {
//if any error/exception thrown, just ignore.