Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:29 UTC

[40/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java
new file mode 100644
index 0000000..00fccf9
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.storm.utils.Time;
+import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.LogLevel;
+import org.apache.storm.generated.LogLevelAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class LogConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LogConfigManager.class);
+
+    private final AtomicReference<TreeMap<String, LogLevel>> latestLogConfig;
+    private final Map<String, Level> originalLogLevels;
+
+    public LogConfigManager() {
+        this(new AtomicReference<>(new TreeMap<>()));
+    }
+
+    public LogConfigManager(AtomicReference<TreeMap<String, LogLevel>> latestLogConfig) {
+        this.latestLogConfig = latestLogConfig;
+        this.originalLogLevels = getLoggerLevels();
+        LOG.info("Started with log levels: {}", originalLogLevels);
+    }
+
+    public void processLogConfigChange(LogConfig logConfig) {
+        if (null != logConfig) {
+            LOG.debug("Processing received log config: {}", logConfig);
+            TreeMap<String, LogLevel> loggers = new TreeMap<>(logConfig.get_named_logger_level());
+            LoggerContext logContext = (LoggerContext) LogManager.getContext(false);
+            Map<String, LogLevel> newLogConfigs = new HashMap<>();
+            for (Map.Entry<String, LogLevel> entry : loggers.entrySet()) {
+                String msgLoggerName = entry.getKey();
+                msgLoggerName = ("ROOT".equalsIgnoreCase(msgLoggerName)) ? LogManager.ROOT_LOGGER_NAME : msgLoggerName;
+                LogLevel loggerLevel = entry.getValue();
+                // the new-timeouts map now contains logger => timeout
+                if (loggerLevel.is_set_reset_log_level_timeout_epoch()) {
+                    LogLevel copy = new LogLevel(loggerLevel);
+                    if (originalLogLevels.containsKey(msgLoggerName)) {
+                        copy.set_reset_log_level(originalLogLevels.get(msgLoggerName).name());
+                    } else {
+                        copy.set_reset_log_level(Level.INFO.name());
+                    }
+
+                    newLogConfigs.put(msgLoggerName, copy);
+                }
+
+            }
+
+            // Look for deleted log timeouts
+            TreeMap<String,LogLevel> latestConf = latestLogConfig.get();
+            if (latestConf != null) {
+                for (String loggerName : latestConf.descendingKeySet()) {
+                    if (! newLogConfigs.containsKey(loggerName)) {
+                        // if we had a timeout, but the timeout is no longer active
+                        setLoggerLevel(logContext, loggerName, latestConf.get(loggerName).get_reset_log_level());
+
+                    }
+                }
+            }
+
+            // apply new log settings we just received
+            // the merged configs are only for the reset logic
+            for (String loggerName : new TreeSet<>(logConfig.get_named_logger_level().keySet())) {
+                LogLevel logLevel = logConfig.get_named_logger_level().get(loggerName);
+                loggerName = ("ROOT".equalsIgnoreCase(loggerName)) ? LogManager.ROOT_LOGGER_NAME : loggerName;
+                LogLevelAction action = logLevel.get_action();
+                if (action == LogLevelAction.UPDATE) {
+                    setLoggerLevel(logContext, loggerName, logLevel.get_target_log_level());
+                }
+
+            }
+
+            logContext.updateLoggers();
+            latestLogConfig.set(new TreeMap<>(newLogConfigs));
+            LOG.debug("New merged log config is {}", latestLogConfig.get());
+        }
+    }
+
+    // function called on timer to reset log levels last set to DEBUG
+    // also called from processLogConfigChange
+    public void resetLogLevels() {
+        TreeMap<String, LogLevel> latestLogLevelMap = latestLogConfig.get();
+        LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
+
+        for (String loggerName : latestLogLevelMap.descendingKeySet()) {
+            LogLevel loggerSetting = latestLogLevelMap.get(loggerName);
+            long timeout = loggerSetting.get_reset_log_level_timeout_epoch();
+            String resetLogLevel = loggerSetting.get_reset_log_level();
+            if (timeout < Time.currentTimeMillis()) {
+                LOG.info("{}: Resetting level to {}", loggerName, resetLogLevel);
+                setLoggerLevel(loggerContext, loggerName, resetLogLevel);
+                latestLogConfig.getAndUpdate(input -> {
+                    TreeMap<String, LogLevel> result = new TreeMap<>(input);
+                    result.remove(loggerName);
+                    return result;
+                });
+            }
+        }
+        loggerContext.updateLoggers();
+    }
+
+    public Map<String, Level> getLoggerLevels() {
+        Configuration loggerConfig = ((LoggerContext) LogManager.getContext(false)).getConfiguration();
+        Map<String, Level> logLevelMap = new HashMap<>();
+        for (Map.Entry<String, LoggerConfig> entry : loggerConfig.getLoggers().entrySet()) {
+            logLevelMap.put(entry.getKey(), entry.getValue().getLevel());
+        }
+        return logLevelMap;
+    }
+
+    public void setLoggerLevel(LoggerContext logContext, String loggerName, String newLevelStr) {
+        Level newLevel = Level.getLevel(newLevelStr);
+        Configuration configuration = logContext.getConfiguration();
+        LoggerConfig loggerConfig = configuration.getLoggerConfig(loggerName);
+        if (loggerConfig.getName().equalsIgnoreCase(loggerName)) {
+            LOG.info("Setting {} log level to: {}", loggerConfig, newLevel);
+            loggerConfig.setLevel(newLevel);
+        } else {
+            // create a new config. Make it additive (true) so that it inherits its parent's appenders
+            LoggerConfig newLoggerConfig = new LoggerConfig(loggerName, newLevel, true);
+            LOG.info("Adding config for: {} with level: {}", newLoggerConfig, newLevel);
+            configuration.addLogger(loggerName, newLoggerConfig);
+        }
+    }
+}
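
As a quick illustration of the class above: the worker builds a LogConfig (a Thrift
struct) from what Nimbus publishes, hands it to processLogConfigChange(), and then
periodically calls resetLogLevels() from a timer (see Worker.java below). The sketch
below is not part of the commit; the Thrift-generated setters on LogConfig/LogLevel
(set_action, set_target_log_level, set_reset_log_level_timeout_epoch,
put_to_named_logger_level) are assumed from the standard Thrift Java mapping.

    import org.apache.storm.daemon.worker.LogConfigManager;
    import org.apache.storm.generated.LogConfig;
    import org.apache.storm.generated.LogLevel;
    import org.apache.storm.generated.LogLevelAction;
    import org.apache.storm.utils.Time;

    public class LogConfigManagerSketch {
        public static void main(String[] args) {
            LogConfigManager manager = new LogConfigManager();

            // Bump org.apache.storm to DEBUG and ask for a reset 30 seconds from now.
            LogLevel level = new LogLevel();
            level.set_action(LogLevelAction.UPDATE);                   // assumed Thrift setter
            level.set_target_log_level("DEBUG");                       // assumed Thrift setter
            level.set_reset_log_level_timeout_epoch(Time.currentTimeMillis() + 30_000L);

            LogConfig logConfig = new LogConfig();
            logConfig.put_to_named_logger_level("org.apache.storm", level);

            // Apply the change; the manager remembers the original level for the reset.
            manager.processLogConfigChange(logConfig);

            // In the worker this runs on resetLogLevelsTimer; levels whose timeout
            // has expired are restored to their recorded reset level.
            manager.resetLogLevels();
        }
    }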

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
new file mode 100644
index 0000000..74f5c5f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.worker;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.security.auth.Subject;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.apache.storm.Config;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.DaemonCommon;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.ExecutorShutdown;
+import org.apache.storm.executor.IRunningExecutor;
+import org.apache.storm.executor.LocalExecutor;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LogConfig;
+import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.WorkerBackpressureCallback;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.lmax.disruptor.EventHandler;
+
+import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
+
+public class Worker implements Shutdownable, DaemonCommon {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
+    private final Map conf;
+    private final IContext context;
+    private final String topologyId;
+    private final String assignmentId;
+    private final int port;
+    private final String workerId;
+    private final LogConfigManager logConfigManager;
+
+
+    private WorkerState workerState;
+    private AtomicReference<List<IRunningExecutor>> executorsAtom;
+    private Thread transferThread;
+    private WorkerBackpressureThread backpressureThread;
+
+    private AtomicReference<Credentials> credentialsAtom;
+    private Subject subject;
+    private Collection<IAutoCredentials> autoCreds;
+
+
+    /**
+     * TODO: should the worker even take the topologyId as input? It should be
+     * deducible from cluster state (by searching through assignments).
+     * What if there's an inconsistency in assignments? -> but nimbus should guarantee this consistency
+     *
+     * @param conf         - Storm configuration
+     * @param context      - messaging context (may be null; a default is created from the topology conf)
+     * @param topologyId   - topology id
+     * @param assignmentId - assignment id
+     * @param port         - port on which the worker runs
+     * @param workerId     - worker id
+     */
+
+    public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
+        this.conf = conf;
+        this.context = context;
+        this.topologyId = topologyId;
+        this.assignmentId = assignmentId;
+        this.port = port;
+        this.workerId = workerId;
+        this.logConfigManager = new LogConfigManager();
+    }
+
+    public void start() throws Exception {
+        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
+            conf);
+        // In local mode the worker is not a separate process, so the supervisor
+        // registers it in that case.
+        // If ConfigUtils.isLocalMode(conf) returns false then we are in distributed mode.
+        if (!ConfigUtils.isLocalMode(conf)) {
+            // Distributed mode
+            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
+            String pid = Utils.processPid();
+            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
+            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
+                Charset.forName("UTF-8"));
+        }
+        final Map topologyConf =
+            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+        List<ACL> acls = Utils.getWorkerACL(topologyConf);
+        IStateStorage stateStorage =
+            ClusterUtils.mkStateStorage(conf, topologyConf, acls, new ClusterStateContext(DaemonType.WORKER));
+        IStormClusterState stormClusterState =
+            ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext());
+        Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
+        Map<String, String> initCreds = new HashMap<>();
+        if (initialCredentials != null) {
+            initCreds.putAll(initialCredentials.get_creds());
+        }
+        autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
+        subject = AuthUtils.populateSubject(null, autoCreds, initCreds);
+
+        Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
+            @Override public Object run() throws Exception {
+                workerState =
+                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
+                        stormClusterState);
+
+                // Heartbeat here so that the worker process dies if this fails.
+                // It's important that the worker heartbeats to the supervisor ASAP so that the
+                // supervisor knows the worker is running and moves on.
+                doHeartBeat();
+
+                executorsAtom = new AtomicReference<>(null);
+
+                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
+                // to the supervisor
+                workerState.heartbeatTimer
+                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+                        try {
+                            doHeartBeat();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+
+                workerState.executorHeartbeatTimer
+                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+                        Worker.this::doExecutorHeartbeats);
+
+                workerState.registerCallbacks();
+
+                workerState.refreshConnections(null);
+
+                workerState.activateWorkerWhenAllConnectionsReady();
+
+                workerState.refreshStormActive(null);
+
+                workerState.runWorkerStartHooks();
+
+                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
+                for (List<Long> e : workerState.getExecutors()) {
+                    if (ConfigUtils.isLocalMode(topologyConf)) {
+                        newExecutors.add(
+                            LocalExecutor.mkExecutor(workerState, e, initCreds)
+                                .execute());
+                    } else {
+                        newExecutors.add(
+                            Executor.mkExecutor(workerState, e, initCreds)
+                                .execute());
+                    }
+                }
+                executorsAtom.set(newExecutors);
+
+                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
+                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
+
+                // This thread will publish the messages destined for remote tasks to remote connections
+                transferThread = Utils.asyncLoop(() -> {
+                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
+                    return 0L;
+                });
+
+                DisruptorBackpressureCallback disruptorBackpressureHandler =
+                    mkDisruptorBackpressureHandler(workerState);
+                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
+                workerState.transferQueue
+                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
+                workerState.transferQueue
+                    .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
+                workerState.transferQueue
+                    .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
+
+                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
+                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
+                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
+                    backpressureThread.start();
+                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
+                    
+                    int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
+                    workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
+                }
+
+                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
+
+                establishLogSettingCallback();
+
+                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
+
+                workerState.refreshCredentialsTimer.scheduleRecurring(0,
+                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
+                        @Override public void run() {
+                            checkCredentialsChanged();
+                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
+                               checkThrottleChanged();
+                            }
+                        }
+                    });
+              
+                // The jitter allows the clients to get the data at different times, and avoids thundering herd
+                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
+                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
+                }
+
+                workerState.refreshConnectionsTimer.scheduleRecurring(0,
+                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
+
+                workerState.resetLogLevelsTimer.scheduleRecurring(0,
+                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
+
+                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
+                    workerState::refreshStormActive);
+
+                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+                LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
+                return this;
+            };
+        });
+
+    }
+
+    public void doHeartBeat() throws IOException {
+        LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
+        state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
+            workerState.executors.stream()
+                .map(executor -> new ExecutorInfo(executor.get(0).intValue(), executor.get(1).intValue()))
+                .collect(Collectors.toList()), workerState.port));
+        state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
+        // it shouldn't take supervisor 120 seconds between listing dir and reading it
+    }
+
+    public void doExecutorHeartbeats() {
+        Map<List<Integer>, ExecutorStats> stats;
+        List<IRunningExecutor> executors = this.executorsAtom.get();
+        if (null == executors) {
+            stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.executors);
+        } else {
+            stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
+                .toMap((Function<IRunningExecutor, List<Long>>) IRunningExecutor::getExecutorId,
+                    (Function<IRunningExecutor, ExecutorStats>) IRunningExecutor::renderStats)));
+        }
+        Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
+        try {
+            workerState.stormClusterState
+                .workerHeartbeat(workerState.topologyId, workerState.assignmentId, (long) workerState.port,
+                    StatsUtil.thriftifyZkWorkerHb(zkHB));
+        } catch (Exception ex) {
+            LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", ex);
+        }
+    }
+
+    public void checkCredentialsChanged() {
+        Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null);
+        if (! ObjectUtils.equals(newCreds, credentialsAtom.get())) {
+            // This does not have to be atomic, worst case we update when one is not needed
+            AuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? null : newCreds.get_creds());
+            for (IRunningExecutor executor : executorsAtom.get()) {
+                executor.credentialsChanged(newCreds);
+            }
+            credentialsAtom.set(newCreds);
+        }
+    }
+
+    public void checkThrottleChanged() {
+        boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged);
+        workerState.throttleOn.set(throttleOn);
+    }
+
+    public void checkLogConfigChanged() {
+        LogConfig logConfig = workerState.stormClusterState.topologyLogConfig(topologyId, null);
+        logConfigManager.processLogConfigChange(logConfig);
+        establishLogSettingCallback();
+    }
+
+    public void establishLogSettingCallback() {
+        workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged);
+    }
+
+
+    /**
+     * make a handler for the worker's send disruptor queue to
+     * check highWaterMark and lowWaterMark for backpressure
+     */
+    private DisruptorBackpressureCallback mkDisruptorBackpressureHandler(WorkerState workerState) {
+        return new DisruptorBackpressureCallback() {
+            @Override public void highWaterMark() throws Exception {
+                LOG.debug("worker {} transfer-queue is congested, checking backpressure state", workerState.workerId);
+                WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
+            }
+
+            @Override public void lowWaterMark() throws Exception {
+                LOG.debug("worker {} transfer-queue is not congested, checking backpressure state", workerState.workerId);
+                WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
+            }
+        };
+    }
+
+    /**
+     * make a handler that checks and updates worker's backpressure flag
+     */
+    private WorkerBackpressureCallback mkBackpressureHandler() {
+        final List<IRunningExecutor> executors = executorsAtom.get();
+        return new WorkerBackpressureCallback() {
+            @Override public void onEvent(Object obj) {
+                String topologyId = workerState.topologyId;
+                String assignmentId = workerState.assignmentId;
+                int port = workerState.port;
+                IStormClusterState stormClusterState = workerState.stormClusterState;
+                boolean prevBackpressureFlag = workerState.backpressure.get();
+                boolean currBackpressureFlag = prevBackpressureFlag;
+                if (null != executors) {
+                    currBackpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
+                        .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
+                }
+
+                if (currBackpressureFlag != prevBackpressureFlag) {
+                    try {
+                        LOG.debug("worker backpressure flag changing from {} to {}", prevBackpressureFlag, currBackpressureFlag);
+                        stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureFlag);
+                        // doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
+                        workerState.backpressure.set(currBackpressureFlag);
+                    } catch (Exception ex) {
+                        LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex);
+                    }
+                }
+            }
+        };
+    }
+
+    @Override public void shutdown() {
+        try {
+            LOG.info("Shutting down worker {} {} {}", topologyId, assignmentId, port);
+
+            for (IConnection socket : workerState.cachedNodeToPortSocket.get().values()) {
+                //this will do best effort flushing since the linger period
+                // was set on creation
+                socket.close();
+            }
+            LOG.info("Terminating messaging context");
+            LOG.info("Shutting down executors");
+            for (IRunningExecutor executor : executorsAtom.get()) {
+                ((ExecutorShutdown) executor).shutdown();
+            }
+            LOG.info("Shut down executors");
+
+            // this is fine because the only time this is shared is when it's a local context,
+            // in which case it's a noop
+            workerState.mqContext.term();
+            LOG.info("Shutting down transfer thread");
+            workerState.transferQueue.haltWithInterrupt();
+
+            transferThread.interrupt();
+            transferThread.join();
+            LOG.info("Shut down transfer thread");
+
+            backpressureThread.terminate();
+            LOG.info("Shut down backpressure thread");
+
+            workerState.heartbeatTimer.close();
+            workerState.refreshConnectionsTimer.close();
+            workerState.refreshCredentialsTimer.close();
+            workerState.refreshBackpressureTimer.close();
+            workerState.refreshActiveTimer.close();
+            workerState.executorHeartbeatTimer.close();
+            workerState.userTimer.close();
+            workerState.refreshLoadTimer.close();
+            workerState.resetLogLevelsTimer.close();
+            workerState.closeResources();
+
+            LOG.info("Trigger any worker shutdown hooks");
+            workerState.runWorkerShutdownHooks();
+
+            workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
+            workerState.stormClusterState.removeWorkerBackpressure(topologyId, assignmentId, (long) port);
+            LOG.info("Disconnecting from storm cluster state context");
+            workerState.stormClusterState.disconnect();
+            workerState.stateStorage.close();
+            LOG.info("Shut down worker {} {} {}", topologyId, assignmentId, port);
+        } catch (Exception ex) {
+            throw Utils.wrapInRuntime(ex);
+        }
+
+    }
+
+    @Override public boolean isWaiting() {
+        return workerState.heartbeatTimer.isTimerWaiting()
+            && workerState.refreshConnectionsTimer.isTimerWaiting()
+            && workerState.refreshLoadTimer.isTimerWaiting()
+            && workerState.refreshCredentialsTimer.isTimerWaiting()
+            && workerState.refreshBackpressureTimer.isTimerWaiting()
+            && workerState.refreshActiveTimer.isTimerWaiting()
+            && workerState.executorHeartbeatTimer.isTimerWaiting()
+            && workerState.userTimer.isTimerWaiting();
+    }
+
+    public static void main(String[] args) throws Exception {
+        Preconditions.checkArgument(args.length == 4, "Illegal number of arguments. Expected: 4, Actual: " + args.length);
+        String stormId = args[0];
+        String assignmentId = args[1];
+        String portStr = args[2];
+        String workerId = args[3];
+        Map conf = Utils.readStormConfig();
+        Utils.setupDefaultUncaughtExceptionHandler();
+        StormCommon.validateDistributedMode(conf);
+        Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(portStr), workerId);
+        worker.start();
+        Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown);
+    }
+}
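
A side note on mkBackpressureHandler() above: the worker-level backpressure flag is
simply the logical OR of the transfer queue's throttle state and every executor's
backpressure flag, and it is only pushed to ZooKeeper when it actually changes. A
minimal, standalone sketch of that reduction (class and variable names here are
illustrative, not from the commit):

    import java.util.Arrays;
    import java.util.List;

    public class BackpressureAggregationSketch {
        public static void main(String[] args) {
            // stands in for workerState.transferQueue.getThrottleOn()
            boolean transferQueueThrottleOn = false;
            // stands in for the executors' getBackPressureFlag() values
            List<Boolean> executorFlags = Arrays.asList(false, true, false);

            boolean workerBackpressure = transferQueueThrottleOn
                || executorFlags.stream().reduce(Boolean::logicalOr).orElse(false);

            // prints true here, because one executor is backed up
            System.out.println("worker backpressure flag: " + workerBackpressure);
        }
    }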

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
new file mode 100644
index 0000000..3913c32
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.worker;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.StormBase;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.generated.TopologyStatus;
+import org.apache.storm.grouping.Load;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.hooks.BaseWorkerHook;
+import org.apache.storm.messaging.ConnectionWithStatus;
+import org.apache.storm.messaging.DeserializingConnectionCallback;
+import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.messaging.TransportFactory;
+import org.apache.storm.serialization.KryoTupleSerializer;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ThriftTopologyUtils;
+import org.apache.storm.utils.TransferDrainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class WorkerState {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WorkerState.class);
+
+    final Map conf;
+    final IContext mqContext;
+
+    public Map getConf() {
+        return conf;
+    }
+
+    public IConnection getReceiver() {
+        return receiver;
+    }
+
+    public String getTopologyId() {
+        return topologyId;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public String getWorkerId() {
+        return workerId;
+    }
+
+    public IStateStorage getStateStorage() {
+        return stateStorage;
+    }
+
+    public AtomicBoolean getIsTopologyActive() {
+        return isTopologyActive;
+    }
+
+    public AtomicReference<Map<String, DebugOptions>> getStormComponentToDebug() {
+        return stormComponentToDebug;
+    }
+
+    public Set<List<Long>> getExecutors() {
+        return executors;
+    }
+
+    public List<Integer> getTaskIds() {
+        return taskIds;
+    }
+
+    public Map getTopologyConf() {
+        return topologyConf;
+    }
+
+    public StormTopology getTopology() {
+        return topology;
+    }
+
+    public StormTopology getSystemTopology() {
+        return systemTopology;
+    }
+
+    public Map<Integer, String> getTaskToComponent() {
+        return taskToComponent;
+    }
+
+    public Map<String, Map<String, Fields>> getComponentToStreamToFields() {
+        return componentToStreamToFields;
+    }
+
+    public Map<String, List<Integer>> getComponentToSortedTasks() {
+        return componentToSortedTasks;
+    }
+
+    public AtomicReference<Map<NodeInfo, IConnection>> getCachedNodeToPortSocket() {
+        return cachedNodeToPortSocket;
+    }
+
+    public Map<List<Long>, DisruptorQueue> getExecutorReceiveQueueMap() {
+        return executorReceiveQueueMap;
+    }
+
+    public Runnable getSuicideCallback() {
+        return suicideCallback;
+    }
+
+    public Utils.UptimeComputer getUptime() {
+        return uptime;
+    }
+
+    public Map<String, Object> getDefaultSharedResources() {
+        return defaultSharedResources;
+    }
+
+    public Map<String, Object> getUserSharedResources() {
+        return userSharedResources;
+    }
+
+    final IConnection receiver;
+    final String topologyId;
+    final String assignmentId;
+    final int port;
+    final String workerId;
+    final IStateStorage stateStorage;
+    final IStormClusterState stormClusterState;
+
+    // When the worker boots up it starts setting up its initial connections to
+    // other workers. Once all connections are ready, this flag is enabled
+    // and the spouts and bolts are activated.
+    // used in the worker only, keep it atomic
+    final AtomicBoolean isWorkerActive;
+    final AtomicBoolean isTopologyActive;
+    final AtomicReference<Map<String, DebugOptions>> stormComponentToDebug;
+
+    // executors and taskIds running in this worker
+    final Set<List<Long>> executors;
+    final List<Integer> taskIds;
+    final Map topologyConf;
+    final StormTopology topology;
+    final StormTopology systemTopology;
+    final Map<Integer, String> taskToComponent;
+    final Map<String, Map<String, Fields>> componentToStreamToFields;
+    final Map<String, List<Integer>> componentToSortedTasks;
+    final ReentrantReadWriteLock endpointSocketLock;
+    final AtomicReference<Map<Integer, NodeInfo>> cachedTaskToNodePort;
+    final AtomicReference<Map<NodeInfo, IConnection>> cachedNodeToPortSocket;
+    final Map<List<Long>, DisruptorQueue> executorReceiveQueueMap;
+    // executor id is in form [start_task_id end_task_id]
+    // short executor id is start_task_id
+    final Map<Integer, DisruptorQueue> shortExecutorReceiveQueueMap;
+    final Map<Integer, Integer> taskToShortExecutor;
+    final Runnable suicideCallback;
+    final Utils.UptimeComputer uptime;
+    final Map<String, Object> defaultSharedResources;
+    final Map<String, Object> userSharedResources;
+    final LoadMapping loadMapping;
+    final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
+    // Whether this worker is going slow
+    final AtomicBoolean backpressure = new AtomicBoolean(false);
+    // If the transfer queue is backed-up
+    final AtomicBoolean transferBackpressure = new AtomicBoolean(false);
+    // a trigger for synchronization with executors
+    final AtomicBoolean backpressureTrigger = new AtomicBoolean(false);
+    // whether the throttle is activated for spouts
+    final AtomicBoolean throttleOn = new AtomicBoolean(false);
+
+    public LoadMapping getLoadMapping() {
+        return loadMapping;
+    }
+
+    public AtomicReference<Map<String, VersionedData<Assignment>>> getAssignmentVersions() {
+        return assignmentVersions;
+    }
+
+    public AtomicBoolean getBackpressureTrigger() {
+        return backpressureTrigger;
+    }
+
+    public AtomicBoolean getThrottleOn() {
+        return throttleOn;
+    }
+
+    public DisruptorQueue getTransferQueue() {
+        return transferQueue;
+    }
+
+    public StormTimer getUserTimer() {
+        return userTimer;
+    }
+
+    final DisruptorQueue transferQueue;
+
+    // Timers
+    final StormTimer heartbeatTimer = mkHaltingTimer("heartbeat-timer");
+    final StormTimer refreshLoadTimer = mkHaltingTimer("refresh-load-timer");
+    final StormTimer refreshConnectionsTimer = mkHaltingTimer("refresh-connections-timer");
+    final StormTimer refreshCredentialsTimer = mkHaltingTimer("refresh-credentials-timer");
+    final StormTimer resetLogLevelsTimer = mkHaltingTimer("reset-log-levels-timer");
+    final StormTimer refreshActiveTimer = mkHaltingTimer("refresh-active-timer");
+    final StormTimer executorHeartbeatTimer = mkHaltingTimer("executor-heartbeat-timer");
+    final StormTimer refreshBackpressureTimer = mkHaltingTimer("refresh-backpressure-timer");
+    final StormTimer userTimer = mkHaltingTimer("user-timer");
+
+    // global variables only used internally in class
+    private final Set<Integer> outboundTasks;
+    private final AtomicLong nextUpdate = new AtomicLong(0);
+    private final boolean trySerializeLocal;
+    private final TransferDrainer drainer;
+
+    private static final long LOAD_REFRESH_INTERVAL_MS = 5000L;
+
+    public WorkerState(Map conf, IContext mqContext, String topologyId, String assignmentId, int port, String workerId,
+        Map topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState)
+        throws IOException, InvalidTopologyException {
+        this.executors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
+        this.transferQueue = new DisruptorQueue("worker-transfer-queue",
+            ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE)),
+            (long) topologyConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS),
+            ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE)),
+            (long) topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
+
+        this.conf = conf;
+        this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
+        this.receiver = this.mqContext.bind(topologyId, port);
+        this.topologyId = topologyId;
+        this.assignmentId = assignmentId;
+        this.port = port;
+        this.workerId = workerId;
+        this.stateStorage = stateStorage;
+        this.stormClusterState = stormClusterState;
+        this.isWorkerActive = new AtomicBoolean(false);
+        this.isTopologyActive = new AtomicBoolean(false);
+        this.stormComponentToDebug = new AtomicReference<>();
+        this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, executors);
+        this.shortExecutorReceiveQueueMap = new HashMap<>();
+        this.taskIds = new ArrayList<>();
+        for (Map.Entry<List<Long>, DisruptorQueue> entry : executorReceiveQueueMap.entrySet()) {
+            this.shortExecutorReceiveQueueMap.put(entry.getKey().get(0).intValue(), entry.getValue());
+            this.taskIds.addAll(StormCommon.executorIdToTasks(entry.getKey()));
+        }
+        Collections.sort(taskIds);
+        this.topologyConf = topologyConf;
+        this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
+        this.systemTopology = StormCommon.systemTopology(topologyConf, topology);
+        this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
+        this.componentToStreamToFields = new HashMap<>();
+        for (String c : ThriftTopologyUtils.getComponentIds(systemTopology)) {
+            Map<String, Fields> streamToFields = new HashMap<>();
+            for (Map.Entry<String, StreamInfo> stream : ThriftTopologyUtils.getComponentCommon(systemTopology, c).get_streams().entrySet()) {
+                streamToFields.put(stream.getKey(), new Fields(stream.getValue().get_output_fields()));
+            }
+            componentToStreamToFields.put(c, streamToFields);
+        }
+        this.componentToSortedTasks = Utils.reverseMap(taskToComponent);
+        this.componentToSortedTasks.values().forEach(Collections::sort);
+        this.endpointSocketLock = new ReentrantReadWriteLock();
+        this.cachedNodeToPortSocket = new AtomicReference<>(new HashMap<>());
+        this.cachedTaskToNodePort = new AtomicReference<>(new HashMap<>());
+        this.taskToShortExecutor = new HashMap<>();
+        for (List<Long> executor : this.executors) {
+            for (Integer task : StormCommon.executorIdToTasks(executor)) {
+                taskToShortExecutor.put(task, executor.get(0).intValue());
+            }
+        }
+        this.suicideCallback = Utils.mkSuicideFn();
+        this.uptime = Utils.makeUptimeComputer();
+        this.defaultSharedResources = makeDefaultResources();
+        this.userSharedResources = makeUserResources();
+        this.loadMapping = new LoadMapping();
+        this.assignmentVersions = new AtomicReference<>(new HashMap<>());
+        this.outboundTasks = workerOutboundTasks();
+        this.trySerializeLocal = topologyConf.containsKey(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE)
+            && (Boolean) topologyConf.get(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
+        if (trySerializeLocal) {
+            LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
+        }
+        this.drainer = new TransferDrainer();
+    }
+
+    public void refreshConnections() {
+        try {
+            refreshConnections(() -> refreshConnectionsTimer.schedule(0, this::refreshConnections));
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    public void refreshConnections(Runnable callback) throws Exception {
+        Integer version = stormClusterState.assignmentVersion(topologyId, callback);
+        version = (null == version) ? 0 : version;
+        VersionedData<Assignment> assignmentVersion = assignmentVersions.get().get(topologyId);
+        Assignment assignment;
+        if (null != assignmentVersion && (assignmentVersion.getVersion() == version)) {
+            assignment = assignmentVersion.getData();
+        } else {
+            VersionedData<Assignment>
+                newAssignmentVersion = new VersionedData<>(version,
+                stormClusterState.assignmentInfoWithVersion(topologyId, callback).getData());
+            assignmentVersions.getAndUpdate(prev -> {
+                Map<String, VersionedData<Assignment>> next = new HashMap<>(prev);
+                next.put(topologyId, newAssignmentVersion);
+                return next;
+            });
+            assignment = newAssignmentVersion.getData();
+        }
+
+        Set<NodeInfo> neededConnections = new HashSet<>();
+        Map<Integer, NodeInfo> newTaskToNodePort = new HashMap<>();
+        if (null != assignment) {
+            Map<Integer, NodeInfo> taskToNodePort = StormCommon.taskToNodeport(assignment.get_executor_node_port());
+            for (Map.Entry<Integer, NodeInfo> taskToNodePortEntry : taskToNodePort.entrySet()) {
+                Integer task = taskToNodePortEntry.getKey();
+                if (outboundTasks.contains(task)) {
+                    newTaskToNodePort.put(task, taskToNodePortEntry.getValue());
+                    if (!taskIds.contains(task)) {
+                        neededConnections.add(taskToNodePortEntry.getValue());
+                    }
+                }
+            }
+        }
+
+        Set<NodeInfo> currentConnections = cachedNodeToPortSocket.get().keySet();
+        Set<NodeInfo> newConnections = Sets.difference(neededConnections, currentConnections);
+        Set<NodeInfo> removeConnections = Sets.difference(currentConnections, neededConnections);
+
+        // Add new connections atomically
+        cachedNodeToPortSocket.getAndUpdate(prev -> {
+            Map<NodeInfo, IConnection> next = new HashMap<>(prev);
+            for (NodeInfo nodeInfo : newConnections) {
+                next.put(nodeInfo,
+                    mqContext.connect(
+                        topologyId,
+                        assignment.get_node_host().get(nodeInfo.get_node()),    // Host
+                        nodeInfo.get_port().iterator().next().intValue()));     // Port
+            }
+            return next;
+        });
+
+
+        try {
+            endpointSocketLock.writeLock().lock();
+            cachedTaskToNodePort.set(newTaskToNodePort);
+        } finally {
+            endpointSocketLock.writeLock().unlock();
+        }
+
+        for (NodeInfo nodeInfo : removeConnections) {
+            cachedNodeToPortSocket.get().get(nodeInfo).close();
+        }
+
+        // Remove old connections atomically
+        cachedNodeToPortSocket.getAndUpdate(prev -> {
+            Map<NodeInfo, IConnection> next = new HashMap<>(prev);
+            removeConnections.forEach(next::remove);
+            return next;
+        });
+
+    }
+
+    public void refreshStormActive() {
+        refreshStormActive(() -> refreshActiveTimer.schedule(0, this::refreshStormActive));
+    }
+
+    public void refreshStormActive(Runnable callback) {
+        StormBase base = stormClusterState.stormBase(topologyId, callback);
+        isTopologyActive.set(
+            (null != base) &&
+            (base.get_status() == TopologyStatus.ACTIVE) &&
+            (isWorkerActive.get()));
+        if (null != base) {
+            Map<String, DebugOptions> debugOptionsMap = new HashMap<>(base.get_component_debug());
+            for (DebugOptions debugOptions : debugOptionsMap.values()) {
+                if (!debugOptions.is_set_samplingpct()) {
+                    debugOptions.set_samplingpct(10);
+                }
+                if (!debugOptions.is_set_enable()) {
+                    debugOptions.set_enable(false);
+                }
+            }
+            stormComponentToDebug.set(debugOptionsMap);
+            LOG.debug("Events debug options {}", stormComponentToDebug.get());
+        }
+    }
+
+    public void refreshThrottle() {
+        boolean backpressure = stormClusterState.topologyBackpressure(topologyId, this::refreshThrottle);
+        this.throttleOn.set(backpressure);
+    }
+
+    public void refreshLoad() {
+        Set<Integer> remoteTasks = Sets.difference(new HashSet<Integer>(outboundTasks), new HashSet<>(taskIds));
+        Long now = System.currentTimeMillis();
+        Map<Integer, Double> localLoad = shortExecutorReceiveQueueMap.entrySet().stream().collect(Collectors.toMap(
+            (Function<Map.Entry<Integer, DisruptorQueue>, Integer>) Map.Entry::getKey,
+            (Function<Map.Entry<Integer, DisruptorQueue>, Double>) entry -> {
+                DisruptorQueue.QueueMetrics qMetrics = entry.getValue().getMetrics();
+                return ( (double) qMetrics.population()) / qMetrics.capacity();
+            }));
+
+        Map<Integer, Load> remoteLoad = new HashMap<>();
+        cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks)));
+        loadMapping.setLocal(localLoad);
+        loadMapping.setRemote(remoteLoad);
+
+        if (now > nextUpdate.get()) {
+            receiver.sendLoadMetrics(localLoad);
+            nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
+        }
+    }
+
+    /**
+     * Wait for all connections to be ready and then activate the spouts/bolts
+     * when the worker boots up.
+     */
+    public void activateWorkerWhenAllConnectionsReady() {
+        int delaySecs = 0;
+        int recurSecs = 1;
+        refreshActiveTimer.schedule(delaySecs, new Runnable() {
+            @Override public void run() {
+                if (areAllConnectionsReady()) {
+                    LOG.info("All connections are ready for worker {}:{} with id", assignmentId, port, workerId);
+                    isWorkerActive.set(Boolean.TRUE);
+                } else {
+                    refreshActiveTimer.schedule(recurSecs, () -> activateWorkerWhenAllConnectionsReady(), false, 0);
+                }
+            }
+        });
+    }
+
+    public void registerCallbacks() {
+        LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
+        receiver.registerRecv(new DeserializingConnectionCallback(topologyConf,
+            getWorkerTopologyContext(),
+            this::transferLocal));
+    }
+
+    public void transferLocal(List<AddressedTuple> tupleBatch) {
+        Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
+        for (AddressedTuple tuple : tupleBatch) {
+            Integer executor = taskToShortExecutor.get(tuple.dest);
+            if (null == executor) {
+                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
+                continue;
+            }
+            List<AddressedTuple> current = grouped.get(executor);
+            if (null == current) {
+                current = new ArrayList<>();
+                grouped.put(executor, current);
+            }
+            current.add(tuple);
+        }
+
+        for (Map.Entry<Integer, List<AddressedTuple>> entry : grouped.entrySet()) {
+            DisruptorQueue queue = shortExecutorReceiveQueueMap.get(entry.getKey());
+            if (null != queue) {
+                queue.publish(entry.getValue());
+            } else {
+                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
+            }
+        }
+    }
+
+    public void transfer(KryoTupleSerializer serializer, List<AddressedTuple> tupleBatch) {
+        if (trySerializeLocal) {
+            assertCanSerialize(serializer, tupleBatch);
+        }
+        List<AddressedTuple> local = new ArrayList<>();
+        Map<Integer, List<TaskMessage>> remoteMap = new HashMap<>();
+        for (AddressedTuple addressedTuple : tupleBatch) {
+            int destTask = addressedTuple.getDest();
+            if (taskIds.contains(destTask)) {
+                // Local task
+                local.add(addressedTuple);
+            } else {
+                // Using java objects directly to avoid performance issues in java code
+                if (! remoteMap.containsKey(destTask)) {
+                    remoteMap.put(destTask, new ArrayList<>());
+                }
+                remoteMap.get(destTask).add(new TaskMessage(destTask, serializer.serialize(addressedTuple.getTuple())));
+            }
+        }
+
+        if (!local.isEmpty()) {
+            transferLocal(local);
+        }
+        if (!remoteMap.isEmpty()) {
+            transferQueue.publish(remoteMap);
+        }
+    }
+
+    // TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
+    public void sendTuplesToRemoteWorker(HashMap<Integer, ArrayList<TaskMessage>> packets, long seqId, boolean batchEnd) {
+        drainer.add(packets);
+        if (batchEnd) {
+            ReentrantReadWriteLock.ReadLock readLock = endpointSocketLock.readLock();
+            try {
+                readLock.lock();
+                drainer.send(cachedTaskToNodePort.get(), cachedNodeToPortSocket.get());
+            } finally {
+                readLock.unlock();
+            }
+            drainer.clear();
+        }
+    }
+
+
+    private void assertCanSerialize(KryoTupleSerializer serializer, List<AddressedTuple> tuples) {
+        // Check that all of the tuples can be serialized by serializing them
+        for (AddressedTuple addressedTuple : tuples) {
+            serializer.serialize(addressedTuple.getTuple());
+        }
+    }
+
+    public WorkerTopologyContext getWorkerTopologyContext() {
+        try {
+            String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, topologyId));
+            String pidDir = ConfigUtils.workerPidsRoot(conf, topologyId);
+            return new WorkerTopologyContext(systemTopology, topologyConf, taskToComponent, componentToSortedTasks,
+                componentToStreamToFields, topologyId, codeDir, pidDir, port, taskIds,
+                defaultSharedResources,
+                userSharedResources);
+        } catch (IOException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    public void runWorkerStartHooks() {
+        WorkerTopologyContext workerContext = getWorkerTopologyContext();
+        for (ByteBuffer hook : topology.get_worker_hooks()) {
+            byte[] hookBytes = Utils.toByteArray(hook);
+            BaseWorkerHook hookObject = Utils.javaDeserialize(hookBytes, BaseWorkerHook.class);
+            hookObject.start(topologyConf, workerContext);
+
+        }
+    }
+
+    public void runWorkerShutdownHooks() {
+        for (ByteBuffer hook : topology.get_worker_hooks()) {
+            byte[] hookBytes = Utils.toByteArray(hook);
+            BaseWorkerHook hookObject = Utils.javaDeserialize(hookBytes, BaseWorkerHook.class);
+            hookObject.shutdown();
+        }
+    }
+
+    public void closeResources() {
+        LOG.info("Shutting down default resources");
+        ((ExecutorService) defaultSharedResources.get(WorkerTopologyContext.SHARED_EXECUTOR)).shutdownNow();
+        LOG.info("Shut down default resources");
+    }
+
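+    /**
+     * @return true once every cached outbound connection reports Ready; connections that do not expose a
+     * status are treated as ready
+     */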
+    public boolean areAllConnectionsReady() {
+        return cachedNodeToPortSocket.get().values()
+            .stream()
+            .map(WorkerState::isConnectionReady)
+            .reduce((left, right) -> left && right)
+            .orElse(true);
+    }
+
+    public static boolean isConnectionReady(IConnection connection) {
+        return !(connection instanceof ConnectionWithStatus)
+            || ((ConnectionWithStatus) connection).status() == ConnectionWithStatus.Status.Ready;
+    }
+
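+    /**
+     * Read the topology assignment from cluster state and return the executors assigned to this worker's
+     * node and port, always including the system executor.
+     */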
+    private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
+        int port) {
+        LOG.info("Reading assignments");
+        List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
+        executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
+        Map<List<Long>, NodeInfo> executorToNodePort =
+            stormClusterState.assignmentInfo(topologyId, null).get_executor_node_port();
+        for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
+            NodeInfo nodeInfo = entry.getValue();
+            if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
+                executorsAssignedToThisWorker.add(entry.getKey());
+            }
+        }
+        return executorsAssignedToThisWorker;
+    }
+
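+    /**
+     * Create one receive DisruptorQueue per executor, sized and batched according to the topology
+     * configuration.
+     */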
+    private Map<List<Long>, DisruptorQueue> mkReceiveQueueMap(Map topologyConf, Set<List<Long>> executors) {
+        Map<List<Long>, DisruptorQueue> receiveQueueMap = new HashMap<>();
+        for (List<Long> executor : executors) {
+            receiveQueueMap.put(executor, new DisruptorQueue("receive-queue",
+                ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE)),
+                (long) topologyConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS),
+                ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE)),
+                (long) topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS)));
+        }
+        return receiveQueueMap;
+    }
+    
+    private Map<String, Object> makeDefaultResources() {
+        int threadPoolSize = ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE));
+        return ImmutableMap.of(WorkerTopologyContext.SHARED_EXECUTOR, Executors.newFixedThreadPool(threadPoolSize));
+    }
+    
+    private Map<String, Object> makeUserResources() {
+        /* TODO: invoke a hook provided by the topology, giving it a chance to create user resources
+         * as part of worker initialization. That requires separating WorkerTopologyContext into
+         * WorkerContext and WorkerUserContext (or doing it via interfaces), while making sure
+         * setResource stays hidden from tasks.
+         */
+        return new HashMap<>();
+    }
+
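+    /**
+     * Create a StormTimer whose uncaught-exception handler logs the error and exits the worker process.
+     */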
+    private StormTimer mkHaltingTimer(String name) {
+        return new StormTimer(name, (thread, exception) -> {
+            LOG.error("Error when processing event", exception);
+            Utils.exitProcess(20, "Error when processing an event");
+        });
+    }
+
+    /**
+     * Compute the set of task ids that receive messages from tasks running in this worker.
+     *
+     * @return task ids downstream of this worker
+     */
+    private Set<Integer> workerOutboundTasks() {
+        WorkerTopologyContext context = getWorkerTopologyContext();
+        Set<String> components = new HashSet<>();
+        for (Integer taskId : taskIds) {
+            for (Map<String, Grouping> value : context.getTargets(context.getComponentId(taskId)).values()) {
+                components.addAll(value.keySet());
+            }
+        }
+
+        Set<Integer> outboundTasks = new HashSet<>();
+
+        for (Map.Entry<String, List<Integer>> entry : Utils.reverseMap(taskToComponent).entrySet()) {
+            if (components.contains(entry.getKey())) {
+                outboundTasks.addAll(entry.getValue());
+            }
+        }
+        return outboundTasks;
+    }
+
+    public interface ILocalTransferCallback {
+        void transfer(List<AddressedTuple> tupleBatch);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/dependency/DependencyBlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/dependency/DependencyBlobStoreUtils.java b/storm-client/src/jvm/org/apache/storm/dependency/DependencyBlobStoreUtils.java
new file mode 100644
index 0000000..9ec680c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/dependency/DependencyBlobStoreUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.dependency;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.util.UUID;
+
+public class DependencyBlobStoreUtils {
+
+    private static final String BLOB_DEPENDENCIES_PREFIX = "dep-";
+
+    public static String generateDependencyBlobKey(String key) {
+        return BLOB_DEPENDENCIES_PREFIX + key;
+    }
+
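+    /**
+     * Insert a random UUID before the file extension (or append it when there is no extension) so that
+     * uploaded dependency blobs get unique names.
+     */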
+    public static String applyUUIDToFileName(String fileName) {
+        String fileNameWithoutExt = com.google.common.io.Files.getNameWithoutExtension(fileName);
+        String ext = com.google.common.io.Files.getFileExtension(fileName);
+        if (StringUtils.isEmpty(ext)) {
+            fileName = fileName + "-" + UUID.randomUUID();
+        } else {
+            fileName = fileNameWithoutExt + "-" + UUID.randomUUID() + "." + ext;
+        }
+        return fileName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/dependency/DependencyPropertiesParser.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/dependency/DependencyPropertiesParser.java b/storm-client/src/jvm/org/apache/storm/dependency/DependencyPropertiesParser.java
new file mode 100644
index 0000000..d360ae0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/dependency/DependencyPropertiesParser.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.dependency;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DependencyPropertiesParser {
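+    /**
+     * Parse a comma-separated list of local jar paths into File objects; an empty property yields an
+     * empty list.
+     */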
+    public List<File> parseJarsProperties(String prop) {
+        if (prop.trim().isEmpty()) {
+            // handle no input
+            return Collections.emptyList();
+        }
+
+        List<String> dependencies = Arrays.asList(prop.split(","));
+        return Lists.transform(dependencies, new Function<String, File>() {
+            @Override
+            public File apply(String filePath) {
+                return new File(filePath);
+            }
+        });
+    }
+
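+    /**
+     * Parse a JSON map of artifact coordinates to local file paths into a map of artifact to File.
+     */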
+    public Map<String, File> parseArtifactsProperties(String prop) {
+        try {
+            Map<String, String> parsed = (Map<String, String>) JSONValue.parseWithException(prop);
+            Map<String, File> packages = new LinkedHashMap<>(parsed.size());
+            for (Map.Entry<String, String> artifactToFilePath : parsed.entrySet()) {
+                packages.put(artifactToFilePath.getKey(), new File(artifactToFilePath.getValue()));
+            }
+
+            return packages;
+        } catch (ParseException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java b/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
new file mode 100644
index 0000000..6fac380
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.dependency;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class DependencyUploader {
+    public static final Logger LOG = LoggerFactory.getLogger(DependencyUploader.class);
+
+    private final Map<String, Object> conf;
+    private ClientBlobStore blobStore;
+
+    public DependencyUploader() {
+        conf = Utils.readStormConfig();
+    }
+
+    public void init() {
+        //NOOP
+    }
+
+    public void shutdown() {
+        if (blobStore != null) {
+            blobStore.shutdown();
+        }
+    }
+
+    @VisibleForTesting
+    void setBlobStore(ClientBlobStore blobStore) {
+        this.blobStore = blobStore;
+    }
+
+    private synchronized ClientBlobStore getBlobStore() {
+        if (blobStore == null) {
+            blobStore = Utils.getClientBlobStore(conf);
+        }
+        return blobStore;
+    }
+
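+    /**
+     * Upload each dependency jar to the blob store under a UUID-suffixed key. If any upload fails and
+     * cleanupIfFails is set, the blobs uploaded so far are deleted before the failure is rethrown.
+     */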
+    public List<String> uploadFiles(List<File> dependencies, boolean cleanupIfFails) throws IOException, AuthorizationException {
+        checkFilesExist(dependencies);
+
+        List<String> keys = new ArrayList<>(dependencies.size());
+        try {
+            for (File dependency : dependencies) {
+                String fileName = dependency.getName();
+                String key = DependencyBlobStoreUtils.generateDependencyBlobKey(DependencyBlobStoreUtils.applyUUIDToFileName(fileName));
+
+                try {
+                    uploadDependencyToBlobStore(key, dependency);
+                } catch (KeyAlreadyExistsException e) {
+                    // this should never happen since the key contains a random UUID
+                    throw new RuntimeException(e);
+                }
+
+                keys.add(key);
+            }
+        } catch (Throwable e) {
+            if (getBlobStore() != null && cleanupIfFails) {
+                deleteBlobs(keys);
+            }
+            throw new RuntimeException(e);
+        }
+
+        return keys;
+    }
+
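+    /**
+     * Upload artifact jars under keys derived from their coordinates. If a key already exists, the
+     * existing blob is reused and the upload is skipped.
+     */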
+    public List<String> uploadArtifacts(Map<String, File> artifacts) {
+        checkFilesExist(artifacts.values());
+
+        List<String> keys = new ArrayList<>(artifacts.size());
+        try {
+            for (Map.Entry<String, File> artifactToFile : artifacts.entrySet()) {
+                String artifact = artifactToFile.getKey();
+                File dependency = artifactToFile.getValue();
+
+                String key = DependencyBlobStoreUtils.generateDependencyBlobKey(convertArtifactToJarFileName(artifact));
+                try {
+                    uploadDependencyToBlobStore(key, dependency);
+                } catch (KeyAlreadyExistsException e) {
+                    // we lost the race to create the blob, but it already exists, so this is fine
+                }
+
+                keys.add(key);
+            }
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+
+        return keys;
+    }
+
+    public void deleteBlobs(List<String> keys) {
+        for (String key : keys) {
+            try {
+                getBlobStore().deleteBlob(key);
+            } catch (Throwable e) {
+                LOG.warn("Failed to delete blob with key {}; continuing...", key, e);
+            }
+        }
+    }
+
+    private String convertArtifactToJarFileName(String artifact) {
+        return artifact.replace(":", "-") + ".jar";
+    }
+
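+    /**
+     * Upload the dependency only when no blob exists for the given key yet.
+     *
+     * @return true if a new blob was created, false if the key already existed
+     */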
+    private boolean uploadDependencyToBlobStore(String key, File dependency)
+            throws KeyAlreadyExistsException, AuthorizationException, IOException {
+
+        boolean uploadNew = false;
+        try {
+            // FIXME: once STORM-1986 is resolved we can filter via listKeys() with the local blobstore;
+            // as a workaround, we call getBlobMeta() for every key
+            getBlobStore().getBlobMeta(key);
+        } catch (KeyNotFoundException e) {
+            // TODO: do we want to add ACL here?
+            AtomicOutputStream blob = getBlobStore()
+                    .createBlob(key, new SettableBlobMeta(new ArrayList<AccessControl>()));
+            Files.copy(dependency.toPath(), blob);
+            blob.close();
+
+            uploadNew = true;
+        }
+
+        return uploadNew;
+    }
+
+    private void checkFilesExist(Collection<File> dependencies) {
+        for (File dependency : dependencies) {
+            if (!dependency.isFile() || !dependency.exists()) {
+                throw new FileNotAvailableException(dependency.getAbsolutePath());
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/dependency/FileNotAvailableException.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/dependency/FileNotAvailableException.java b/storm-client/src/jvm/org/apache/storm/dependency/FileNotAvailableException.java
new file mode 100644
index 0000000..442e470
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/dependency/FileNotAvailableException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.dependency;
+
+public class FileNotAvailableException extends RuntimeException {
+    public FileNotAvailableException(String fileName) {
+        super(createMessage(fileName));
+    }
+
+    public FileNotAvailableException(String fileName, Throwable cause) {
+        super(createMessage(fileName), cause);
+    }
+
+    private static String createMessage(String fileName) {
+        return "This file is not available: " + fileName;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
new file mode 100644
index 0000000..956999e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.generated.DistributedRPCInvocations;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.security.auth.ThriftClient;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
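+/**
+ * Thrift client for the DRPC invocations interface. On transport errors the cached client is cleared so
+ * that a later {@link #reconnectClient()} can re-establish the connection.
+ */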
+public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
+    public static final Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
+    private final AtomicReference<DistributedRPCInvocations.Client> client = new AtomicReference<>();
+    private String host;
+    private int port;
+
+    public DRPCInvocationsClient(Map conf, String host, int port) throws TTransportException {
+        super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null);
+        this.host = host;
+        this.port = port;
+        client.set(new DistributedRPCInvocations.Client(_protocol));
+    }
+        
+    public String getHost() {
+        return host;
+    }
+    
+    public int getPort() {
+        return port;
+    }       
+
+    public void reconnectClient() throws TException {
+        if (client.get() == null) {
+            reconnect();
+            client.set(new DistributedRPCInvocations.Client(_protocol));
+        }
+    }
+
+    public boolean isConnected() {
+        return client.get() != null;
+    }
+
+    public void result(String id, String result) throws TException, AuthorizationException {
+        DistributedRPCInvocations.Client c = client.get();
+        try {
+            if (c == null) {
+                throw new TException("Client is not connected...");
+            }
+            c.result(id, result);
+        } catch(AuthorizationException aze) {
+            throw aze;
+        } catch(TException e) {
+            client.compareAndSet(c, null);
+            throw e;
+        }
+    }
+
+    public DRPCRequest fetchRequest(String func) throws TException, AuthorizationException {
+        DistributedRPCInvocations.Client c = client.get();
+        try {
+            if (c == null) {
+                throw new TException("Client is not connected...");
+            }
+            return c.fetchRequest(func);
+        } catch(AuthorizationException aze) {
+            throw aze;
+        } catch(TException e) {
+            client.compareAndSet(c, null);
+            throw e;
+        }
+    }    
+
+    public void failRequest(String id) throws TException, AuthorizationException {
+        DistributedRPCInvocations.Client c = client.get();
+        try {
+            if (c == null) {
+                throw new TException("Client is not connected...");
+            }
+            c.failRequest(id);
+        } catch(AuthorizationException aze) {
+            throw aze;
+        } catch(TException e) {
+            client.compareAndSet(c, null);
+            throw e;
+        }
+    }
+
+    public DistributedRPCInvocations.Client getClient() {
+        return client.get();
+    }
+
+    @Override
+    public void failRequestV2(String id, DRPCExecutionException ex) throws AuthorizationException, TException {
+        DistributedRPCInvocations.Client c = client.get();
+        try {
+            if (c == null) {
+                throw new TException("Client is not connected...");
+            }
+            c.failRequestV2(id, ex);
+        } catch(AuthorizationException aze) {
+            throw aze;
+        } catch(TException e) {
+            client.compareAndSet(c, null);
+            throw e;
+        }
+    }
+}