Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:29 UTC
[40/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java
new file mode 100644
index 0000000..00fccf9
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.storm.utils.Time;
+import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.LogLevel;
+import org.apache.storm.generated.LogLevelAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class LogConfigManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LogConfigManager.class);
+
+ private final AtomicReference<TreeMap<String, LogLevel>> latestLogConfig;
+ private final Map<String, Level> originalLogLevels;
+
+ public LogConfigManager() {
+ this(new AtomicReference<>(new TreeMap<>()));
+ }
+
+ public LogConfigManager(AtomicReference<TreeMap<String, LogLevel>> latestLogConfig) {
+ this.latestLogConfig = latestLogConfig;
+ this.originalLogLevels = getLoggerLevels();
+ LOG.info("Started with log levels: {}", originalLogLevels);
+ }
+
+ public void processLogConfigChange(LogConfig logConfig) {
+ if (null != logConfig) {
+ LOG.debug("Processing received log config: {}", logConfig);
+ TreeMap<String, LogLevel> loggers = new TreeMap<>(logConfig.get_named_logger_level());
+ LoggerContext logContext = (LoggerContext) LogManager.getContext(false);
+ Map<String, LogLevel> newLogConfigs = new HashMap<>();
+ for (Map.Entry<String, LogLevel> entry : loggers.entrySet()) {
+ String msgLoggerName = entry.getKey();
+ msgLoggerName = ("ROOT".equalsIgnoreCase(msgLoggerName)) ? LogManager.ROOT_LOGGER_NAME : msgLoggerName;
+ LogLevel loggerLevel = entry.getValue();
+ // newLogConfigs will map logger name => LogLevel (carrying the reset level) for loggers with a timeout
+ if (loggerLevel.is_set_reset_log_level_timeout_epoch()) {
+ LogLevel copy = new LogLevel(loggerLevel);
+ if (originalLogLevels.containsKey(msgLoggerName)) {
+ copy.set_reset_log_level(originalLogLevels.get(msgLoggerName).name());
+ } else {
+ copy.set_reset_log_level(Level.INFO.name());
+ }
+
+ newLogConfigs.put(msgLoggerName, copy);
+ }
+
+ }
+
+ // Look for deleted log timeouts
+ TreeMap<String,LogLevel> latestConf = latestLogConfig.get();
+ if (latestConf != null) {
+ for (String loggerName : latestConf.descendingKeySet()) {
+ if (! newLogConfigs.containsKey(loggerName)) {
+ // if we had a timeout, but the timeout is no longer active
+ setLoggerLevel(logContext, loggerName, latestConf.get(loggerName).get_reset_log_level());
+
+ }
+ }
+ }
+
+ // apply new log settings we just received
+ // the merged configs are only for the reset logic
+ for (String loggerName : new TreeSet<>(logConfig.get_named_logger_level().keySet())) {
+ LogLevel logLevel = logConfig.get_named_logger_level().get(loggerName);
+ loggerName = ("ROOT".equalsIgnoreCase(loggerName)) ? LogManager.ROOT_LOGGER_NAME : loggerName;
+ LogLevelAction action = logLevel.get_action();
+ if (action == LogLevelAction.UPDATE) {
+ setLoggerLevel(logContext, loggerName, logLevel.get_target_log_level());
+ }
+
+ }
+
+ logContext.updateLoggers();
+ latestLogConfig.set(new TreeMap<>(newLogConfigs));
+ LOG.debug("New merged log config is {}", latestLogConfig.get());
+ }
+ }
+
+ // called on a timer to reset any log levels whose reset timeout has expired
+ public void resetLogLevels() {
+ TreeMap<String, LogLevel> latestLogLevelMap = latestLogConfig.get();
+ LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
+
+ for (String loggerName : latestLogLevelMap.descendingKeySet()) {
+ LogLevel loggerSetting = latestLogLevelMap.get(loggerName);
+ long timeout = loggerSetting.get_reset_log_level_timeout_epoch();
+ String resetLogLevel = loggerSetting.get_reset_log_level();
+ if (timeout < Time.currentTimeMillis()) {
+ LOG.info("{}: Resetting level to {}", loggerName, resetLogLevel);
+ setLoggerLevel(loggerContext, loggerName, resetLogLevel);
+ // remove the entry only after the reset has actually been applied
+ latestLogConfig.getAndUpdate(input -> {
+ TreeMap<String, LogLevel> result = new TreeMap<>(input);
+ result.remove(loggerName);
+ return result;
+ });
+ }
+ }
+ loggerContext.updateLoggers();
+ }
+
+ public Map<String, Level> getLoggerLevels() {
+ Configuration loggerConfig = ((LoggerContext) LogManager.getContext(false)).getConfiguration();
+ Map<String, Level> logLevelMap = new HashMap<>();
+ for (Map.Entry<String, LoggerConfig> entry : loggerConfig.getLoggers().entrySet()) {
+ logLevelMap.put(entry.getKey(), entry.getValue().getLevel());
+ }
+ return logLevelMap;
+ }
+
+ public void setLoggerLevel(LoggerContext logContext, String loggerName, String newLevelStr) {
+ Level newLevel = Level.getLevel(newLevelStr);
+ Configuration configuration = logContext.getConfiguration();
+ LoggerConfig loggerConfig = configuration.getLoggerConfig(loggerName);
+ if (loggerConfig.getName().equalsIgnoreCase(loggerName)) {
+ LOG.info("Setting {} log level to: {}", loggerConfig, newLevel);
+ loggerConfig.setLevel(newLevel);
+ } else {
+ // create a new config. Make it additive (true) so that it inherits its parent's appenders
+ LoggerConfig newLoggerConfig = new LoggerConfig(loggerName, newLevel, true);
+ LOG.info("Adding config for: {} with level: {}", newLoggerConfig, newLevel);
+ configuration.addLogger(loggerName, newLoggerConfig);
+ }
+ }
+}
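
For context, a minimal sketch of how a worker might drive this manager, assuming the usual Thrift-generated setters on LogConfig and LogLevel (set_action, set_target_log_level, set_reset_log_level_timeout_epoch, put_to_named_logger_level); the logger name and timeout below are illustrative:

    LogConfig logConfig = new LogConfig();
    LogLevel level = new LogLevel();
    level.set_action(LogLevelAction.UPDATE);
    level.set_target_log_level("DEBUG");
    // ask for an automatic reset 30 seconds from now
    level.set_reset_log_level_timeout_epoch(System.currentTimeMillis() + 30_000L);
    logConfig.put_to_named_logger_level("org.apache.storm", level);

    LogConfigManager manager = new LogConfigManager();
    manager.processLogConfigChange(logConfig); // applies DEBUG and records the reset level
    // later, invoked by the worker's reset-log-levels timer:
    manager.resetLogLevels();                  // restores the original level once the timeout passes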
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
new file mode 100644
index 0000000..74f5c5f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.worker;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.security.auth.Subject;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.DaemonCommon;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.ExecutorShutdown;
+import org.apache.storm.executor.IRunningExecutor;
+import org.apache.storm.executor.LocalExecutor;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LogConfig;
+import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.WorkerBackpressureCallback;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.lmax.disruptor.EventHandler;
+
+import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
+
+public class Worker implements Shutdownable, DaemonCommon {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
+ private final Map conf;
+ private final IContext context;
+ private final String topologyId;
+ private final String assignmentId;
+ private final int port;
+ private final String workerId;
+ private final LogConfigManager logConfigManager;
+
+
+ private WorkerState workerState;
+ private AtomicReference<List<IRunningExecutor>> executorsAtom;
+ private Thread transferThread;
+ private WorkerBackpressureThread backpressureThread;
+
+ private AtomicReference<Credentials> credentialsAtom;
+ private Subject subject;
+ private Collection<IAutoCredentials> autoCreds;
+
+
+ /**
+ * TODO: should the worker even take the topologyId as input? It should be
+ * deducible from cluster state (by searching through assignments).
+ * What if there's an inconsistency in assignments? -> Nimbus should guarantee consistency.
+ *
+ * @param conf - Storm configuration
+ * @param context - messaging context; if null, the worker creates one via the transport factory
+ * @param topologyId - topology id
+ * @param assignmentId - assignment id
+ * @param port - port on which the worker runs
+ * @param workerId - worker id
+ */
+
+ public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
+ this.conf = conf;
+ this.context = context;
+ this.topologyId = topologyId;
+ this.assignmentId = assignmentId;
+ this.port = port;
+ this.workerId = workerId;
+ this.logConfigManager = new LogConfigManager();
+ }
+
+ public void start() throws Exception {
+ LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
+ conf);
+ // In local mode the worker is not a separate process, so the supervisor
+ // registers it instead. If ConfigUtils.isLocalMode(conf) returns false,
+ // the worker is running in distributed mode.
+ if (!ConfigUtils.isLocalMode(conf)) {
+ // Distributed mode
+ SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
+ String pid = Utils.processPid();
+ FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
+ FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
+ Charset.forName("UTF-8"));
+ }
+ final Map topologyConf =
+ ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+ List<ACL> acls = Utils.getWorkerACL(topologyConf);
+ IStateStorage stateStorage =
+ ClusterUtils.mkStateStorage(conf, topologyConf, acls, new ClusterStateContext(DaemonType.WORKER));
+ IStormClusterState stormClusterState =
+ ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext());
+ Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
+ Map<String, String> initCreds = new HashMap<>();
+ if (initialCredentials != null) {
+ initCreds.putAll(initialCredentials.get_creds());
+ }
+ autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
+ subject = AuthUtils.populateSubject(null, autoCreds, initCreds);
+
+ Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
+ @Override public Object run() throws Exception {
+ workerState =
+ new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
+ stormClusterState);
+
+ // Heartbeat here so that the worker process dies if this fails.
+ // It's important that the worker heartbeats to the supervisor ASAP so that the
+ // supervisor knows the worker is running and moves on.
+ doHeartBeat();
+
+ executorsAtom = new AtomicReference<>(null);
+
+ // launch heartbeat threads immediately so that slow-loading tasks don't cause
+ // the worker to time out to the supervisor
+ workerState.heartbeatTimer
+ .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+ try {
+ doHeartBeat();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ workerState.executorHeartbeatTimer
+ .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+ Worker.this::doExecutorHeartbeats);
+
+ workerState.registerCallbacks();
+
+ workerState.refreshConnections(null);
+
+ workerState.activateWorkerWhenAllConnectionsReady();
+
+ workerState.refreshStormActive(null);
+
+ workerState.runWorkerStartHooks();
+
+ List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
+ for (List<Long> e : workerState.getExecutors()) {
+ if (ConfigUtils.isLocalMode(topologyConf)) {
+ newExecutors.add(
+ LocalExecutor.mkExecutor(workerState, e, initCreds)
+ .execute());
+ } else {
+ newExecutors.add(
+ Executor.mkExecutor(workerState, e, initCreds)
+ .execute());
+ }
+ }
+ executorsAtom.set(newExecutors);
+
+ EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
+ .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
+
+ // This thread will publish the messages destined for remote tasks to remote connections
+ transferThread = Utils.asyncLoop(() -> {
+ workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
+ return 0L;
+ });
+
+ DisruptorBackpressureCallback disruptorBackpressureHandler =
+ mkDisruptorBackpressureHandler(workerState);
+ workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
+ workerState.transferQueue
+ .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
+ workerState.transferQueue
+ .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
+ workerState.transferQueue
+ .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
+
+ WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
+ backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
+ if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
+ backpressureThread.start();
+ stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
+
+ int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
+ workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
+ }
+
+ credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
+
+ establishLogSettingCallback();
+
+ workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
+
+ workerState.refreshCredentialsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
+ @Override public void run() {
+ checkCredentialsChanged();
+ if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
+ checkThrottleChanged();
+ }
+ }
+ });
+
+ // The jitter allows the clients to get the data at different times, and avoids a thundering herd
+ if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
+ workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
+ }
+
+ workerState.refreshConnectionsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
+
+ workerState.resetLogLevelsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
+
+ workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
+ workerState::refreshStormActive);
+
+ LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+ LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
+ return this;
+ };
+ });
+
+ }
+
+ public void doHeartBeat() throws IOException {
+ LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
+ state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
+ workerState.executors.stream()
+ .map(executor -> new ExecutorInfo(executor.get(0).intValue(), executor.get(1).intValue()))
+ .collect(Collectors.toList()), workerState.port));
+ state.cleanup(60); // this is just in case the supervisor is down, so the disk doesn't fill up;
+ // it shouldn't take the supervisor 120 seconds between listing the dir and reading it
+ }
+
+ public void doExecutorHeartbeats() {
+ Map<List<Integer>, ExecutorStats> stats;
+ List<IRunningExecutor> executors = this.executorsAtom.get();
+ if (null == executors) {
+ stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.executors);
+ } else {
+ stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
+ .toMap((Function<IRunningExecutor, List<Long>>) IRunningExecutor::getExecutorId,
+ (Function<IRunningExecutor, ExecutorStats>) IRunningExecutor::renderStats)));
+ }
+ Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
+ try {
+ workerState.stormClusterState
+ .workerHeartbeat(workerState.topologyId, workerState.assignmentId, (long) workerState.port,
+ StatsUtil.thriftifyZkWorkerHb(zkHB));
+ } catch (Exception ex) {
+ LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", ex);
+ }
+ }
+
+ public void checkCredentialsChanged() {
+ Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null);
+ if (! ObjectUtils.equals(newCreds, credentialsAtom.get())) {
+ // This does not have to be atomic, worst case we update when one is not needed
+ AuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? null : newCreds.get_creds());
+ for (IRunningExecutor executor : executorsAtom.get()) {
+ executor.credentialsChanged(newCreds);
+ }
+ credentialsAtom.set(newCreds);
+ }
+ }
+
+ public void checkThrottleChanged() {
+ boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged);
+ workerState.throttleOn.set(throttleOn);
+ }
+
+ public void checkLogConfigChanged() {
+ LogConfig logConfig = workerState.stormClusterState.topologyLogConfig(topologyId, null);
+ logConfigManager.processLogConfigChange(logConfig);
+ establishLogSettingCallback();
+ }
+
+ public void establishLogSettingCallback() {
+ workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged);
+ }
+
+
+ /**
+ * make a handler for the worker's send disruptor queue to
+ * check highWaterMark and lowWaterMark for backpressure
+ */
+ private DisruptorBackpressureCallback mkDisruptorBackpressureHandler(WorkerState workerState) {
+ return new DisruptorBackpressureCallback() {
+ @Override public void highWaterMark() throws Exception {
+ LOG.debug("worker {} transfer-queue is congested, checking backpressure state", workerState.workerId);
+ WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
+ }
+
+ @Override public void lowWaterMark() throws Exception {
+ LOG.debug("worker {} transfer-queue is not congested, checking backpressure state", workerState.workerId);
+ WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
+ }
+ };
+ }
+
+ /**
+ * make a handler that checks and updates worker's backpressure flag
+ */
+ private WorkerBackpressureCallback mkBackpressureHandler() {
+ final List<IRunningExecutor> executors = executorsAtom.get();
+ return new WorkerBackpressureCallback() {
+ @Override public void onEvent(Object obj) {
+ String topologyId = workerState.topologyId;
+ String assignmentId = workerState.assignmentId;
+ int port = workerState.port;
+ IStormClusterState stormClusterState = workerState.stormClusterState;
+ boolean prevBackpressureFlag = workerState.backpressure.get();
+ boolean currBackpressureFlag = prevBackpressureFlag;
+ if (null != executors) {
+ currBackpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
+ .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
+ }
+
+ if (currBackpressureFlag != prevBackpressureFlag) {
+ try {
+ LOG.debug("worker backpressure flag changing from {} to {}", prevBackpressureFlag, currBackpressureFlag);
+ stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureFlag);
+ // doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
+ workerState.backpressure.set(currBackpressureFlag);
+ } catch (Exception ex) {
+ LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex);
+ }
+ }
+ }
+ };
+ }
+
+ @Override public void shutdown() {
+ try {
+ LOG.info("Shutting down worker {} {} {}", topologyId, assignmentId, port);
+
+ for (IConnection socket : workerState.cachedNodeToPortSocket.get().values()) {
+ // this will do best-effort flushing, since the linger period
+ // was set on creation
+ socket.close();
+ }
+ LOG.info("Terminating messaging context");
+ LOG.info("Shutting down executors");
+ for (IRunningExecutor executor : executorsAtom.get()) {
+ ((ExecutorShutdown) executor).shutdown();
+ }
+ LOG.info("Shut down executors");
+
+ // this is fine because the only time this is shared is when it's a local context,
+ // in which case it's a noop
+ workerState.mqContext.term();
+ LOG.info("Shutting down transfer thread");
+ workerState.transferQueue.haltWithInterrupt();
+
+ transferThread.interrupt();
+ transferThread.join();
+ LOG.info("Shut down transfer thread");
+
+ backpressureThread.terminate();
+ LOG.info("Shut down backpressure thread");
+
+ workerState.heartbeatTimer.close();
+ workerState.refreshConnectionsTimer.close();
+ workerState.refreshCredentialsTimer.close();
+ workerState.refreshBackpressureTimer.close();
+ workerState.refreshActiveTimer.close();
+ workerState.executorHeartbeatTimer.close();
+ workerState.userTimer.close();
+ workerState.refreshLoadTimer.close();
+ workerState.resetLogLevelsTimer.close();
+ workerState.closeResources();
+
+ LOG.info("Trigger any worker shutdown hooks");
+ workerState.runWorkerShutdownHooks();
+
+ workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
+ workerState.stormClusterState.removeWorkerBackpressure(topologyId, assignmentId, (long) port);
+ LOG.info("Disconnecting from storm cluster state context");
+ workerState.stormClusterState.disconnect();
+ workerState.stateStorage.close();
+ LOG.info("Shut down worker {} {} {}", topologyId, assignmentId, port);
+ } catch (Exception ex) {
+ throw Utils.wrapInRuntime(ex);
+ }
+
+ }
+
+ @Override public boolean isWaiting() {
+ return workerState.heartbeatTimer.isTimerWaiting()
+ && workerState.refreshConnectionsTimer.isTimerWaiting()
+ && workerState.refreshLoadTimer.isTimerWaiting()
+ && workerState.refreshCredentialsTimer.isTimerWaiting()
+ && workerState.refreshBackpressureTimer.isTimerWaiting()
+ && workerState.refreshActiveTimer.isTimerWaiting()
+ && workerState.executorHeartbeatTimer.isTimerWaiting()
+ && workerState.userTimer.isTimerWaiting();
+ }
+
+ public static void main(String[] args) throws Exception {
+ Preconditions.checkArgument(args.length == 4, "Illegal number of arguments. Expected: 4, Actual: " + args.length);
+ String stormId = args[0];
+ String assignmentId = args[1];
+ String portStr = args[2];
+ String workerId = args[3];
+ Map conf = Utils.readStormConfig();
+ Utils.setupDefaultUncaughtExceptionHandler();
+ StormCommon.validateDistributedMode(conf);
+ Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(portStr), workerId);
+ worker.start();
+ Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown);
+ }
+}
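
Worker.start() wraps all of its initialization in Subject.doAs so that everything touching ZooKeeper or the blobstore runs with the populated credentials. A self-contained sketch of that standard JAAS pattern (the empty Subject and the returned string are illustrative, not Storm's):

    import java.security.PrivilegedExceptionAction;
    import javax.security.auth.Subject;

    public class DoAsSketch {
        public static void main(String[] args) throws Exception {
            Subject subject = new Subject(); // the worker populates this from IAutoCredentials
            String result = Subject.doAs(subject, (PrivilegedExceptionAction<String>) () -> {
                // code here runs with the subject bound to the access-control context,
                // so downstream security checks see its principals and credentials
                return "worker initialized";
            });
            System.out.println(result);
        }
    }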
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
new file mode 100644
index 0000000..3913c32
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.worker;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.StormBase;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.generated.TopologyStatus;
+import org.apache.storm.grouping.Load;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.hooks.BaseWorkerHook;
+import org.apache.storm.messaging.ConnectionWithStatus;
+import org.apache.storm.messaging.DeserializingConnectionCallback;
+import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.messaging.TransportFactory;
+import org.apache.storm.serialization.KryoTupleSerializer;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ThriftTopologyUtils;
+import org.apache.storm.utils.TransferDrainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class WorkerState {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WorkerState.class);
+
+ final Map conf;
+ final IContext mqContext;
+
+ public Map getConf() {
+ return conf;
+ }
+
+ public IConnection getReceiver() {
+ return receiver;
+ }
+
+ public String getTopologyId() {
+ return topologyId;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getWorkerId() {
+ return workerId;
+ }
+
+ public IStateStorage getStateStorage() {
+ return stateStorage;
+ }
+
+ public AtomicBoolean getIsTopologyActive() {
+ return isTopologyActive;
+ }
+
+ public AtomicReference<Map<String, DebugOptions>> getStormComponentToDebug() {
+ return stormComponentToDebug;
+ }
+
+ public Set<List<Long>> getExecutors() {
+ return executors;
+ }
+
+ public List<Integer> getTaskIds() {
+ return taskIds;
+ }
+
+ public Map getTopologyConf() {
+ return topologyConf;
+ }
+
+ public StormTopology getTopology() {
+ return topology;
+ }
+
+ public StormTopology getSystemTopology() {
+ return systemTopology;
+ }
+
+ public Map<Integer, String> getTaskToComponent() {
+ return taskToComponent;
+ }
+
+ public Map<String, Map<String, Fields>> getComponentToStreamToFields() {
+ return componentToStreamToFields;
+ }
+
+ public Map<String, List<Integer>> getComponentToSortedTasks() {
+ return componentToSortedTasks;
+ }
+
+ public AtomicReference<Map<NodeInfo, IConnection>> getCachedNodeToPortSocket() {
+ return cachedNodeToPortSocket;
+ }
+
+ public Map<List<Long>, DisruptorQueue> getExecutorReceiveQueueMap() {
+ return executorReceiveQueueMap;
+ }
+
+ public Runnable getSuicideCallback() {
+ return suicideCallback;
+ }
+
+ public Utils.UptimeComputer getUptime() {
+ return uptime;
+ }
+
+ public Map<String, Object> getDefaultSharedResources() {
+ return defaultSharedResources;
+ }
+
+ public Map<String, Object> getUserSharedResources() {
+ return userSharedResources;
+ }
+
+ final IConnection receiver;
+ final String topologyId;
+ final String assignmentId;
+ final int port;
+ final String workerId;
+ final IStateStorage stateStorage;
+ final IStormClusterState stormClusterState;
+
+ // When the worker boots up it starts setting up its initial connections to
+ // other workers. Once all connections are ready, this flag is enabled and
+ // the spouts and bolts are activated.
+ // Used in the worker only; keep it atomic.
+ final AtomicBoolean isWorkerActive;
+ final AtomicBoolean isTopologyActive;
+ final AtomicReference<Map<String, DebugOptions>> stormComponentToDebug;
+
+ // executors and taskIds running in this worker
+ final Set<List<Long>> executors;
+ final List<Integer> taskIds;
+ final Map topologyConf;
+ final StormTopology topology;
+ final StormTopology systemTopology;
+ final Map<Integer, String> taskToComponent;
+ final Map<String, Map<String, Fields>> componentToStreamToFields;
+ final Map<String, List<Integer>> componentToSortedTasks;
+ final ReentrantReadWriteLock endpointSocketLock;
+ final AtomicReference<Map<Integer, NodeInfo>> cachedTaskToNodePort;
+ final AtomicReference<Map<NodeInfo, IConnection>> cachedNodeToPortSocket;
+ final Map<List<Long>, DisruptorQueue> executorReceiveQueueMap;
+ // executor id is in form [start_task_id end_task_id]
+ // short executor id is start_task_id
+ final Map<Integer, DisruptorQueue> shortExecutorReceiveQueueMap;
+ final Map<Integer, Integer> taskToShortExecutor;
+ final Runnable suicideCallback;
+ final Utils.UptimeComputer uptime;
+ final Map<String, Object> defaultSharedResources;
+ final Map<String, Object> userSharedResources;
+ final LoadMapping loadMapping;
+ final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
+ // Whether this worker is going slow
+ final AtomicBoolean backpressure = new AtomicBoolean(false);
+ // If the transfer queue is backed-up
+ final AtomicBoolean transferBackpressure = new AtomicBoolean(false);
+ // a trigger for synchronization with executors
+ final AtomicBoolean backpressureTrigger = new AtomicBoolean(false);
+ // whether the throttle is activated for spouts
+ final AtomicBoolean throttleOn = new AtomicBoolean(false);
+
+ public LoadMapping getLoadMapping() {
+ return loadMapping;
+ }
+
+ public AtomicReference<Map<String, VersionedData<Assignment>>> getAssignmentVersions() {
+ return assignmentVersions;
+ }
+
+ public AtomicBoolean getBackpressureTrigger() {
+ return backpressureTrigger;
+ }
+
+ public AtomicBoolean getThrottleOn() {
+ return throttleOn;
+ }
+
+ public DisruptorQueue getTransferQueue() {
+ return transferQueue;
+ }
+
+ public StormTimer getUserTimer() {
+ return userTimer;
+ }
+
+ final DisruptorQueue transferQueue;
+
+ // Timers
+ final StormTimer heartbeatTimer = mkHaltingTimer("heartbeat-timer");
+ final StormTimer refreshLoadTimer = mkHaltingTimer("refresh-load-timer");
+ final StormTimer refreshConnectionsTimer = mkHaltingTimer("refresh-connections-timer");
+ final StormTimer refreshCredentialsTimer = mkHaltingTimer("refresh-credentials-timer");
+ final StormTimer resetLogLevelsTimer = mkHaltingTimer("reset-log-levels-timer");
+ final StormTimer refreshActiveTimer = mkHaltingTimer("refresh-active-timer");
+ final StormTimer executorHeartbeatTimer = mkHaltingTimer("executor-heartbeat-timer");
+ final StormTimer refreshBackpressureTimer = mkHaltingTimer("refresh-backpressure-timer");
+ final StormTimer userTimer = mkHaltingTimer("user-timer");
+
+ // global variables only used internally in class
+ private final Set<Integer> outboundTasks;
+ private final AtomicLong nextUpdate = new AtomicLong(0);
+ private final boolean trySerializeLocal;
+ private final TransferDrainer drainer;
+
+ private static final long LOAD_REFRESH_INTERVAL_MS = 5000L;
+
+ public WorkerState(Map conf, IContext mqContext, String topologyId, String assignmentId, int port, String workerId,
+ Map topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState)
+ throws IOException, InvalidTopologyException {
+ this.executors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
+ this.transferQueue = new DisruptorQueue("worker-transfer-queue",
+ ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE)),
+ (long) topologyConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS),
+ ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE)),
+ (long) topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
+
+ this.conf = conf;
+ this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
+ this.receiver = this.mqContext.bind(topologyId, port);
+ this.topologyId = topologyId;
+ this.assignmentId = assignmentId;
+ this.port = port;
+ this.workerId = workerId;
+ this.stateStorage = stateStorage;
+ this.stormClusterState = stormClusterState;
+ this.isWorkerActive = new AtomicBoolean(false);
+ this.isTopologyActive = new AtomicBoolean(false);
+ this.stormComponentToDebug = new AtomicReference<>();
+ this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, executors);
+ this.shortExecutorReceiveQueueMap = new HashMap<>();
+ this.taskIds = new ArrayList<>();
+ for (Map.Entry<List<Long>, DisruptorQueue> entry : executorReceiveQueueMap.entrySet()) {
+ this.shortExecutorReceiveQueueMap.put(entry.getKey().get(0).intValue(), entry.getValue());
+ this.taskIds.addAll(StormCommon.executorIdToTasks(entry.getKey()));
+ }
+ Collections.sort(taskIds);
+ this.topologyConf = topologyConf;
+ this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
+ this.systemTopology = StormCommon.systemTopology(topologyConf, topology);
+ this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
+ this.componentToStreamToFields = new HashMap<>();
+ for (String c : ThriftTopologyUtils.getComponentIds(systemTopology)) {
+ Map<String, Fields> streamToFields = new HashMap<>();
+ for (Map.Entry<String, StreamInfo> stream : ThriftTopologyUtils.getComponentCommon(systemTopology, c).get_streams().entrySet()) {
+ streamToFields.put(stream.getKey(), new Fields(stream.getValue().get_output_fields()));
+ }
+ componentToStreamToFields.put(c, streamToFields);
+ }
+ this.componentToSortedTasks = Utils.reverseMap(taskToComponent);
+ this.componentToSortedTasks.values().forEach(Collections::sort);
+ this.endpointSocketLock = new ReentrantReadWriteLock();
+ this.cachedNodeToPortSocket = new AtomicReference<>(new HashMap<>());
+ this.cachedTaskToNodePort = new AtomicReference<>(new HashMap<>());
+ this.taskToShortExecutor = new HashMap<>();
+ for (List<Long> executor : this.executors) {
+ for (Integer task : StormCommon.executorIdToTasks(executor)) {
+ taskToShortExecutor.put(task, executor.get(0).intValue());
+ }
+ }
+ this.suicideCallback = Utils.mkSuicideFn();
+ this.uptime = Utils.makeUptimeComputer();
+ this.defaultSharedResources = makeDefaultResources();
+ this.userSharedResources = makeUserResources();
+ this.loadMapping = new LoadMapping();
+ this.assignmentVersions = new AtomicReference<>(new HashMap<>());
+ this.outboundTasks = workerOutboundTasks();
+ this.trySerializeLocal = topologyConf.containsKey(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE)
+ && (Boolean) topologyConf.get(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
+ if (trySerializeLocal) {
+ LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
+ }
+ this.drainer = new TransferDrainer();
+ }
+
+ public void refreshConnections() {
+ try {
+ refreshConnections(() -> refreshConnectionsTimer.schedule(0, this::refreshConnections));
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ public void refreshConnections(Runnable callback) throws Exception {
+ Integer version = stormClusterState.assignmentVersion(topologyId, callback);
+ version = (null == version) ? 0 : version;
+ VersionedData<Assignment> assignmentVersion = assignmentVersions.get().get(topologyId);
+ Assignment assignment;
+ if (null != assignmentVersion && (assignmentVersion.getVersion() == version)) {
+ assignment = assignmentVersion.getData();
+ } else {
+ VersionedData<Assignment>
+ newAssignmentVersion = new VersionedData<>(version,
+ stormClusterState.assignmentInfoWithVersion(topologyId, callback).getData());
+ assignmentVersions.getAndUpdate(prev -> {
+ Map<String, VersionedData<Assignment>> next = new HashMap<>(prev);
+ next.put(topologyId, newAssignmentVersion);
+ return next;
+ });
+ assignment = newAssignmentVersion.getData();
+ }
+
+ Set<NodeInfo> neededConnections = new HashSet<>();
+ Map<Integer, NodeInfo> newTaskToNodePort = new HashMap<>();
+ if (null != assignment) {
+ Map<Integer, NodeInfo> taskToNodePort = StormCommon.taskToNodeport(assignment.get_executor_node_port());
+ for (Map.Entry<Integer, NodeInfo> taskToNodePortEntry : taskToNodePort.entrySet()) {
+ Integer task = taskToNodePortEntry.getKey();
+ if (outboundTasks.contains(task)) {
+ newTaskToNodePort.put(task, taskToNodePortEntry.getValue());
+ if (!taskIds.contains(task)) {
+ neededConnections.add(taskToNodePortEntry.getValue());
+ }
+ }
+ }
+ }
+
+ Set<NodeInfo> currentConnections = cachedNodeToPortSocket.get().keySet();
+ Set<NodeInfo> newConnections = Sets.difference(neededConnections, currentConnections);
+ Set<NodeInfo> removeConnections = Sets.difference(currentConnections, neededConnections);
+
+ // Add new connections atomically
+ cachedNodeToPortSocket.getAndUpdate(prev -> {
+ Map<NodeInfo, IConnection> next = new HashMap<>(prev);
+ for (NodeInfo nodeInfo : newConnections) {
+ next.put(nodeInfo,
+ mqContext.connect(
+ topologyId,
+ assignment.get_node_host().get(nodeInfo.get_node()), // Host
+ nodeInfo.get_port().iterator().next().intValue())); // Port
+ }
+ return next;
+ });
+
+
+ try {
+ endpointSocketLock.writeLock().lock();
+ cachedTaskToNodePort.set(newTaskToNodePort);
+ } finally {
+ endpointSocketLock.writeLock().unlock();
+ }
+
+ for (NodeInfo nodeInfo : removeConnections) {
+ cachedNodeToPortSocket.get().get(nodeInfo).close();
+ }
+
+ // Remove old connections atomically
+ cachedNodeToPortSocket.getAndUpdate(prev -> {
+ Map<NodeInfo, IConnection> next = new HashMap<>(prev);
+ removeConnections.forEach(next::remove);
+ return next;
+ });
+
+ }
+
+ public void refreshStormActive() {
+ refreshStormActive(() -> refreshActiveTimer.schedule(0, this::refreshStormActive));
+ }
+
+ public void refreshStormActive(Runnable callback) {
+ StormBase base = stormClusterState.stormBase(topologyId, callback);
+ isTopologyActive.set(
+ (null != base) &&
+ (base.get_status() == TopologyStatus.ACTIVE) &&
+ (isWorkerActive.get()));
+ if (null != base) {
+ Map<String, DebugOptions> debugOptionsMap = new HashMap<>(base.get_component_debug());
+ for (DebugOptions debugOptions : debugOptionsMap.values()) {
+ if (!debugOptions.is_set_samplingpct()) {
+ debugOptions.set_samplingpct(10);
+ }
+ if (!debugOptions.is_set_enable()) {
+ debugOptions.set_enable(false);
+ }
+ }
+ stormComponentToDebug.set(debugOptionsMap);
+ LOG.debug("Events debug options {}", stormComponentToDebug.get());
+ }
+ }
+
+ public void refreshThrottle() {
+ boolean backpressure = stormClusterState.topologyBackpressure(topologyId, this::refreshThrottle);
+ this.throttleOn.set(backpressure);
+ }
+
+ public void refreshLoad() {
+ Set<Integer> remoteTasks = Sets.difference(new HashSet<Integer>(outboundTasks), new HashSet<>(taskIds));
+ Long now = System.currentTimeMillis();
+ Map<Integer, Double> localLoad = shortExecutorReceiveQueueMap.entrySet().stream().collect(Collectors.toMap(
+ (Function<Map.Entry<Integer, DisruptorQueue>, Integer>) Map.Entry::getKey,
+ (Function<Map.Entry<Integer, DisruptorQueue>, Double>) entry -> {
+ DisruptorQueue.QueueMetrics qMetrics = entry.getValue().getMetrics();
+ return ( (double) qMetrics.population()) / qMetrics.capacity();
+ }));
+
+ Map<Integer, Load> remoteLoad = new HashMap<>();
+ cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks)));
+ loadMapping.setLocal(localLoad);
+ loadMapping.setRemote(remoteLoad);
+
+ if (now > nextUpdate.get()) {
+ receiver.sendLoadMetrics(localLoad);
+ nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
+ }
+ }
+
+ /**
+ * Wait for all connections to be ready, then activate the spouts/bolts
+ * when the worker boots up.
+ */
+ public void activateWorkerWhenAllConnectionsReady() {
+ int delaySecs = 0;
+ int recurSecs = 1;
+ refreshActiveTimer.schedule(delaySecs, new Runnable() {
+ @Override public void run() {
+ if (areAllConnectionsReady()) {
+ LOG.info("All connections are ready for worker {}:{} with id", assignmentId, port, workerId);
+ isWorkerActive.set(Boolean.TRUE);
+ } else {
+ refreshActiveTimer.schedule(recurSecs, () -> activateWorkerWhenAllConnectionsReady(), false, 0);
+ }
+ }
+ });
+ }
+
+ public void registerCallbacks() {
+ LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
+ receiver.registerRecv(new DeserializingConnectionCallback(topologyConf,
+ getWorkerTopologyContext(),
+ this::transferLocal));
+ }
+
+ public void transferLocal(List<AddressedTuple> tupleBatch) {
+ Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
+ for (AddressedTuple tuple : tupleBatch) {
+ Integer executor = taskToShortExecutor.get(tuple.dest);
+ if (null == executor) {
+ LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
+ continue;
+ }
+ List<AddressedTuple> current = grouped.get(executor);
+ if (null == current) {
+ current = new ArrayList<>();
+ grouped.put(executor, current);
+ }
+ current.add(tuple);
+ }
+
+ for (Map.Entry<Integer, List<AddressedTuple>> entry : grouped.entrySet()) {
+ DisruptorQueue queue = shortExecutorReceiveQueueMap.get(entry.getKey());
+ if (null != queue) {
+ queue.publish(entry.getValue());
+ } else {
+ LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
+ }
+ }
+ }
+
+ public void transfer(KryoTupleSerializer serializer, List<AddressedTuple> tupleBatch) {
+ if (trySerializeLocal) {
+ assertCanSerialize(serializer, tupleBatch);
+ }
+ List<AddressedTuple> local = new ArrayList<>();
+ Map<Integer, List<TaskMessage>> remoteMap = new HashMap<>();
+ for (AddressedTuple addressedTuple : tupleBatch) {
+ int destTask = addressedTuple.getDest();
+ if (taskIds.contains(destTask)) {
+ // Local task
+ local.add(addressedTuple);
+ } else {
+ // Using java objects directly to avoid performance issues in java code
+ if (! remoteMap.containsKey(destTask)) {
+ remoteMap.put(destTask, new ArrayList<>());
+ }
+ remoteMap.get(destTask).add(new TaskMessage(destTask, serializer.serialize(addressedTuple.getTuple())));
+ }
+ }
+
+ if (!local.isEmpty()) {
+ transferLocal(local);
+ }
+ if (!remoteMap.isEmpty()) {
+ transferQueue.publish(remoteMap);
+ }
+ }
+
+ // TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
+ public void sendTuplesToRemoteWorker(HashMap<Integer, ArrayList<TaskMessage>> packets, long seqId, boolean batchEnd) {
+ drainer.add(packets);
+ if (batchEnd) {
+ ReentrantReadWriteLock.ReadLock readLock = endpointSocketLock.readLock();
+ try {
+ readLock.lock();
+ drainer.send(cachedTaskToNodePort.get(), cachedNodeToPortSocket.get());
+ } finally {
+ readLock.unlock();
+ }
+ drainer.clear();
+ }
+ }
+
+
+ private void assertCanSerialize(KryoTupleSerializer serializer, List<AddressedTuple> tuples) {
+ // Check that all of the tuples can be serialized by serializing them
+ for (AddressedTuple addressedTuple : tuples) {
+ serializer.serialize(addressedTuple.getTuple());
+ }
+ }
+
+ public WorkerTopologyContext getWorkerTopologyContext() {
+ try {
+ String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, topologyId));
+ String pidDir = ConfigUtils.workerPidsRoot(conf, topologyId);
+ return new WorkerTopologyContext(systemTopology, topologyConf, taskToComponent, componentToSortedTasks,
+ componentToStreamToFields, topologyId, codeDir, pidDir, port, taskIds,
+ defaultSharedResources,
+ userSharedResources);
+ } catch (IOException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ public void runWorkerStartHooks() {
+ WorkerTopologyContext workerContext = getWorkerTopologyContext();
+ for (ByteBuffer hook : topology.get_worker_hooks()) {
+ byte[] hookBytes = Utils.toByteArray(hook);
+ BaseWorkerHook hookObject = Utils.javaDeserialize(hookBytes, BaseWorkerHook.class);
+ hookObject.start(topologyConf, workerContext);
+
+ }
+ }
+
+ public void runWorkerShutdownHooks() {
+ for (ByteBuffer hook : topology.get_worker_hooks()) {
+ byte[] hookBytes = Utils.toByteArray(hook);
+ BaseWorkerHook hookObject = Utils.javaDeserialize(hookBytes, BaseWorkerHook.class);
+ hookObject.shutdown();
+
+ }
+ }
+
+ public void closeResources() {
+ LOG.info("Shutting down default resources");
+ ((ExecutorService) defaultSharedResources.get(WorkerTopologyContext.SHARED_EXECUTOR)).shutdownNow();
+ LOG.info("Shut down default resources");
+ }
+
+ public boolean areAllConnectionsReady() {
+ return cachedNodeToPortSocket.get().values()
+ .stream()
+ .map(WorkerState::isConnectionReady)
+ .reduce((left, right) -> left && right)
+ .orElse(true);
+ }
+
+ public static boolean isConnectionReady(IConnection connection) {
+ return !(connection instanceof ConnectionWithStatus)
+ || ((ConnectionWithStatus) connection).status() == ConnectionWithStatus.Status.Ready;
+ }
+
+ private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
+ int port) {
+ LOG.info("Reading assignments");
+ List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
+ executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
+ Map<List<Long>, NodeInfo> executorToNodePort =
+ stormClusterState.assignmentInfo(topologyId, null).get_executor_node_port();
+ for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
+ NodeInfo nodeInfo = entry.getValue();
+ if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
+ executorsAssignedToThisWorker.add(entry.getKey());
+ }
+ }
+ return executorsAssignedToThisWorker;
+ }
+
+ private Map<List<Long>, DisruptorQueue> mkReceiveQueueMap(Map topologyConf, Set<List<Long>> executors) {
+ Map<List<Long>, DisruptorQueue> receiveQueueMap = new HashMap<>();
+ for (List<Long> executor : executors) {
+ receiveQueueMap.put(executor, new DisruptorQueue("receive-queue",
+ ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE)),
+ (long) topologyConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS),
+ ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE)),
+ (long) topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS)));
+ }
+ return receiveQueueMap;
+ }
+
+ private Map<String, Object> makeDefaultResources() {
+ int threadPoolSize = ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE));
+ return ImmutableMap.of(WorkerTopologyContext.SHARED_EXECUTOR, Executors.newFixedThreadPool(threadPoolSize));
+ }
+
+ private Map<String, Object> makeUserResources() {
+ /* TODO: need to invoke a hook provided by the topology, giving it a chance to create user resources.
+ * this would be part of the initialization hook
+ * need to separate workertopologycontext into WorkerContext and WorkerUserContext.
+ * actually just do it via interfaces. just need to make sure to hide setResource from tasks
+ */
+ return new HashMap<>();
+ }
+
+ private StormTimer mkHaltingTimer(String name) {
+ return new StormTimer(name, (thread, exception) -> {
+ LOG.error("Error when processing event", exception);
+ Utils.exitProcess(20, "Error when processing an event");
+ });
+ }
+
+ /**
+ * @return the set of task ids that receive messages from this worker
+ */
+ private Set<Integer> workerOutboundTasks() {
+ WorkerTopologyContext context = getWorkerTopologyContext();
+ Set<String> components = new HashSet<>();
+ for (Integer taskId : taskIds) {
+ for (Map<String, Grouping> value : context.getTargets(context.getComponentId(taskId)).values()) {
+ components.addAll(value.keySet());
+ }
+ }
+
+ Set<Integer> outboundTasks = new HashSet<>();
+
+ for (Map.Entry<String, List<Integer>> entry : Utils.reverseMap(taskToComponent).entrySet()) {
+ if (components.contains(entry.getKey())) {
+ outboundTasks.addAll(entry.getValue());
+ }
+ }
+ return outboundTasks;
+ }
+
+ public interface ILocalTransferCallback {
+ void transfer(List<AddressedTuple> tupleBatch);
+ }
+}
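
Note the recurring idiom above: WorkerState never mutates shared maps such as cachedNodeToPortSocket or assignmentVersions in place; it copies, modifies the copy, and swaps it in with AtomicReference.getAndUpdate. A minimal, self-contained sketch of that copy-on-write pattern (names are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicReference;

    public class CopyOnWriteMapSketch {
        public static void main(String[] args) {
            AtomicReference<Map<String, Integer>> versions = new AtomicReference<>(new HashMap<>());
            // readers always see a consistent snapshot via versions.get();
            // writers copy, modify the copy, and atomically publish it
            versions.getAndUpdate(prev -> {
                Map<String, Integer> next = new HashMap<>(prev);
                next.put("topology-1", 42);
                return next;
            });
            System.out.println(versions.get()); // {topology-1=42}
        }
    }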
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/dependency/DependencyBlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/dependency/DependencyBlobStoreUtils.java b/storm-client/src/jvm/org/apache/storm/dependency/DependencyBlobStoreUtils.java
new file mode 100644
index 0000000..9ec680c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/dependency/DependencyBlobStoreUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.dependency;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.UUID;
+
+public class DependencyBlobStoreUtils {
+
+ private static final String BLOB_DEPENDENCIES_PREFIX = "dep-";
+
+ public static String generateDependencyBlobKey(String key) {
+ return BLOB_DEPENDENCIES_PREFIX + key;
+ }
+
+ public static String applyUUIDToFileName(String fileName) {
+ String fileNameWithoutExt = com.google.common.io.Files.getNameWithoutExtension(fileName);
+ String ext = com.google.common.io.Files.getFileExtension(fileName);
+ if (StringUtils.isEmpty(ext)) {
+ fileName = fileName + "-" + UUID.randomUUID();
+ } else {
+ fileName = fileNameWithoutExt + "-" + UUID.randomUUID() + "." + ext;
+ }
+ return fileName;
+ }
+}
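
A quick illustration of the resulting key shape, assuming the class above is on the classpath (the concrete UUID varies per run):

    public class BlobKeySketch {
        public static void main(String[] args) {
            // "my-lib-1.0.jar" -> "my-lib-1.0-<uuid>.jar" -> "dep-my-lib-1.0-<uuid>.jar"
            String key = DependencyBlobStoreUtils.generateDependencyBlobKey(
                    DependencyBlobStoreUtils.applyUUIDToFileName("my-lib-1.0.jar"));
            System.out.println(key);
        }
    }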
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/dependency/DependencyPropertiesParser.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/dependency/DependencyPropertiesParser.java b/storm-client/src/jvm/org/apache/storm/dependency/DependencyPropertiesParser.java
new file mode 100644
index 0000000..d360ae0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/dependency/DependencyPropertiesParser.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.dependency;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DependencyPropertiesParser {
+ public List<File> parseJarsProperties(String prop) {
+ if (prop.trim().isEmpty()) {
+ // handle no input
+ return Collections.emptyList();
+ }
+
+ List<String> dependencies = Arrays.asList(prop.split(","));
+ return Lists.transform(dependencies, new Function<String, File>() {
+ @Override
+ public File apply(String filePath) {
+ return new File(filePath);
+ }
+ });
+ }
+
+ public Map<String, File> parseArtifactsProperties(String prop) {
+ try {
+ Map<String, String> parsed = (Map<String, String>) JSONValue.parseWithException(prop);
+ Map<String, File> packages = new LinkedHashMap<>(parsed.size());
+ for (Map.Entry<String, String> artifactToFilePath : parsed.entrySet()) {
+ packages.put(artifactToFilePath.getKey(), new File(artifactToFilePath.getValue()));
+ }
+
+ return packages;
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
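
A short sketch of the two parsing paths (paths and the artifact coordinate are hypothetical): jars arrive as a comma-separated list, artifacts as a JSON map of coordinate to local file path.

    import java.io.File;
    import java.util.List;
    import java.util.Map;
    import org.apache.storm.dependency.DependencyPropertiesParser;

    public class ParserSketch {
        public static void main(String[] args) {
            DependencyPropertiesParser parser = new DependencyPropertiesParser();
            // Comma-separated jar paths -> List<File>
            List<File> jars = parser.parseJarsProperties("/tmp/a.jar,/tmp/b.jar");
            // JSON map of artifact coordinate -> local file path
            Map<String, File> artifacts = parser.parseArtifactsProperties(
                    "{\"org.foo:bar:1.0\": \"/tmp/bar-1.0.jar\"}");
            System.out.println(jars + " " + artifacts);
        }
    }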
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java b/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
new file mode 100644
index 0000000..6fac380
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.dependency;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class DependencyUploader {
+ public static final Logger LOG = LoggerFactory.getLogger(DependencyUploader.class);
+
+ private final Map<String, Object> conf;
+ private ClientBlobStore blobStore;
+
+ public DependencyUploader() {
+ conf = Utils.readStormConfig();
+ }
+
+ public void init() {
+ //NOOP
+ }
+
+ public void shutdown() {
+ if (blobStore != null) {
+ blobStore.shutdown();
+ }
+ }
+
+ @VisibleForTesting
+ void setBlobStore(ClientBlobStore blobStore) {
+ this.blobStore = blobStore;
+ }
+
+ private synchronized ClientBlobStore getBlobStore() {
+ if (blobStore == null) {
+ blobStore = Utils.getClientBlobStore(conf);
+ }
+ return blobStore;
+ }
+
+ public List<String> uploadFiles(List<File> dependencies, boolean cleanupIfFails) throws IOException, AuthorizationException {
+ checkFilesExist(dependencies);
+
+ List<String> keys = new ArrayList<>(dependencies.size());
+ try {
+ for (File dependency : dependencies) {
+ String fileName = dependency.getName();
+ String key = DependencyBlobStoreUtils.generateDependencyBlobKey(DependencyBlobStoreUtils.applyUUIDToFileName(fileName));
+
+ try {
+ uploadDependencyToBlobStore(key, dependency);
+ } catch (KeyAlreadyExistsException e) {
+ // this should never happen since we append a UUID to the file name
+ throw new RuntimeException(e);
+ }
+
+ keys.add(key);
+ }
+ } catch (Throwable e) {
+ if (getBlobStore() != null && cleanupIfFails) {
+ deleteBlobs(keys);
+ }
+ throw new RuntimeException(e);
+ }
+
+ return keys;
+ }
+
+ public List<String> uploadArtifacts(Map<String, File> artifacts) {
+ checkFilesExist(artifacts.values());
+
+ List<String> keys = new ArrayList<>(artifacts.size());
+ try {
+ for (Map.Entry<String, File> artifactToFile : artifacts.entrySet()) {
+ String artifact = artifactToFile.getKey();
+ File dependency = artifactToFile.getValue();
+
+ String key = DependencyBlobStoreUtils.generateDependencyBlobKey(convertArtifactToJarFileName(artifact));
+ try {
+ uploadDependencyToBlobStore(key, dependency);
+ } catch (KeyAlreadyExistsException e) {
+ // we lost the race and the blob already exists, which is fine
+ }
+
+ keys.add(key);
+ }
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+
+ return keys;
+ }
+
+ public void deleteBlobs(List<String> keys) {
+ for (String key : keys) {
+ try {
+ getBlobStore().deleteBlob(key);
+ } catch (Throwable e) {
+ LOG.warn("blob delete failed - key: {} continue...", key);
+ }
+ }
+ }
+
+ private String convertArtifactToJarFileName(String artifact) {
+ return artifact.replace(":", "-") + ".jar";
+ }
+
+ private boolean uploadDependencyToBlobStore(String key, File dependency)
+ throws KeyAlreadyExistsException, AuthorizationException, IOException {
+
+ boolean uploadNew = false;
+ try {
+ // FIXME: we can filter via listKeys() with the local blobstore once STORM-1986 is resolved;
+ // as a workaround, we call getBlobMeta() for every key
+ getBlobStore().getBlobMeta(key);
+ } catch (KeyNotFoundException e) {
+ // TODO: do we want to add ACL here?
+ AtomicOutputStream blob = getBlobStore()
+ .createBlob(key, new SettableBlobMeta(new ArrayList<AccessControl>()));
+ Files.copy(dependency.toPath(), blob);
+ blob.close();
+
+ uploadNew = true;
+ }
+
+ return uploadNew;
+ }
+
+ private void checkFilesExist(Collection<File> dependencies) {
+ for (File dependency : dependencies) {
+ // isFile() is false when the path doesn't exist, so it covers the existence check as well
+ if (!dependency.isFile()) {
+ throw new FileNotAvailableException(dependency.getAbsolutePath());
+ }
+ }
+ }
+}
\ No newline at end of file
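
A minimal end-to-end sketch of the uploader's lifecycle, assuming a hypothetical local jar: uploadFiles() returns the generated blob keys and, with the flag set, deletes any blobs already uploaded if a later upload fails.

    import java.io.File;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.storm.dependency.DependencyUploader;

    public class UploaderSketch {
        public static void main(String[] args) throws Exception {
            DependencyUploader uploader = new DependencyUploader();
            try {
                uploader.init();
                // true => clean up already-uploaded blobs if a later upload fails
                List<String> keys = uploader.uploadFiles(
                        Arrays.asList(new File("/tmp/my-lib.jar")), true);
                System.out.println("uploaded blob keys: " + keys);
            } finally {
                uploader.shutdown();
            }
        }
    }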
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/dependency/FileNotAvailableException.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/dependency/FileNotAvailableException.java b/storm-client/src/jvm/org/apache/storm/dependency/FileNotAvailableException.java
new file mode 100644
index 0000000..442e470
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/dependency/FileNotAvailableException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.dependency;
+
+public class FileNotAvailableException extends RuntimeException {
+ public FileNotAvailableException(String fileName) {
+ super(createMessage(fileName));
+ }
+
+ public FileNotAvailableException(String fileName, Throwable cause) {
+ super(createMessage(fileName), cause);
+ }
+
+ private static String createMessage(String fileName) {
+ return "This file is not available: " + fileName;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
new file mode 100644
index 0000000..956999e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.generated.DistributedRPCInvocations;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.security.auth.ThriftClient;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
+ public static final Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
+ private final AtomicReference<DistributedRPCInvocations.Client> client = new AtomicReference<>();
+ private String host;
+ private int port;
+
+ public DRPCInvocationsClient(Map conf, String host, int port) throws TTransportException {
+ super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null);
+ this.host = host;
+ this.port = port;
+ client.set(new DistributedRPCInvocations.Client(_protocol));
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void reconnectClient() throws TException {
+ if (client.get() == null) {
+ reconnect();
+ client.set(new DistributedRPCInvocations.Client(_protocol));
+ }
+ }
+
+ public boolean isConnected() {
+ return client.get() != null;
+ }
+
+ public void result(String id, String result) throws TException, AuthorizationException {
+ DistributedRPCInvocations.Client c = client.get();
+ try {
+ if (c == null) {
+ throw new TException("Client is not connected...");
+ }
+ c.result(id, result);
+ } catch(AuthorizationException aze) {
+ throw aze;
+ } catch(TException e) {
+ client.compareAndSet(c, null);
+ throw e;
+ }
+ }
+
+ public DRPCRequest fetchRequest(String func) throws TException, AuthorizationException {
+ DistributedRPCInvocations.Client c = client.get();
+ try {
+ if (c == null) {
+ throw new TException("Client is not connected...");
+ }
+ return c.fetchRequest(func);
+ } catch(AuthorizationException aze) {
+ throw aze;
+ } catch(TException e) {
+ client.compareAndSet(c, null);
+ throw e;
+ }
+ }
+
+ public void failRequest(String id) throws TException, AuthorizationException {
+ DistributedRPCInvocations.Client c = client.get();
+ try {
+ if (c == null) {
+ throw new TException("Client is not connected...");
+ }
+ c.failRequest(id);
+ } catch(AuthorizationException aze) {
+ throw aze;
+ } catch(TException e) {
+ client.compareAndSet(c, null);
+ throw e;
+ }
+ }
+
+ public DistributedRPCInvocations.Client getClient() {
+ return client.get();
+ }
+
+ @Override
+ public void failRequestV2(String id, DRPCExecutionException ex) throws AuthorizationException, TException {
+ DistributedRPCInvocations.Client c = client.get();
+ try {
+ if (c == null) {
+ throw new TException("Client is not connected...");
+ }
+ c.failRequestV2(id, ex);
+ } catch(AuthorizationException aze) {
+ throw aze;
+ } catch(TException e) {
+ client.compareAndSet(c, null);
+ throw e;
+ }
+ }
+}
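
To close, a sketch of the fetch/result round trip this client supports, against a hypothetical DRPC host (3773 is the stock drpc.invocations.port); fetchRequest() returns a request with an empty id when nothing is pending.

    import java.util.Map;
    import org.apache.storm.drpc.DRPCInvocationsClient;
    import org.apache.storm.generated.DRPCRequest;
    import org.apache.storm.utils.Utils;

    public class InvocationsSketch {
        public static void main(String[] args) throws Exception {
            Map conf = Utils.readStormConfig();
            DRPCInvocationsClient client =
                    new DRPCInvocationsClient(conf, "drpc-host.example.com", 3773);
            try {
                DRPCRequest req = client.fetchRequest("my-function");
                if (!req.get_request_id().isEmpty()) {
                    // Echo the arguments back as the result; real callers compute something here.
                    client.result(req.get_request_id(), "echo: " + req.get_func_args());
                }
            } finally {
                client.close();
            }
        }
    }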