You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/02/12 14:52:09 UTC
[storm] branch master updated: [STORM-3579] use the topo conf for thrift client in Worker code
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 65f374c [STORM-3579] use the topo conf for thrift client in Worker code
new cdd3169 Merge pull request #3208 from Ethanlm/STORM-3579
65f374c is described below
commit 65f374c5e9a258d1d485e8c99c334c0ba6122f74
Author: Ethan Li <et...@gmail.com>
AuthorDate: Fri Feb 7 15:55:00 2020 -0600
[STORM-3579] use the topo conf for thrift client in Worker code
---
.../jvm/org/apache/storm/daemon/worker/Worker.java | 48 +++++++++++++---------
.../org/apache/storm/messaging/netty/Login.java | 11 +++--
2 files changed, 36 insertions(+), 23 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 23e577e..b4e4c80 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -47,7 +47,6 @@ import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LogConfig;
-import org.apache.storm.generated.Supervisor;
import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IContext;
@@ -75,6 +74,7 @@ public class Worker implements Shutdownable, DaemonCommon {
private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
private static final Pattern BLOB_VERSION_EXTRACTION = Pattern.compile(".*\\.([0-9]+)$");
private final Map<String, Object> conf;
+ private final Map<String, Object> topologyConf;
private final IContext context;
private final String topologyId;
private final String assignmentId;
@@ -93,7 +93,6 @@ public class Worker implements Shutdownable, DaemonCommon {
private Collection<IAutoCredentials> autoCreds;
private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;
-
/**
* TODO: should worker even take the topologyId as input? this should be deducible from cluster state (by searching through assignments)
* what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency.
@@ -106,9 +105,9 @@ public class Worker implements Shutdownable, DaemonCommon {
* @param port - port on which the worker runs
* @param workerId - worker id
*/
-
public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
- int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier) {
+ int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier)
+ throws IOException {
this.conf = conf;
this.context = context;
this.topologyId = topologyId;
@@ -118,7 +117,25 @@ public class Worker implements Shutdownable, DaemonCommon {
this.workerId = workerId;
this.logConfigManager = new LogConfigManager();
this.metricRegistry = new StormMetricRegistry();
- this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+
+ this.topologyConf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
+ if (supervisorIfaceSupplier == null) {
+ this.supervisorIfaceSupplier = () -> {
+ try {
+ return SupervisorClient.getConfiguredClient(topologyConf, Utils.hostname(), supervisorPort);
+ } catch (UnknownHostException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ };
+ } else {
+ this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+ }
+ }
+
+ public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
+ int supervisorPort, int port, String workerId) throws IOException {
+ this(conf, context, topologyId, assignmentId, supervisorPort, port, workerId, null);
}
public static void main(String[] args) throws Exception {
@@ -132,15 +149,7 @@ public class Worker implements Shutdownable, DaemonCommon {
Utils.setupDefaultUncaughtExceptionHandler();
StormCommon.validateDistributedMode(conf);
int supervisorPortInt = Integer.parseInt(supervisorPort);
- Supplier<SupervisorIfaceFactory> supervisorIfaceSuppler = () -> {
- try {
- return SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPortInt);
- } catch (UnknownHostException e) {
- throw Utils.wrapInRuntime(e);
- }
- };
- Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt,
- Integer.parseInt(portStr), workerId, supervisorIfaceSuppler);
+ Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
worker.start();
int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
@@ -161,8 +170,7 @@ public class Worker implements Shutdownable, DaemonCommon {
FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
Charset.forName("UTF-8"));
}
- final Map<String, Object> topologyConf =
- ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
@@ -178,12 +186,12 @@ public class Worker implements Shutdownable, DaemonCommon {
subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds);
Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
- () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials)
+ () -> loadWorker(stateStorage, stormClusterState, initCreds, initialCredentials)
);
}
- private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
+ private Object loadWorker(IStateStorage stateStorage, IStormClusterState stormClusterState,
Map<String, String> initCreds, Credentials initialCredentials)
throws Exception {
workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
@@ -221,7 +229,7 @@ public class Worker implements Shutdownable, DaemonCommon {
List<Executor> execs = new ArrayList<>();
for (List<Long> e : workerState.getLocalExecutors()) {
- if (ConfigUtils.isLocalMode(topologyConf)) {
+ if (ConfigUtils.isLocalMode(conf)) {
Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
execs.add(executor);
for (int i = 0; i < executor.getTaskIds().size(); ++i) {
@@ -443,7 +451,7 @@ public class Worker implements Shutdownable, DaemonCommon {
} catch (Exception tr1) {
//If any error/exception thrown, report directly to nimbus.
LOG.warn("Exception when send heartbeat to local supervisor", tr1.getMessage());
- try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)) {
+ try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(topologyConf)) {
nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
} catch (Exception tr2) {
//if any error/exception thrown, just ignore.
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
index d6a345e..27b356a 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
@@ -34,7 +34,6 @@ import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import org.apache.log4j.Logger;
-import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.shade.org.apache.zookeeper.Shell;
import org.apache.storm.shade.org.apache.zookeeper.client.ZooKeeperSaslClient;
@@ -296,8 +295,14 @@ public class Login {
+ System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") + ")");
}
Configuration configuration = this.getConfiguration(jaasConfFile);
- LoginContext loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
- loginContext.login();
+ LoginContext loginContext;
+ try {
+ loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
+ loginContext.login();
+ } catch (LoginException e) {
+ LOG.error("Login using jaas conf " + jaasConfFile + " failed");
+ throw e;
+ }
LOG.info("Successfully logged in to context " + loginContextName + " using " + jaasConfFile);
return loginContext;
}