Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:30 UTC

[41/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
new file mode 100644
index 0000000..372df1f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.Thrift;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.metric.EventLoggerBolt;
+import org.apache.storm.metric.MetricsConsumerBolt;
+import org.apache.storm.metric.SystemBolt;
+import org.apache.storm.metric.filter.FilterByMetricName;
+import org.apache.storm.metric.util.DataPointExpander;
+import org.apache.storm.security.auth.IAuthorizer;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ThriftTopologyUtils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class StormCommon {
+    // A singleton instance allows us to mock delegated static methods in our
+    // tests by subclassing.
+    private static StormCommon _instance = new StormCommon();
+
+    /**
+     * Provide an instance of this class for delegates to use.  To mock out
+     * delegated methods, provide an instance of a subclass that overrides the
+     * implementation of the delegated method.
+     *
+     * @param common a StormCommon instance
+     * @return the previously set instance
+     */
+    public static StormCommon setInstance(StormCommon common) {
+        StormCommon oldInstance = _instance;
+        _instance = common;
+        return oldInstance;
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(StormCommon.class);
+
+    public static final String SYSTEM_STREAM_ID = "__system";
+
+    public static final String EVENTLOGGER_COMPONENT_ID = "__eventlogger";
+    public static final String EVENTLOGGER_STREAM_ID = "__eventlog";
+
+    public static final String TOPOLOGY_METRICS_CONSUMER_CLASS = "class";
+    public static final String TOPOLOGY_METRICS_CONSUMER_ARGUMENT = "argument";
+    public static final String TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES = "max.retain.metric.tuples";
+    public static final String TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT = "parallelism.hint";
+    public static final String TOPOLOGY_METRICS_CONSUMER_WHITELIST = "whitelist";
+    public static final String TOPOLOGY_METRICS_CONSUMER_BLACKLIST = "blacklist";
+    public static final String TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE = "expandMapType";
+    public static final String TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR = "metricNameSeparator";
+
+    @Deprecated
+    public static String getStormId(final IStormClusterState stormClusterState, final String topologyName) {
+        return stormClusterState.getTopoId(topologyName).get();
+    }
+
+    public static void validateDistributedMode(Map conf) {
+        if (ConfigUtils.isLocalMode(conf)) {
+            throw new IllegalArgumentException("Cannot start server in local mode!");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static void validateIds(StormTopology topology) throws InvalidTopologyException {
+        List<String> componentIds = new ArrayList<>();
+
+        for (StormTopology._Fields field : Thrift.getTopologyFields()) {
+            if (!ThriftTopologyUtils.isWorkerHook(field) && !ThriftTopologyUtils.isDependencies(field)) {
+                Object value = topology.getFieldValue(field);
+                Map<String, Object> componentMap = (Map<String, Object>) value;
+                componentIds.addAll(componentMap.keySet());
+
+                for (String id : componentMap.keySet()) {
+                    if (Utils.isSystemId(id)) {
+                        throw new InvalidTopologyException(id + " is not a valid component id.");
+                    }
+                }
+                for (Object componentObj : componentMap.values()) {
+                    ComponentCommon common = getComponentCommon(componentObj);
+                    Set<String> streamIds = common.get_streams().keySet();
+                    for (String id : streamIds) {
+                        if (Utils.isSystemId(id)) {
+                            throw new InvalidTopologyException(id + " is not a valid stream id.");
+                        }
+                    }
+                }
+            }
+        }
+
+        List<String> offending = Utils.getRepeat(componentIds);
+        if (!offending.isEmpty()) {
+            throw new InvalidTopologyException("Duplicate component ids: " + offending);
+        }
+    }
+
+    private static boolean isEmptyInputs(ComponentCommon common) {
+        if (common.get_inputs() == null) {
+            return true;
+        } else {
+            return common.get_inputs().isEmpty();
+        }
+    }
+
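+    /**
+     * Returns a map from component id to component object covering every spout,
+     * bolt, and state spout declared in the topology.
+     */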
+    @SuppressWarnings("unchecked")
+    public static Map<String, Object> allComponents(StormTopology topology) {
+        Map<String, Object> components = new HashMap<>();
+        List<StormTopology._Fields> topologyFields = Arrays.asList(Thrift.getTopologyFields());
+        for (StormTopology._Fields field : topologyFields) {
+            if (!ThriftTopologyUtils.isWorkerHook(field) && !ThriftTopologyUtils.isDependencies(field)) {
+                components.putAll(((Map) topology.getFieldValue(field)));
+            }
+        }
+        return components;
+    }
+
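+    /**
+     * Returns the component-specific configuration, parsed from the JSON conf
+     * stored in the component's ComponentCommon; returns an empty map when no
+     * JSON conf is set.
+     */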
+    @SuppressWarnings("unchecked")
+    public static Map<String, Object> componentConf(Object component) {
+        try {
+            Map<String, Object> conf = new HashMap<>();
+            ComponentCommon common = getComponentCommon(component);
+            String jconf = common.get_json_conf();
+            if (jconf != null) {
+                conf.putAll((Map<String, Object>) JSONValue.parseWithException(jconf));
+            }
+            return conf;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
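+    /**
+     * Validates component and stream ids, rejects spouts that declare inputs, and
+     * checks that any component with TOPOLOGY_TASKS > 0 also has a parallelism
+     * hint greater than zero.
+     */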
+    @SuppressWarnings("unchecked")
+    public static void validateBasic(StormTopology topology) throws InvalidTopologyException {
+        validateIds(topology);
+
+        for (StormTopology._Fields field : Thrift.getSpoutFields()) {
+            Map<String, Object> spoutComponents = (Map<String, Object>) topology.getFieldValue(field);
+            if (spoutComponents != null) {
+                for (Object obj : spoutComponents.values()) {
+                    ComponentCommon common = getComponentCommon(obj);
+                    if (!isEmptyInputs(common)) {
+                        throw new InvalidTopologyException("May not declare inputs for a spout");
+                    }
+                }
+            }
+        }
+
+        Map<String, Object> componentMap = allComponents(topology);
+        for (Object componentObj : componentMap.values()) {
+            Map conf = componentConf(componentObj);
+            ComponentCommon common = getComponentCommon(componentObj);
+            int parallelismHintNum = Thrift.getParallelismHint(common);
+            Integer taskNum = ObjectReader.getInt(conf.get(Config.TOPOLOGY_TASKS), 0);
+            if (taskNum > 0 && parallelismHintNum <= 0) {
+                throw new InvalidTopologyException("Number of executors must be greater than 0 when number of tasks is greater than 0");
+            }
+        }
+    }
+
+    private static Set<String> getStreamOutputFields(Map<String, StreamInfo> streams) {
+        Set<String> outputFields = new HashSet<>();
+        for (StreamInfo streamInfo : streams.values()) {
+            outputFields.addAll(streamInfo.get_output_fields());
+        }
+        return outputFields;
+    }
+
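+    /**
+     * Verifies that every input subscription refers to an existing component and
+     * stream, and that fields groupings reference only declared output fields.
+     */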
+    public static void validateStructure(StormTopology topology) throws InvalidTopologyException {
+        Map<String, Object> componentMap = allComponents(topology);
+        for (Map.Entry<String, Object> entry : componentMap.entrySet()) {
+            String componentId = entry.getKey();
+            ComponentCommon common = getComponentCommon(entry.getValue());
+            Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
+            for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
+                String sourceStreamId = input.getKey().get_streamId();
+                String sourceComponentId = input.getKey().get_componentId();
+                if (!componentMap.containsKey(sourceComponentId)) {
+                    throw new InvalidTopologyException("Component: [" + componentId +
+                            "] subscribes from non-existent component [" + sourceComponentId + "]");
+                }
+
+                ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId));
+                if (!sourceComponent.get_streams().containsKey(sourceStreamId)) {
+                    throw new InvalidTopologyException("Component: [" + componentId +
+                            "] subscribes from non-existent stream: " +
+                            "[" + sourceStreamId + "] of component [" + sourceComponentId + "]");
+                }
+
+                Grouping grouping = input.getValue();
+                if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) {
+                    List<String> fields = new ArrayList<>(grouping.get_fields());
+                    Map<String, StreamInfo> streams = sourceComponent.get_streams();
+                    Set<String> sourceOutputFields = getStreamOutputFields(streams);
+                    fields.removeAll(sourceOutputFields);
+                    if (!fields.isEmpty()) {
+                        throw new InvalidTopologyException("Component: [" + componentId +
+                                "] subscribes from stream: [" + sourceStreamId + "] of component " +
+                                "[" + sourceComponentId + "] with non-existent fields: " + fields);
+                    }
+                }
+            }
+        }
+    }
+
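+    /**
+     * Builds the acker's input declarations: the ACKER_INIT stream from every
+     * spout plus the ACKER_ACK, ACKER_FAIL, and ACKER_RESET_TIMEOUT streams from
+     * every bolt, all fields-grouped on the tuple id so that every update for a
+     * given root tuple is routed to the same acker task.
+     */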
+    public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
+        Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
+        Set<String> boltIds = topology.get_bolts().keySet();
+        Set<String> spoutIds = topology.get_spouts().keySet();
+
+        for (String id : spoutIds) {
+            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID),
+                    Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+        }
+
+        for (String id : boltIds) {
+            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID),
+                    Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID),
+                    Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
+                    Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+        }
+        return inputs;
+    }
+
+    public static IBolt makeAckerBolt() {
+        return _instance.makeAckerBoltImpl();
+    }
+
+    public IBolt makeAckerBoltImpl() {
+        return new Acker();
+    }
+
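+    /**
+     * Adds the acker bolt to the topology and wires it up: the number of acker
+     * executors defaults to the worker count when TOPOLOGY_ACKER_EXECUTORS is
+     * unset, every bolt gains the ack/fail/reset-timeout output streams, and every
+     * spout gains the init output stream plus direct inputs from the acker.
+     */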
+    @SuppressWarnings("unchecked")
+    public static void addAcker(Map conf, StormTopology topology) {
+        int ackerNum = ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+        Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);
+
+        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
+        outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
+        outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
+        outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
+
+        Map<String, Object> ackerConf = new HashMap<>();
+        ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
+        ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+
+        Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);
+
+        for (Bolt bolt : topology.get_bolts().values()) {
+            ComponentCommon common = bolt.get_common();
+            common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
+            common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
+            common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
+        }
+
+        for (SpoutSpec spout : topology.get_spouts().values()) {
+            ComponentCommon common = spout.get_common();
+            Map spoutConf = componentConf(spout);
+            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+                    ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+            common.set_json_conf(JSONValue.toJSONString(spoutConf));
+            common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
+                    Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
+            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
+                    Thrift.prepareDirectGrouping());
+            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
+                    Thrift.prepareDirectGrouping());
+            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
+                    Thrift.prepareDirectGrouping());
+        }
+
+        topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
+    }
+
+    public static ComponentCommon getComponentCommon(Object component) {
+        ComponentCommon common = null;
+        if (component instanceof StateSpoutSpec) {
+            common = ((StateSpoutSpec) component).get_common();
+        } else if (component instanceof SpoutSpec) {
+            common = ((SpoutSpec) component).get_common();
+        } else if (component instanceof Bolt) {
+            common = ((Bolt) component).get_common();
+        }
+        return common;
+    }
+
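+    /**
+     * Declares the METRICS_STREAM_ID output stream, carrying ["task-info",
+     * "data-points"], on every component in the topology.
+     */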
+    public static void addMetricStreams(StormTopology topology) {
+        for (Object component : allComponents(topology).values()) {
+            ComponentCommon common = getComponentCommon(component);
+            StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
+            common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
+        }
+    }
+
+    public static void addSystemStreams(StormTopology topology) {
+        for (Object component : allComponents(topology).values()) {
+            ComponentCommon common = getComponentCommon(component);
+            StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("event"));
+            common.put_to_streams(SYSTEM_STREAM_ID, streamInfo);
+        }
+    }
+
+    public static List<String> eventLoggerBoltFields() {
+        return Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID,
+                EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES);
+    }
+
+    public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
+        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
+        Set<String> allIds = new HashSet<String>();
+        allIds.addAll(topology.get_bolts().keySet());
+        allIds.addAll(topology.get_spouts().keySet());
+
+        for (String id : allIds) {
+            inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID),
+                    Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
+        }
+        return inputs;
+    }
+
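+    /**
+     * Adds the event logger bolt, subscribed to the EVENTLOGGER_STREAM_ID stream
+     * of every component and fields-grouped on component-id; its executor count
+     * defaults to the worker count when TOPOLOGY_EVENTLOGGER_EXECUTORS is unset.
+     */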
+    public static void addEventLogger(Map conf, StormTopology topology) {
+        Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
+                ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+        HashMap<String, Object> componentConf = new HashMap<>();
+        componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
+        componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+        Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(
+                eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
+
+        for (Object component : allComponents(topology).values()) {
+            ComponentCommon common = getComponentCommon(component);
+            common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
+        }
+        topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
+    }
+
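+    /**
+     * Builds one MetricsConsumerBolt per entry of TOPOLOGY_METRICS_CONSUMER_REGISTER,
+     * each shuffle-subscribed to the metrics stream of every component (including
+     * the system component). When the same consumer class is registered more than
+     * once, later bolts get an "#<occurrence>" suffix to keep component ids unique.
+     */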
+    @SuppressWarnings("unchecked")
+    public static Map<String, Bolt> metricsConsumerBoltSpecs(Map conf, StormTopology topology) {
+        Map<String, Bolt> metricsConsumerBolts = new HashMap<>();
+
+        Set<String> componentIdsEmitMetrics = new HashSet<>();
+        componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
+        componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);
+
+        Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
+        for (String componentId : componentIdsEmitMetrics) {
+            inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
+        }
+
+        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
+        if (registerInfo != null) {
+            Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
+            for (Map<String, Object> info : registerInfo) {
+                String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
+                Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
+                Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(
+                    TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
+                Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);
+                Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
+                metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
+                List<String> whitelist = (List<String>) info.get(
+                    TOPOLOGY_METRICS_CONSUMER_WHITELIST);
+                List<String> blacklist = (List<String>) info.get(
+                    TOPOLOGY_METRICS_CONSUMER_BLACKLIST);
+                FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);
+                Boolean expandMapType = ObjectReader.getBoolean(info.get(
+                    TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE), false);
+                String metricNameSeparator = ObjectReader.getString(info.get(
+                    TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR), ".");
+                DataPointExpander expander = new DataPointExpander(expandMapType, metricNameSeparator);
+                MetricsConsumerBolt boltInstance = new MetricsConsumerBolt(className, argument,
+                    maxRetainMetricTuples, filterPredicate, expander);
+                Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
+                    boltInstance, null, phintNum, metricsConsumerConf);
+
+                String id = className;
+                if (classOccurrencesMap.containsKey(className)) {
+                    // e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
+                    int occurrenceNum = classOccurrencesMap.get(className);
+                    occurrenceNum++;
+                    classOccurrencesMap.put(className, occurrenceNum);
+                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
+                } else {
+                    classOccurrencesMap.put(className, 1);
+                }
+                metricsConsumerBolts.put(id, metricsConsumerBolt);
+            }
+        }
+        return metricsConsumerBolts;
+    }
+
+    public static void addMetricComponents(Map conf, StormTopology topology) {
+        Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
+        for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
+            topology.put_to_bolts(entry.getKey(), entry.getValue());
+        }
+    }
+
+    @SuppressWarnings("unused")
+    public static void addSystemComponents(Map conf, StormTopology topology) {
+        Map<String, StreamInfo> outputStreams = new HashMap<>();
+        outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
+        outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
+        outputStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds")));
+
+        Map<String, Object> boltConf = new HashMap<>();
+        boltConf.put(Config.TOPOLOGY_TASKS, 0);
+
+        Bolt systemBoltSpec = Thrift.prepareSerializedBoltDetails(null, new SystemBolt(), outputStreams, 0, boltConf);
+        topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID, systemBoltSpec);
+    }
+
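+    /**
+     * Validates the user topology and returns a deep copy augmented with the
+     * system components: the acker, the event logger (if enabled), any metrics
+     * consumers, the system bolt, and the metrics and system streams.
+     */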
+    public static StormTopology systemTopology(Map stormConf, StormTopology topology) throws InvalidTopologyException {
+        return _instance.systemTopologyImpl(stormConf, topology);
+    }
+
+    protected StormTopology systemTopologyImpl(Map stormConf, StormTopology topology) throws InvalidTopologyException {
+        validateBasic(topology);
+
+        StormTopology ret = topology.deepCopy();
+        addAcker(stormConf, ret);
+        if (hasEventLoggers(stormConf)) {
+            addEventLogger(stormConf, ret);
+        }
+        addMetricComponents(stormConf, ret);
+        addSystemComponents(stormConf, ret);
+        addMetricStreams(ret);
+        addSystemStreams(ret);
+
+        validateStructure(ret);
+
+        return ret;
+    }
+
+    public static boolean hasAckers(Map stormConf) {
+        Object ackerNum = stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
+        return ackerNum == null || ObjectReader.getInt(ackerNum) > 0;
+    }
+
+    public static boolean hasEventLoggers(Map stormConf) {
+        Object eventLoggerNum = stormConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS);
+        return eventLoggerNum == null || ObjectReader.getInt(eventLoggerNum) > 0;
+    }
+
+    public static int numStartExecutors(Object component) throws InvalidTopologyException {
+        ComponentCommon common = getComponentCommon(component);
+        return Thrift.getParallelismHint(common);
+    }
+
+    public static Map<Integer, String> stormTaskInfo(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
+        return _instance.stormTaskInfoImpl(userTopology, stormConf);
+    }
+
+    /**
+     * Returns a map from task id to component id.
+     */
+    protected Map<Integer, String> stormTaskInfoImpl(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
+        Map<Integer, String> taskIdToComponentId = new HashMap<>();
+
+        StormTopology systemTopology = systemTopology(stormConf, userTopology);
+        Map<String, Object> components = allComponents(systemTopology);
+        Map<String, Integer> componentIdToTaskNum = new TreeMap<>();
+        for (Map.Entry<String, Object> entry : components.entrySet()) {
+            Map conf = componentConf(entry.getValue());
+            Object taskNum = conf.get(Config.TOPOLOGY_TASKS);
+            componentIdToTaskNum.put(entry.getKey(), ObjectReader.getInt(taskNum));
+        }
+
+        int taskId = 1;
+        for (Map.Entry<String, Integer> entry : componentIdToTaskNum.entrySet()) {
+            String componentId = entry.getKey();
+            Integer taskNum = entry.getValue();
+            while (taskNum > 0) {
+                taskIdToComponentId.put(taskId, componentId);
+                taskNum--;
+                taskId++;
+            }
+        }
+        return taskIdToComponentId;
+    }
+
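+    /**
+     * Expands an executor id, encoded as an inclusive [startTask, endTask] pair,
+     * into the list of task ids it covers; e.g. [2, 4] yields [2, 3, 4].
+     */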
+    public static List<Integer> executorIdToTasks(List<Long> executorId) {
+        List<Integer> taskIds = new ArrayList<>();
+        int taskId = executorId.get(0).intValue();
+        while (taskId <= executorId.get(1).intValue()) {
+            taskIds.add(taskId);
+            taskId++;
+        }
+        return taskIds;
+    }
+
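+    /**
+     * Flattens an executor-to-node+port assignment into a task-to-node+port map.
+     */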
+    public static Map<Integer, NodeInfo> taskToNodeport(Map<List<Long>, NodeInfo> executorToNodePort) {
+        Map<Integer, NodeInfo> tasksToNodePort = new HashMap<>();
+        for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
+            List<Integer> taskIds = executorIdToTasks(entry.getKey());
+            for (Integer taskId : taskIds) {
+                tasksToNodePort.put(taskId, entry.getValue());
+            }
+        }
+        return tasksToNodePort;
+    }
+
+    public static IAuthorizer mkAuthorizationHandler(String klassName, Map conf)
+            throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+        return _instance.mkAuthorizationHandlerImpl(klassName, conf);
+    }
+
+    protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf)
+            throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        IAuthorizer aznHandler = null;
+        if (StringUtils.isNotBlank(klassName)) {
+            Class<?> aznClass = Class.forName(klassName);
+            if (aznClass != null) {
+                aznHandler = (IAuthorizer) aznClass.newInstance();
+                if (aznHandler != null) {
+                    aznHandler.prepare(conf);
+                }
+                LOG.debug("authorization class name:{}, class:{}, handler:{}", klassName, aznClass, aznHandler);
+            }
+        }
+
+        return aznHandler;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static WorkerTopologyContext makeWorkerContext(Map<String, Object> workerData) {
+        try {
+            StormTopology stormTopology = (StormTopology) workerData.get(Constants.SYSTEM_TOPOLOGY);
+            Map stormConf = (Map) workerData.get(Constants.STORM_CONF);
+            Map<Integer, String> taskToComponent = (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT);
+            Map<String, List<Integer>> componentToSortedTasks =
+                    (Map<String, List<Integer>>) workerData.get(Constants.COMPONENT_TO_SORTED_TASKS);
+            Map<String, Map<String, Fields>> componentToStreamToFields =
+                    (Map<String, Map<String, Fields>>) workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS);
+            String stormId = (String) workerData.get(Constants.STORM_ID);
+            Map conf = (Map) workerData.get(Constants.CONF);
+            Integer port = (Integer) workerData.get(Constants.PORT);
+            String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, stormId));
+            String pidDir = ConfigUtils.workerPidsRoot(conf, stormId);
+            List<Integer> workerTasks = (List<Integer>) workerData.get(Constants.TASK_IDS);
+            Map<String, Object> defaultResources = (Map<String, Object>) workerData.get(Constants.DEFAULT_SHARED_RESOURCES);
+            Map<String, Object> userResources = (Map<String, Object>) workerData.get(Constants.USER_SHARED_RESOURCES);
+            return new WorkerTopologyContext(stormTopology, stormConf, taskToComponent, componentToSortedTasks,
+                    componentToStreamToFields, stormId, codeDir, pidDir, port, workerTasks, defaultResources, userResources);
+        } catch (IOException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
new file mode 100644
index 0000000..6316564
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.Config;
+import org.apache.storm.Thrift;
+import org.apache.storm.daemon.metrics.BuiltinMetrics;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.ShellComponent;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class Task {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    private Executor executor;
+    private WorkerState workerData;
+    private TopologyContext systemTopologyContext;
+    private TopologyContext userTopologyContext;
+    private WorkerTopologyContext workerTopologyContext;
+    private LoadMapping loadMapping;
+    private Integer taskId;
+    private String componentId;
+    private Object taskObject; // Spout/Bolt object
+    private Map stormConf;
+    private Callable<Boolean> emitSampler;
+    private CommonStats executorStats;
+    private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper;
+    private BuiltinMetrics builtInMetrics;
+    private boolean debug;
+
+    public Task(Executor executor, Integer taskId) throws IOException {
+        this.taskId = taskId;
+        this.executor = executor;
+        this.workerData = executor.getWorkerData();
+        this.stormConf = executor.getStormConf();
+        this.componentId = executor.getComponentId();
+        this.streamComponentToGrouper = executor.getStreamToComponentToGrouper();
+        this.executorStats = executor.getStats();
+        this.builtInMetrics = BuiltinMetricsUtil.mkData(executor.getType(), this.executorStats);
+        this.workerTopologyContext = executor.getWorkerTopologyContext();
+        this.emitSampler = ConfigUtils.mkStatsSampler(stormConf);
+        this.loadMapping = workerData.getLoadMapping();
+        this.systemTopologyContext = mkTopologyContext(workerData.getSystemTopology());
+        this.userTopologyContext = mkTopologyContext(workerData.getTopology());
+        this.taskObject = mkTaskObject();
+        this.debug = stormConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) stormConf.get(Config.TOPOLOGY_DEBUG);
+        this.addTaskHooks();
+    }
+
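+    /**
+     * Resolves the target task for a direct emit: returns a singleton list with
+     * the given task id, or an empty list when the target component has no
+     * grouping registered for the stream. Also applies emit hooks and samples
+     * emit stats; throws if the target expects a regular (non-direct) grouping.
+     */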
+    public List<Integer> getOutgoingTasks(Integer outTaskId, String stream, List<Object> values) {
+        if (debug) {
+            LOG.info("Emitting direct: {}; {} {} {} ", outTaskId, componentId, stream, values);
+        }
+        String targetComponent = workerTopologyContext.getComponentId(outTaskId);
+        Map<String, LoadAwareCustomStreamGrouping> componentGrouping = streamComponentToGrouper.get(stream);
+        LoadAwareCustomStreamGrouping grouping = componentGrouping.get(targetComponent);
+        if (null == grouping) {
+            outTaskId = null;
+        }
+        if (grouping != null && grouping != GrouperFactory.DIRECT) {
+            throw new IllegalArgumentException("Cannot emitDirect to a task expecting a regular grouping");
+        }
+        new EmitInfo(values, stream, taskId, Collections.singletonList(outTaskId)).applyOn(userTopologyContext);
+        try {
+            if (emitSampler.call()) {
+                executorStats.emittedTuple(stream);
+                if (null != outTaskId) {
+                    executorStats.transferredTuples(stream, 1);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        if (null != outTaskId) {
+            return Collections.singletonList(outTaskId);
+        }
+        return new ArrayList<>(0);
+    }
+
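+    /**
+     * Resolves the target tasks for a regular emit by consulting the grouper of
+     * every component subscribed to the stream; also applies emit hooks and
+     * samples emit stats. Throws if the stream id is unknown or any subscriber
+     * uses a direct grouping.
+     */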
+    public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
+        if (debug) {
+            LOG.info("Emitting: {} {} {}", componentId, stream, values);
+        }
+
+        List<Integer> outTasks = new ArrayList<>();
+        if (!streamComponentToGrouper.containsKey(stream)) {
+            throw new IllegalArgumentException("Unknown stream ID: " + stream);
+        }
+        if (null != streamComponentToGrouper.get(stream)) {
+            // the grouper map is null for the __system stream
+            for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
+                if (grouper == GrouperFactory.DIRECT) {
+                    throw new IllegalArgumentException("Cannot do regular emit to direct stream");
+                }
+                List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
+                outTasks.addAll(compTasks);
+            }
+        }
+        new EmitInfo(values, stream, taskId, outTasks).applyOn(userTopologyContext);
+        try {
+            if (emitSampler.call()) {
+                executorStats.emittedTuple(stream);
+                executorStats.transferredTuples(stream, outTasks.size());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return outTasks;
+    }
+
+    public Tuple getTuple(String stream, List values) {
+        return new TupleImpl(systemTopologyContext, values, systemTopologyContext.getThisTaskId(), stream);
+    }
+
+    public Integer getTaskId() {
+        return taskId;
+    }
+
+    public String getComponentId() {
+        return componentId;
+    }
+
+    public TopologyContext getUserContext() {
+        return userTopologyContext;
+    }
+
+    public Object getTaskObject() {
+        return taskObject;
+    }
+
+    public BuiltinMetrics getBuiltInMetrics() {
+        return builtInMetrics;
+    }
+
+    private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
+        Map conf = workerData.getConf();
+        return new TopologyContext(
+            topology,
+            workerData.getTopologyConf(),
+            workerData.getTaskToComponent(),
+            workerData.getComponentToSortedTasks(),
+            workerData.getComponentToStreamToFields(),
+            workerData.getTopologyId(),
+            ConfigUtils.supervisorStormResourcesPath(
+                    ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
+            ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
+            taskId,
+            workerData.getPort(), workerData.getTaskIds(),
+            workerData.getDefaultSharedResources(),
+            workerData.getUserSharedResources(),
+            executor.getSharedExecutorData(),
+            executor.getIntervalToTaskToMetricToRegistry(),
+            executor.getOpenOrPrepareWasCalled());
+    }
+
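+    /**
+     * Deserializes this task's spout/bolt/state-spout object from the topology,
+     * wrapping ShellComponents in ShellSpout/ShellBolt and instantiating
+     * JavaObject declarations via Thrift.
+     */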
+    private Object mkTaskObject() {
+        StormTopology topology = systemTopologyContext.getRawTopology();
+        Map<String, SpoutSpec> spouts = topology.get_spouts();
+        Map<String, Bolt> bolts = topology.get_bolts();
+        Map<String, StateSpoutSpec> stateSpouts = topology.get_state_spouts();
+        Object result;
+        ComponentObject componentObject;
+        if (spouts.containsKey(componentId)) {
+            componentObject = spouts.get(componentId).get_spout_object();
+        } else if (bolts.containsKey(componentId)) {
+            componentObject = bolts.get(componentId).get_bolt_object();
+        } else if (stateSpouts.containsKey(componentId)) {
+            componentObject = stateSpouts.get(componentId).get_state_spout_object();
+        } else {
+            throw new RuntimeException("Could not find " + componentId + " in " + topology);
+        }
+        result = Utils.getSetComponentObject(componentObject);
+
+        if (result instanceof ShellComponent) {
+            if (spouts.containsKey(componentId)) {
+                result = new ShellSpout((ShellComponent) result);
+            } else {
+                result = new ShellBolt((ShellComponent) result);
+            }
+        }
+
+        if (result instanceof JavaObject) {
+            result = Thrift.instantiateJavaObject((JavaObject) result);
+        }
+
+        return result;
+    }
+
+    private void addTaskHooks() {
+        List<String> hooksClassList = (List<String>) stormConf.get(Config.TOPOLOGY_AUTO_TASK_HOOKS);
+        if (null != hooksClassList) {
+            for (String hookClass : hooksClassList) {
+                try {
+                    userTopologyContext.addTaskHook(((ITaskHook) Class.forName(hookClass).newInstance()));
+                } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+                    throw new RuntimeException("Failed to add hook: " + hookClass, e);
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinBoltMetrics.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinBoltMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinBoltMetrics.java
new file mode 100644
index 0000000..b9677f6
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinBoltMetrics.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.metric.internal.MultiCountStatAndMetric;
+import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
+import org.apache.storm.stats.BoltExecutorStats;
+
+public class BuiltinBoltMetrics extends BuiltinMetrics {
+    private final MultiCountStatAndMetric ackCount;
+    private final MultiCountStatAndMetric failCount;
+    private final MultiCountStatAndMetric emitCount;
+    private final MultiCountStatAndMetric executeCount;
+    private final MultiCountStatAndMetric transferCount;
+    private final MultiLatencyStatAndMetric executeLatency;
+    private final MultiLatencyStatAndMetric processLatency;
+
+    public BuiltinBoltMetrics(BoltExecutorStats stats) {
+        this.ackCount = stats.getAcked();
+        this.failCount = stats.getFailed();
+        this.emitCount = stats.getEmitted();
+        this.executeCount = stats.getExecuted();
+        this.transferCount = stats.getTransferred();
+        this.executeLatency = stats.getExecuteLatencies();
+        this.processLatency = stats.getProcessLatencies();
+
+        this.metricMap.put("ack-count", ackCount);
+        this.metricMap.put("fail-count", failCount);
+        this.metricMap.put("emit-count", emitCount);
+        this.metricMap.put("transfer-count", transferCount);
+        this.metricMap.put("execute-count", executeCount);
+        this.metricMap.put("process-latency", processLatency);
+        this.metricMap.put("execute-latency", executeLatency);
+    }
+
+    public MultiCountStatAndMetric getAckCount() {
+        return ackCount;
+    }
+
+    public MultiCountStatAndMetric getFailCount() {
+        return failCount;
+    }
+
+    public MultiCountStatAndMetric getEmitCount() {
+        return emitCount;
+    }
+
+    public MultiCountStatAndMetric getTransferCount() {
+        return transferCount;
+    }
+
+    public MultiCountStatAndMetric getExecuteCount() {
+        return executeCount;
+    }
+
+    public MultiLatencyStatAndMetric getExecuteLatency() {
+        return executeLatency;
+    }
+
+    public MultiLatencyStatAndMetric getProcessLatency() {
+        return processLatency;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java
new file mode 100644
index 0000000..0dacad1
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.task.TopologyContext;
+
+public abstract class BuiltinMetrics {
+    protected final Map<String, IMetric> metricMap = new HashMap<>();
+
+    public void registerAll(Map stormConf, TopologyContext context) {
+        for (Map.Entry<String, IMetric> entry : metricMap.entrySet()) {
+            BuiltinMetricsUtil.registerMetric("__" + entry.getKey(), entry.getValue(), stormConf, context);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
new file mode 100644
index 0000000..2827420
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metric.api.StateMetric;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.TopologyContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class BuiltinMetricsUtil {
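+    /**
+     * Creates the builtin metrics holder matching the executor type (spout or
+     * bolt); throws for any other type.
+     */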
+    public static BuiltinMetrics mkData(String type, CommonStats stats) {
+        if (StatsUtil.SPOUT.equals(type)) {
+            return new BuiltinSpoutMetrics((SpoutExecutorStats) stats);
+        } else if (StatsUtil.BOLT.equals(type)) {
+            return new BuiltinBoltMetrics((BoltExecutorStats) stats);
+        }
+        throw new RuntimeException("Invalid component type!");
+    }
+
+    public static void registerIconnectionServerMetric(Object server, Map stormConf, TopologyContext context) {
+        if (server instanceof IStatefulObject) {
+            registerMetric("__recv-iconnection", new StateMetric((IStatefulObject) server), stormConf, context);
+        }
+    }
+
+    public static void registerIconnectionClientMetrics(final Map nodePortToSocket, Map stormConf, TopologyContext context) {
+        IMetric metric = new IMetric() {
+            @Override
+            public Object getValueAndReset() {
+                Map<Object, Object> ret = new HashMap<>();
+                for (Object o : nodePortToSocket.entrySet()) {
+                    Map.Entry entry = (Map.Entry) o;
+                    Object nodePort = entry.getKey();
+                    Object connection = entry.getValue();
+                    if (connection instanceof IStatefulObject) {
+                        ret.put(nodePort, ((IStatefulObject) connection).getState());
+                    }
+                }
+                return ret;
+            }
+        };
+        registerMetric("__send-iconnection", metric, stormConf, context);
+    }
+
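+    /**
+     * Registers a StateMetric for each queue in the map, named "__<key>".
+     */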
+    public static void registerQueueMetrics(Map queues, Map stormConf, TopologyContext context) {
+        for (Object o : queues.entrySet()) {
+            Map.Entry entry = (Map.Entry) o;
+            String name = "__" + entry.getKey();
+            IMetric metric = new StateMetric((IStatefulObject) entry.getValue());
+            registerMetric(name, metric, stormConf, context);
+        }
+    }
+
+    public static void registerMetric(String name, IMetric metric, Map stormConf, TopologyContext context) {
+        int bucketSize = ((Number) stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)).intValue();
+        context.registerMetric(name, metric, bucketSize);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinSpoutMetrics.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinSpoutMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinSpoutMetrics.java
new file mode 100644
index 0000000..5eef4bb
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinSpoutMetrics.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.metric.internal.MultiCountStatAndMetric;
+import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
+import org.apache.storm.stats.SpoutExecutorStats;
+
+public class BuiltinSpoutMetrics extends BuiltinMetrics {
+    private final MultiCountStatAndMetric ackCount;
+    private final MultiCountStatAndMetric failCount;
+    private final MultiCountStatAndMetric emitCount;
+    private final MultiCountStatAndMetric transferCount;
+    private final MultiLatencyStatAndMetric completeLatency;
+
+    public BuiltinSpoutMetrics(SpoutExecutorStats stats) {
+        this.ackCount = stats.getAcked();
+        this.failCount = stats.getFailed();
+        this.emitCount = stats.getEmitted();
+        this.transferCount = stats.getTransferred();
+        this.completeLatency = stats.getCompleteLatencies();
+
+        this.metricMap.put("ack-count", ackCount);
+        this.metricMap.put("fail-count", failCount);
+        this.metricMap.put("emit-count", emitCount);
+        this.metricMap.put("transfer-count", transferCount);
+        this.metricMap.put("complete-latency", completeLatency);
+    }
+
+    public MultiCountStatAndMetric getAckCount() {
+        return ackCount;
+    }
+
+    public MultiCountStatAndMetric getFailCount() {
+        return failCount;
+    }
+
+    public MultiCountStatAndMetric getEmitCount() {
+        return emitCount;
+    }
+
+    public MultiCountStatAndMetric getTransferCount() {
+        return transferCount;
+    }
+
+    public MultiLatencyStatAndMetric getCompleteLatency() {
+        return completeLatency;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
new file mode 100644
index 0000000..3c7f524
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.metric.api.CountMetric;
+import org.apache.storm.stats.CommonStats;
+
+public class SpoutThrottlingMetrics extends BuiltinMetrics {
+    private final CountMetric skippedMaxSpout = new CountMetric();
+    private final CountMetric skippedThrottle = new CountMetric();
+    private final CountMetric skippedInactive = new CountMetric();
+
+    public SpoutThrottlingMetrics() {
+        this.metricMap.put("skipped-max-spout", skippedMaxSpout);
+        this.metricMap.put("skipped-throttle", skippedThrottle);
+        this.metricMap.put("skipped-inactive", skippedInactive);
+    }
+
+    public CountMetric getSkippedMaxSpout() {
+        return skippedMaxSpout;
+    }
+
+    public CountMetric getSkippedThrottle() {
+        return skippedThrottle;
+    }
+
+    public CountMetric getSkippedInactive() {
+        return skippedInactive;
+    }
+
+    public void skippedMaxSpout(CommonStats stats) {
+        this.skippedMaxSpout.incrBy(stats.getRate());
+    }
+
+    public void skippedThrottle(CommonStats stats) {
+        this.skippedThrottle.incrBy(stats.getRate());
+    }
+
+    public void skippedInactive(CommonStats stats) {
+        this.skippedInactive.incrBy(stats.getRate());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
new file mode 100644
index 0000000..ba7f9db
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AdvancedFSOps implements IAdvancedFSOps {
+    private static final Logger LOG = LoggerFactory.getLogger(AdvancedFSOps.class);
+    
+    /**
+     * Factory to create a new AdvancedFSOps
+     * @param conf the configuration of the process
+     * @return the appropriate instance of the class for this config and environment.
+     */
+    public static AdvancedFSOps make(Map<String, Object> conf) {
+        if (Utils.isOnWindows()) {
+            return new AdvancedWindowsFSOps(conf);
+        }
+        if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            return new AdvancedRunAsUserFSOps(conf);
+        }
+        return new AdvancedFSOps(conf);
+    }
+    
+    private static class AdvancedRunAsUserFSOps extends AdvancedFSOps {
+        private final Map<String, Object> _conf;
+        
+        public AdvancedRunAsUserFSOps(Map<String, Object> conf) {
+            super(conf);
+            if (Utils.isOnWindows()) {
+                throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
+            }
+            _conf = conf;
+        }
+        
+        @Override
+        public void setupBlobPermissions(File path, String user) throws IOException {
+            String logPrefix = "setup blob permissions for " + path;
+            ClientSupervisorUtils.processLauncherAndWait(_conf, user, Arrays.asList("blob", path.toString()), null, logPrefix);
+        }
+        
+        @Override
+        public void deleteIfExists(File path, String user, String logPrefix) throws IOException {
+            String absolutePath = path.getAbsolutePath();
+            LOG.info("Deleting path {}", absolutePath);
+            if (user == null) {
+                user = Files.getOwner(path.toPath()).getName();
+            }
+            List<String> commands = new ArrayList<>();
+            commands.add("rmr");
+            commands.add(absolutePath);
+            ClientSupervisorUtils.processLauncherAndWait(_conf, user, commands, null, logPrefix);
+
+            if (Utils.checkFileExists(absolutePath)) {
+                // It's possible that permissions were not set properly on the directory, and
+                // the user who is *supposed* to own the dir does not. In this case, try the
+                // delete as the supervisor user.
+                Utils.forceDelete(absolutePath);
+                if (Utils.checkFileExists(absolutePath)) {
+                    throw new RuntimeException(path + " was not deleted.");
+                }
+            }
+        }
+        
+        @Override
+        public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException {
+            ClientSupervisorUtils.setupStormCodeDir(_conf, topologyConf, path.getCanonicalPath());
+        }
+
+        @Override
+        public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException {
+            ClientSupervisorUtils.setupWorkerArtifactsDir(_conf, topologyConf, path.getCanonicalPath());
+        }
+    }
+    
+    /**
+     * Operations that need to override the default ones when running on Windows.
+     */
+    private static class AdvancedWindowsFSOps extends AdvancedFSOps {
+
+        public AdvancedWindowsFSOps(Map<String, Object> conf) {
+            super(conf);
+            if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+                throw new RuntimeException("ERROR: Windows doesn't support running workers as different users yet");
+            }
+        }
+        
+        @Override
+        public void restrictDirectoryPermissions(File dir) throws IOException {
+            //NOOP, if windows gets support for run as user we will need to find a way to support this
+        }
+        
+        @Override
+        public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException {
+            // Files/move with non-empty directory doesn't work well on Windows
+            // This is not atomic but it does work
+            FileUtils.moveDirectory(fromDir, toDir);
+        }
+        
+        @Override
+        public boolean supportsAtomicDirectoryMove() {
+            // Files/move with non-empty directory doesn't work well on Windows
+            // FileUtils.moveDirectory is not atomic
+            return false;
+        }
+    }
+
+    protected final boolean _symlinksDisabled;
+    
+    protected AdvancedFSOps(Map<String, Object> conf) {
+        _symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
+    }
+
+    /**
+     * Set directory permissions to (OWNER)RWX (GROUP)R-X (OTHER)---.
+     * On systems that do not support POSIX permissions this may be a no-op.
+     * @param dir the directory to change permissions on
+     * @throws IOException on any error
+     */
+    public void restrictDirectoryPermissions(File dir) throws IOException {
+        Set<PosixFilePermission> perms = new HashSet<>(
+                Arrays.asList(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE,
+                        PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ,
+                        PosixFilePermission.GROUP_EXECUTE));
+        Files.setPosixFilePermissions(dir.toPath(), perms);
+    }
+
+    /**
+     * Move fromDir to toDir, and try to make it an atomic move if possible
+     * @param fromDir what to move
+     * @param toDir where to move it to
+     * @throws IOException on any error
+     */
+    public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException {
+        FileUtils.forceMkdir(toDir);
+        Files.move(fromDir.toPath(), toDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
+    }
+
+    /**
+     * @return true if an atomic directory move works, else false.
+     */
+    public boolean supportsAtomicDirectoryMove() {
+        return true;
+    }
+
+    /**
+     * Copy a directory
+     * @param fromDir from where
+     * @param toDir to where
+     * @throws IOException on any error
+     */
+    public void copyDirectory(File fromDir, File toDir) throws IOException {
+        FileUtils.copyDirectory(fromDir, toDir);
+    }
+
+    /**
+     * Setup permissions properly for an internal blob store path
+     * @param path the path to set the permissions on
+     * @param user the user to change the permissions for
+     * @throws IOException on any error
+     */
+    public void setupBlobPermissions(File path, String user) throws IOException {
+        //Normally this is a NOOP
+    }
+
+    /**
+     * Delete a file or a directory and all of its children, if it exists.
+     * @param path what to delete
+     * @param user the user to delete it as, if deleting as another user is supported
+     * @param logPrefix the prefix to include in the logs if an external
+     * process needs to be launched to delete the object
+     * @throws IOException on any error.
+     */
+    public void deleteIfExists(File path, String user, String logPrefix) throws IOException {
+        //by default no need to do this as a different user
+        deleteIfExists(path);
+    }
+
+    /**
+     * Delete a file or a directory and all of its children, if it exists.
+     * @param path what to delete
+     * @throws IOException on any error.
+     */
+    public void deleteIfExists(File path) throws IOException {
+        LOG.info("Deleting path {}", path);
+        Path p = path.toPath();
+        if (Files.exists(p)) {
+            try {
+                FileUtils.forceDelete(path);
+            } catch (FileNotFoundException ignored) {}
+        }
+    }
+
+    /**
+     * Setup the permissions for the storm code dir
+     * @param topologyConf the config of the Topology
+     * @param path the directory to set the permissions on
+     * @throws IOException on any error
+     */
+    public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException {
+        //By default this is a NOOP
+    }
+
+    /**
+     * Setup the permissions for the worker artifacts dirs
+     * @param topologyConf the config of the Topology
+     * @param path the directory to set the permissions on
+     * @throws IOException on any error
+     */
+    public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException {
+        //By default this is a NOOP
+    }
+
+    /**
+     * Sanity check if everything the topology needs is there for it to run.
+     * @param conf the config of the supervisor
+     * @param topologyId the ID of the topology
+     * @return true if everything is there, else false
+     * @throws IOException on any error
+     */
+    public boolean doRequiredTopoFilesExist(Map<String, Object> conf, String topologyId) throws IOException {
+        return ClientSupervisorUtils.doRequiredTopoFilesExist(conf, topologyId);
+    }
+
+    /**
+     * Makes a directory, including any necessary but nonexistent parent
+     * directories.
+     *
+     * @param path the directory to create
+     * @throws IOException on any error
+     */
+    public void forceMkdir(File path) throws IOException {
+        FileUtils.forceMkdir(path);
+    }
+
+    /**
+     * Check if a file exists or not
+     * @param path the path to check
+     * @return true if it exists else false
+     * @throws IOException on any error.
+     */
+    public boolean fileExists(File path) throws IOException {
+        return path.exists();
+    }
+
+    /**
+     * Get a writer for the given location
+     * @param file the file to write to
+     * @return the Writer to use.
+     * @throws IOException on any error
+     */
+    public Writer getWriter(File file) throws IOException {
+        return new FileWriter(file);
+    }
+
+    /**
+     * Get an output stream to write to a given file
+     * @param file the file to write to
+     * @return an OutputStream for that file
+     * @throws IOException on any error
+     */
+    public OutputStream getOutputStream(File file) throws IOException {
+        return new FileOutputStream(file);
+    }
+
+    /**
+     * Dump a string to a file
+     * @param location where to write to
+     * @param data the data to write
+     * @throws IOException on any error
+     */
+    public void dump(File location, String data) throws IOException {
+        File parent = location.getParentFile();
+        if (!parent.exists()) {
+            forceMkdir(parent);
+        }
+        try (Writer w = getWriter(location)) {
+            w.write(data);
+        }
+    }
+
+    /**
+     * Read the contents of a file into a String
+     * @param location the file to read
+     * @return the contents of the file
+     * @throws IOException on any error
+     */
+    @Override
+    public String slurpString(File location) throws IOException {
+        return FileUtils.readFileToString(location, "UTF-8");
+    }
+
+    /**
+     * Read the contents of a file into a byte array.
+     * @param location the file to read
+     * @return the contents of the file
+     * @throws IOException on any error
+     */
+    @Override
+    public byte[] slurp(File location) throws IOException {
+        return FileUtils.readFileToByteArray(location);
+    }
+
+    /**
+     * Create a symbolic link pointing at target
+     * @param link the link to create
+     * @param target where it should point to
+     * @throws IOException on any error.
+     */
+    @Override
+    public void createSymlink(File link, File target) throws IOException {
+        if (_symlinksDisabled) {
+            throw new IOException("Symlinks have been disabled, this should not be called");
+        }
+        Path plink = link.toPath().toAbsolutePath();
+        Path ptarget = target.toPath().toAbsolutePath();
+        LOG.debug("Creating symlink [{}] to [{}]", plink, ptarget);
+        if (Files.exists(plink)) {
+            if (Files.isSameFile(plink, ptarget)) {
+                //It already points where we want it to
+                return;
+            }
+            FileUtils.forceDelete(link);
+        }
+        Files.createSymbolicLink(plink, ptarget);
+    }
+    
+}
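
Callers are expected to go through the make(conf) factory rather than
instantiate the platform-specific subclasses directly. A minimal usage
sketch; the paths and the marker-file workaround are illustrative, not
taken from this patch:

    Map<String, Object> conf = Utils.readStormConfig();
    IAdvancedFSOps ops = AdvancedFSOps.make(conf); // picks the right variant
    File tmp = new File("/tmp/topo-download");     // illustrative paths
    File dst = new File("/tmp/topo-final");
    ops.moveDirectoryPreferAtomic(tmp, dst);       // atomic where the OS allows
    if (!ops.supportsAtomicDirectoryMove()) {
        // e.g. on Windows, a caller needing atomicity can compensate,
        // such as by writing a completion marker after the move
        ops.dump(new File(dst, ".moved"), "done");
    }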

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
new file mode 100644
index 0000000..8cd7261
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class ClientSupervisorUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(ClientSupervisorUtils.class);
+
+    static boolean doRequiredTopoFilesExist(Map<String, Object> conf, String stormId) throws IOException {
+        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+        String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
+        String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
+        String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot);
+        if (!Utils.checkFileExists(stormroot)
+                || !Utils.checkFileExists(stormcodepath)
+                || !Utils.checkFileExists(stormconfpath)) {
+            return false;
+        }
+        return ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath);
+    }
+
+    public static int processLauncherAndWait(Map<String, Object> conf, String user, List<String> args,
+                                             final Map<String, String> environment, final String logPreFix)
+            throws IOException {
+        Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null);
+        if (StringUtils.isNotBlank(logPreFix)) {
+            Utils.readAndLogStream(logPreFix, process.getInputStream());
+        }
+        try {
+            process.waitFor();
+        } catch (InterruptedException e) {
+            LOG.info("{} interrupted.", logPreFix);
+        }
+        return process.exitValue();
+    }
+
+    static Process processLauncher(Map<String, Object> conf, String user, List<String> commandPrefix, List<String> args, Map<String, String> environment, final String logPreFix,
+                                          final ExitCodeCallback exitCodeCallback, File dir) throws IOException {
+        if (StringUtils.isBlank(user)) {
+            throw new IllegalArgumentException("User cannot be blank when calling processLauncher.");
+        }
+        String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+        String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+        String wl;
+        if (StringUtils.isNotBlank(wlinitial)) {
+            wl = wlinitial;
+        } else {
+            wl = stormHome + "/bin/worker-launcher";
+        }
+        List<String> commands = new ArrayList<>();
+        if (commandPrefix != null) {
+            commands.addAll(commandPrefix);
+        }
+        commands.add(wl);
+        commands.add(user);
+        commands.addAll(args);
+        LOG.info("Running as user: {} command: {}", user, commands);
+        return launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
+    }
+
+    /**
+     * Launch a new process as per {@link ProcessBuilder} with a given
+     * callback.
+     * @param command the command to be executed in the new process
+     * @param environment the environment to be applied to the process. Can be
+     *                    null.
+     * @param logPrefix a prefix for log entries from the output of the process.
+     *                  Can be null.
+     * @param exitCodeCallback a callback invoked with the exit code when the
+     *                         process completes. Can be null.
+     * @param dir the working directory of the new process
+     * @return the new process
+     * @throws IOException on any error
+     * @see ProcessBuilder
+     */
+    public static Process launchProcess(List<String> command,
+                                        Map<String,String> environment,
+                                        final String logPrefix,
+                                        final ExitCodeCallback exitCodeCallback,
+                                        File dir)
+            throws IOException {
+        ProcessBuilder builder = new ProcessBuilder(command);
+        Map<String,String> procEnv = builder.environment();
+        if (dir != null) {
+            builder.directory(dir);
+        }
+        builder.redirectErrorStream(true);
+        if (environment != null) {
+            procEnv.putAll(environment);
+        }
+        final Process process = builder.start();
+        if (logPrefix != null || exitCodeCallback != null) {
+            Utils.asyncLoop(new Callable<Object>() {
+                public Object call() {
+                    if (logPrefix != null ) {
+                        Utils.readAndLogStream(logPrefix,
+                                process.getInputStream());
+                    }
+                    if (exitCodeCallback != null) {
+                        try {
+                            process.waitFor();
+                            exitCodeCallback.call(process.exitValue());
+                        } catch (InterruptedException ie) {
+                            LOG.info("{} interrupted", logPrefix);
+                            exitCodeCallback.call(-1);
+                        }
+                    }
+                    return null; // Run only once.
+                }
+            });
+        }
+        return process;
+    }
+
+    public static void setupStormCodeDir(Map<String, Object> conf, Map<String, Object> stormConf, String dir) throws IOException {
+        if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            String logPrefix = "Storm Code Dir Setup for " + dir;
+            List<String> commands = new ArrayList<>();
+            commands.add("code-dir");
+            commands.add(dir);
+            processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+        }
+    }
+
+    public static void setupWorkerArtifactsDir(Map<String, Object> conf, Map<String, Object> stormConf, String dir) throws IOException {
+        if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            String logPrefix = "Worker Artifacts Setup for " + dir;
+            List<String> commands = new ArrayList<>();
+            commands.add("artifacts-dir");
+            commands.add(dir);
+            processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+        }
+    }
+}
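
launchProcess redirects stderr into stdout and, when given a log prefix or
an exit-code callback, spawns an async loop that drains the stream and
reports the exit status. A hedged usage sketch; the command, prefix, and
LOG instance are illustrative:

    List<String> cmd = Arrays.asList("/bin/echo", "hello");
    Process p = ClientSupervisorUtils.launchProcess(
            cmd, null, "[echo]",
            exitCode -> LOG.info("[echo] exited with {}", exitCode),
            null);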

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java
new file mode 100644
index 0000000..082f205
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+/**
+ * A callback invoked with a process's exit code when the process completes.
+ */
+public interface ExitCodeCallback {
+    
+    /**
+     * Called when the process finishes.
+     * @param exitCode the exit code of the finished process.
+     */
+    void call(int exitCode);
+}
\ No newline at end of file
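
Since the interface declares a single method, it can be satisfied with a
lambda, or with an anonymous class in the style used elsewhere in this
patch. A minimal sketch; the logger and the non-zero check are
illustrative:

    ExitCodeCallback onExit = new ExitCodeCallback() {
        @Override
        public void call(int exitCode) {
            if (exitCode != 0) {
                LOG.warn("process exited abnormally with code {}", exitCode);
            }
        }
    };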

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
new file mode 100644
index 0000000..e5f5db0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.util.Map;
+
+public interface IAdvancedFSOps {
+
+    /**
+     * Set directory permissions to (OWNER)RWX (GROUP)R-X (OTHER)---.
+     * On systems that do not support POSIX permissions this may be a no-op.
+     * @param dir the directory to change permissions on
+     * @throws IOException on any error
+     */
+    void restrictDirectoryPermissions(File dir) throws IOException;
+
+    /**
+     * Move fromDir to toDir, and try to make it an atomic move if possible
+     * @param fromDir what to move
+     * @param toDir where to move it to
+     * @throws IOException on any error
+     */
+    void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException;
+
+    /**
+     * @return true if an atomic directory move works, else false.
+     */
+    boolean supportsAtomicDirectoryMove();
+
+    /**
+     * Copy a directory
+     * @param fromDir from where
+     * @param toDir to where
+     * @throws IOException on any error
+     */
+    void copyDirectory(File fromDir, File toDir) throws IOException;
+
+    /**
+     * Setup permissions properly for an internal blob store path
+     * @param path the path to set the permissions on
+     * @param user the user to change the permissions for
+     * @throws IOException on any error
+     */
+    void setupBlobPermissions(File path, String user) throws IOException;
+
+    /**
+     * Delete a file or a directory and all of its children, if it exists.
+     * @param path what to delete
+     * @param user the user to delete it as, if deleting as another user is supported
+     * @param logPrefix the prefix to include in the logs if an external
+     * process needs to be launched to delete the object
+     * @throws IOException on any error.
+     */
+    void deleteIfExists(File path, String user, String logPrefix) throws IOException;
+
+    /**
+     * Delete a file or a directory and all of its children, if it exists.
+     * @param path what to delete
+     * @throws IOException on any error.
+     */
+    void deleteIfExists(File path) throws IOException;
+
+    /**
+     * Setup the permissions for the storm code dir
+     * @param topologyConf the config of the Topology
+     * @param path the directory to set the permissions on
+     * @throws IOException on any error
+     */
+    void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException;
+
+    /**
+     * Setup the permissions for the worker artifacts dirs
+     * @param topologyConf the config of the Topology
+     * @param path the directory to set the permissions on
+     * @throws IOException on any error
+     */
+    void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException;
+
+    /**
+     * Sanity check if everything the topology needs is there for it to run.
+     * @param conf the config of the supervisor
+     * @param topologyId the ID of the topology
+     * @return true if everything is there, else false
+     * @throws IOException on any error
+     */
+    boolean doRequiredTopoFilesExist(Map<String, Object> conf, String topologyId) throws IOException;
+
+    /**
+     * Makes a directory, including any necessary but nonexistent parent
+     * directories.
+     *
+     * @param path the directory to create
+     * @throws IOException on any error
+     */
+    void forceMkdir(File path) throws IOException;
+
+    /**
+     * Check if a file exists or not
+     * @param path the path to check
+     * @return true if it exists else false
+     * @throws IOException on any error.
+     */
+    boolean fileExists(File path) throws IOException;
+
+    /**
+     * Get a writer for the given location
+     * @param file the file to write to
+     * @return the Writer to use.
+     * @throws IOException on any error
+     */
+    Writer getWriter(File file) throws IOException;
+
+    /**
+     * Get an output stream to write to a given file
+     * @param file the file to write to
+     * @return an OutputStream for that file
+     * @throws IOException on any error
+     */
+    OutputStream getOutputStream(File file) throws IOException;
+
+    /**
+     * Dump a string to a file
+     * @param location where to write to
+     * @param data the data to write
+     * @throws IOException on any error
+     */
+    void dump(File location, String data) throws IOException;
+
+    /**
+     * Read the contents of a file into a String
+     * @param location the file to read
+     * @return the contents of the file
+     * @throws IOException on any error
+     */
+    String slurpString(File location) throws IOException;
+
+    /**
+     * Read the contents of a file into a byte array.
+     * @param location the file to read
+     * @return the contents of the file
+     * @throws IOException on any error
+     */
+    byte[] slurp(File location) throws IOException;
+
+    /**
+     * Create a symbolic link pointing at target
+     * @param link the link to create
+     * @param target where it should point to
+     * @throws IOException on any error.
+     */
+    void createSymlink(File link, File target) throws IOException;
+}
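
A short hedged sketch of the dump/slurpString round trip this interface
promises; the path is illustrative and conf is assumed to be an already
loaded storm config:

    IAdvancedFSOps ops = AdvancedFSOps.make(conf);
    File state = new File("/tmp/worker-state/heartbeat.json"); // illustrative
    ops.dump(state, "{\"time\": 0}");         // dump creates parent dirs as needed
    String contents = ops.slurpString(state); // read back as UTF-8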