You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:30 UTC
[41/52] [partial] storm git commit: STORM-2441 Break down
'storm-core' to extract client (worker) artifacts
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
new file mode 100644
index 0000000..372df1f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.Thrift;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.metric.EventLoggerBolt;
+import org.apache.storm.metric.MetricsConsumerBolt;
+import org.apache.storm.metric.SystemBolt;
+import org.apache.storm.metric.filter.FilterByMetricName;
+import org.apache.storm.metric.util.DataPointExpander;
+import org.apache.storm.security.auth.IAuthorizer;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ThriftTopologyUtils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class StormCommon {
+ // A singleton instance allows us to mock delegated static methods in our
+ // tests by subclassing.
+ private static StormCommon _instance = new StormCommon();
+
+ /**
+ * Provide an instance of this class for delegates to use. To mock out
+ * delegated methods, provide an instance of a subclass that overrides the
+ * implementation of the delegated method.
+ *
+ * @param common a StormCommon instance
+ * @return the previously set instance
+ */
+ public static StormCommon setInstance(StormCommon common) {
+ StormCommon oldInstance = _instance;
+ _instance = common;
+ return oldInstance;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(StormCommon.class);
+
+ public static final String SYSTEM_STREAM_ID = "__system";
+
+ public static final String EVENTLOGGER_COMPONENT_ID = "__eventlogger";
+ public static final String EVENTLOGGER_STREAM_ID = "__eventlog";
+
+ public static final String TOPOLOGY_METRICS_CONSUMER_CLASS = "class";
+ public static final String TOPOLOGY_METRICS_CONSUMER_ARGUMENT = "argument";
+ public static final String TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES = "max.retain.metric.tuples";
+ public static final String TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT = "parallelism.hint";
+ public static final String TOPOLOGY_METRICS_CONSUMER_WHITELIST = "whitelist";
+ public static final String TOPOLOGY_METRICS_CONSUMER_BLACKLIST = "blacklist";
+ public static final String TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE = "expandMapType";
+ public static final String TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR = "metricNameSeparator";
+
+ @Deprecated
+ public static String getStormId(final IStormClusterState stormClusterState, final String topologyName) {
+ return stormClusterState.getTopoId(topologyName).get();
+ }
+
+ public static void validateDistributedMode(Map conf) {
+ if (ConfigUtils.isLocalMode(conf)) {
+ throw new IllegalArgumentException("Cannot start server in local mode!");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void validateIds(StormTopology topology) throws InvalidTopologyException {
+ List<String> componentIds = new ArrayList<>();
+
+ for (StormTopology._Fields field : Thrift.getTopologyFields()) {
+ if (!ThriftTopologyUtils.isWorkerHook(field) && !ThriftTopologyUtils.isDependencies(field)) {
+ Object value = topology.getFieldValue(field);
+ Map<String, Object> componentMap = (Map<String, Object>) value;
+ componentIds.addAll(componentMap.keySet());
+
+ for (String id : componentMap.keySet()) {
+ if (Utils.isSystemId(id)) {
+ throw new InvalidTopologyException(id + " is not a valid component id.");
+ }
+ }
+ for (Object componentObj : componentMap.values()) {
+ ComponentCommon common = getComponentCommon(componentObj);
+ Set<String> streamIds = common.get_streams().keySet();
+ for (String id : streamIds) {
+ if (Utils.isSystemId(id)) {
+ throw new InvalidTopologyException(id + " is not a valid stream id.");
+ }
+ }
+ }
+ }
+ }
+
+ List<String> offending = Utils.getRepeat(componentIds);
+ if (!offending.isEmpty()) {
+ throw new InvalidTopologyException("Duplicate component ids: " + offending);
+ }
+ }
+
+ private static boolean isEmptyInputs(ComponentCommon common) {
+ if (common.get_inputs() == null) {
+ return true;
+ } else {
+ return common.get_inputs().isEmpty();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Map<String, Object> allComponents(StormTopology topology) {
+ Map<String, Object> components = new HashMap<>();
+ List<StormTopology._Fields> topologyFields = Arrays.asList(Thrift.getTopologyFields());
+ for (StormTopology._Fields field : topologyFields) {
+ if (!ThriftTopologyUtils.isWorkerHook(field) && !ThriftTopologyUtils.isDependencies(field)) {
+ components.putAll(((Map) topology.getFieldValue(field)));
+ }
+ }
+ return components;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Map<String, Object> componentConf(Object component) {
+ try {
+ Map<String, Object> conf = new HashMap<>();
+ ComponentCommon common = getComponentCommon(component);
+ String jconf = common.get_json_conf();
+ if (jconf != null) {
+ conf.putAll((Map<String, Object>) JSONValue.parseWithException(jconf));
+ }
+ return conf;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void validateBasic(StormTopology topology) throws InvalidTopologyException {
+ validateIds(topology);
+
+ for (StormTopology._Fields field : Thrift.getSpoutFields()) {
+ Map<String, Object> spoutComponents = (Map<String, Object>) topology.getFieldValue(field);
+ if (spoutComponents != null) {
+ for (Object obj : spoutComponents.values()) {
+ ComponentCommon common = getComponentCommon(obj);
+ if (!isEmptyInputs(common)) {
+ throw new InvalidTopologyException("May not declare inputs for a spout");
+ }
+ }
+ }
+ }
+
+ Map<String, Object> componentMap = allComponents(topology);
+ for (Object componentObj : componentMap.values()) {
+ Map conf = componentConf(componentObj);
+ ComponentCommon common = getComponentCommon(componentObj);
+ int parallelismHintNum = Thrift.getParallelismHint(common);
+ Integer taskNum = ObjectReader.getInt(conf.get(Config.TOPOLOGY_TASKS), 0);
+ if (taskNum > 0 && parallelismHintNum <= 0) {
+ throw new InvalidTopologyException("Number of executors must be greater than 0 when number of tasks is greater than 0");
+ }
+ }
+ }
+
+ private static Set<String> getStreamOutputFields(Map<String, StreamInfo> streams) {
+ Set<String> outputFields = new HashSet<>();
+ for (StreamInfo streamInfo : streams.values()) {
+ outputFields.addAll(streamInfo.get_output_fields());
+ }
+ return outputFields;
+ }
+
+ public static void validateStructure(StormTopology topology) throws InvalidTopologyException {
+ Map<String, Object> componentMap = allComponents(topology);
+ for (Map.Entry<String, Object> entry : componentMap.entrySet()) {
+ String componentId = entry.getKey();
+ ComponentCommon common = getComponentCommon(entry.getValue());
+ Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
+ for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
+ String sourceStreamId = input.getKey().get_streamId();
+ String sourceComponentId = input.getKey().get_componentId();
+ if (!componentMap.keySet().contains(sourceComponentId)) {
+ throw new InvalidTopologyException("Component: [" + componentId +
+ "] subscribes from non-existent component [" + sourceComponentId + "]");
+ }
+
+ ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId));
+ if (!sourceComponent.get_streams().containsKey(sourceStreamId)) {
+ throw new InvalidTopologyException("Component: [" + componentId +
+ "] subscribes from non-existent stream: " +
+ "[" + sourceStreamId + "] of component [" + sourceComponentId + "]");
+ }
+
+ Grouping grouping = input.getValue();
+ if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) {
+ List<String> fields = new ArrayList<>(grouping.get_fields());
+ Map<String, StreamInfo> streams = sourceComponent.get_streams();
+ Set<String> sourceOutputFields = getStreamOutputFields(streams);
+ fields.removeAll(sourceOutputFields);
+ if (fields.size() != 0) {
+ throw new InvalidTopologyException("Component: [" + componentId +
+ "] subscribes from stream: [" + sourceStreamId + "] of component " +
+ "[" + sourceComponentId + "] + with non-existent fields: " + fields);
+ }
+ }
+ }
+ }
+ }
+
+ public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
+ Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
+ Set<String> boltIds = topology.get_bolts().keySet();
+ Set<String> spoutIds = topology.get_spouts().keySet();
+
+ for (String id : spoutIds) {
+ inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID),
+ Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ }
+
+ for (String id : boltIds) {
+ inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID),
+ Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID),
+ Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
+ Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ }
+ return inputs;
+ }
+
+ public static IBolt makeAckerBolt() {
+ return _instance.makeAckerBoltImpl();
+ }
+
+ public IBolt makeAckerBoltImpl() {
+ return new Acker();
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void addAcker(Map conf, StormTopology topology) {
+ int ackerNum = ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+ Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);
+
+ Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
+ outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
+ outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
+ outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
+
+ Map<String, Object> ackerConf = new HashMap<>();
+ ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
+ ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+
+ Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);
+
+ for (Bolt bolt : topology.get_bolts().values()) {
+ ComponentCommon common = bolt.get_common();
+ common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
+ common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
+ common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
+ }
+
+ for (SpoutSpec spout : topology.get_spouts().values()) {
+ ComponentCommon common = spout.get_common();
+ Map spoutConf = componentConf(spout);
+ spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+ ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+ common.set_json_conf(JSONValue.toJSONString(spoutConf));
+ common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
+ Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
+ common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
+ Thrift.prepareDirectGrouping());
+ common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
+ Thrift.prepareDirectGrouping());
+ common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
+ Thrift.prepareDirectGrouping());
+ }
+
+ topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
+ }
+
+ public static ComponentCommon getComponentCommon(Object component) {
+ ComponentCommon common = null;
+ if (component instanceof StateSpoutSpec) {
+ common = ((StateSpoutSpec) component).get_common();
+ } else if (component instanceof SpoutSpec) {
+ common = ((SpoutSpec) component).get_common();
+ } else if (component instanceof Bolt) {
+ common = ((Bolt) component).get_common();
+ }
+ return common;
+ }
+
+ public static void addMetricStreams(StormTopology topology) {
+ for (Object component : allComponents(topology).values()) {
+ ComponentCommon common = getComponentCommon(component);
+ StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
+ common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
+ }
+ }
+
+ public static void addSystemStreams(StormTopology topology) {
+ for (Object component : allComponents(topology).values()) {
+ ComponentCommon common = getComponentCommon(component);
+ StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("event"));
+ common.put_to_streams(SYSTEM_STREAM_ID, streamInfo);
+ }
+ }
+
+ public static List<String> eventLoggerBoltFields() {
+ return Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID,
+ EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES);
+ }
+
+ public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
+ Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
+ Set<String> allIds = new HashSet<String>();
+ allIds.addAll(topology.get_bolts().keySet());
+ allIds.addAll(topology.get_spouts().keySet());
+
+ for (String id : allIds) {
+ inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID),
+ Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
+ }
+ return inputs;
+ }
+
+ public static void addEventLogger(Map conf, StormTopology topology) {
+ Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
+ ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+ HashMap<String, Object> componentConf = new HashMap<>();
+ componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
+ componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+ Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(
+ eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
+
+ for (Object component : allComponents(topology).values()) {
+ ComponentCommon common = getComponentCommon(component);
+ common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
+ }
+ topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Map<String, Bolt> metricsConsumerBoltSpecs(Map conf, StormTopology topology) {
+ Map<String, Bolt> metricsConsumerBolts = new HashMap<>();
+
+ Set<String> componentIdsEmitMetrics = new HashSet<>();
+ componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
+ componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);
+
+ Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
+ for (String componentId : componentIdsEmitMetrics) {
+ inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
+ }
+
+ List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
+ if (registerInfo != null) {
+ Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
+ for (Map<String, Object> info : registerInfo) {
+ String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
+ Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
+ Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(
+ TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
+ Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);
+ Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
+ metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
+ List<String> whitelist = (List<String>) info.get(
+ TOPOLOGY_METRICS_CONSUMER_WHITELIST);
+ List<String> blacklist = (List<String>) info.get(
+ TOPOLOGY_METRICS_CONSUMER_BLACKLIST);
+ FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);
+ Boolean expandMapType = ObjectReader.getBoolean(info.get(
+ TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE), false);
+ String metricNameSeparator = ObjectReader.getString(info.get(
+ TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR), ".");
+ DataPointExpander expander = new DataPointExpander(expandMapType, metricNameSeparator);
+ MetricsConsumerBolt boltInstance = new MetricsConsumerBolt(className, argument,
+ maxRetainMetricTuples, filterPredicate, expander);
+ Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
+ boltInstance, null, phintNum, metricsConsumerConf);
+
+ String id = className;
+ if (classOccurrencesMap.containsKey(className)) {
+ // e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
+ int occurrenceNum = classOccurrencesMap.get(className);
+ occurrenceNum++;
+ classOccurrencesMap.put(className, occurrenceNum);
+ id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
+ } else {
+ classOccurrencesMap.put(className, 1);
+ }
+ metricsConsumerBolts.put(id, metricsConsumerBolt);
+ }
+ }
+ return metricsConsumerBolts;
+ }
+
+ public static void addMetricComponents(Map conf, StormTopology topology) {
+ Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
+ for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
+ topology.put_to_bolts(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public static void addSystemComponents(Map conf, StormTopology topology) {
+ Map<String, StreamInfo> outputStreams = new HashMap<>();
+ outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
+ outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
+ outputStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds")));
+
+ Map<String, Object> boltConf = new HashMap<>();
+ boltConf.put(Config.TOPOLOGY_TASKS, 0);
+
+ Bolt systemBoltSpec = Thrift.prepareSerializedBoltDetails(null, new SystemBolt(), outputStreams, 0, boltConf);
+ topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID, systemBoltSpec);
+ }
+
+ public static StormTopology systemTopology(Map stormConf, StormTopology topology) throws InvalidTopologyException {
+ return _instance.systemTopologyImpl(stormConf, topology);
+ }
+
+ protected StormTopology systemTopologyImpl(Map stormConf, StormTopology topology) throws InvalidTopologyException {
+ validateBasic(topology);
+
+ StormTopology ret = topology.deepCopy();
+ addAcker(stormConf, ret);
+ if (hasEventLoggers(stormConf)) {
+ addEventLogger(stormConf, ret);
+ }
+ addMetricComponents(stormConf, ret);
+ addSystemComponents(stormConf, ret);
+ addMetricStreams(ret);
+ addSystemStreams(ret);
+
+ validateStructure(ret);
+
+ return ret;
+ }
+
+ public static boolean hasAckers(Map stormConf) {
+ Object ackerNum = stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
+ return ackerNum == null || ObjectReader.getInt(ackerNum) > 0;
+ }
+
+ public static boolean hasEventLoggers(Map stormConf) {
+ Object eventLoggerNum = stormConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS);
+ return eventLoggerNum == null || ObjectReader.getInt(eventLoggerNum) > 0;
+ }
+
+ public static int numStartExecutors(Object component) throws InvalidTopologyException {
+ ComponentCommon common = getComponentCommon(component);
+ return Thrift.getParallelismHint(common);
+ }
+
+ public static Map<Integer, String> stormTaskInfo(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
+ return _instance.stormTaskInfoImpl(userTopology, stormConf);
+ }
+
+ /*
+ * Returns map from task -> componentId
+ */
+ protected Map<Integer, String> stormTaskInfoImpl(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
+ Map<Integer, String> taskIdToComponentId = new HashMap<>();
+
+ StormTopology systemTopology = systemTopology(stormConf, userTopology);
+ Map<String, Object> components = allComponents(systemTopology);
+ Map<String, Integer> componentIdToTaskNum = new TreeMap<>();
+ for (Map.Entry<String, Object> entry : components.entrySet()) {
+ Map conf = componentConf(entry.getValue());
+ Object taskNum = conf.get(Config.TOPOLOGY_TASKS);
+ componentIdToTaskNum.put(entry.getKey(), ObjectReader.getInt(taskNum));
+ }
+
+ int taskId = 1;
+ for (Map.Entry<String, Integer> entry : componentIdToTaskNum.entrySet()) {
+ String componentId = entry.getKey();
+ Integer taskNum = entry.getValue();
+ while (taskNum > 0) {
+ taskIdToComponentId.put(taskId, componentId);
+ taskNum--;
+ taskId++;
+ }
+ }
+ return taskIdToComponentId;
+ }
+
+ public static List<Integer> executorIdToTasks(List<Long> executorId) {
+ List<Integer> taskIds = new ArrayList<>();
+ int taskId = executorId.get(0).intValue();
+ while (taskId <= executorId.get(1).intValue()) {
+ taskIds.add(taskId);
+ taskId++;
+ }
+ return taskIds;
+ }
+
+ public static Map<Integer, NodeInfo> taskToNodeport(Map<List<Long>, NodeInfo> executorToNodePort) {
+ Map<Integer, NodeInfo> tasksToNodePort = new HashMap<>();
+ for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
+ List<Integer> taskIds = executorIdToTasks(entry.getKey());
+ for (Integer taskId : taskIds) {
+ tasksToNodePort.put(taskId, entry.getValue());
+ }
+ }
+ return tasksToNodePort;
+ }
+
+ public static IAuthorizer mkAuthorizationHandler(String klassName, Map conf)
+ throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+ return _instance.mkAuthorizationHandlerImpl(klassName, conf);
+ }
+
+ protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf)
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ IAuthorizer aznHandler = null;
+ if (StringUtils.isNotBlank(klassName)) {
+ Class<?> aznClass = Class.forName(klassName);
+ if (aznClass != null) {
+ aznHandler = (IAuthorizer) aznClass.newInstance();
+ if (aznHandler != null) {
+ aznHandler.prepare(conf);
+ }
+ LOG.debug("authorization class name:{}, class:{}, handler:{}", klassName, aznClass, aznHandler);
+ }
+ }
+
+ return aznHandler;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static WorkerTopologyContext makeWorkerContext(Map<String, Object> workerData) {
+ try {
+ StormTopology stormTopology = (StormTopology) workerData.get(Constants.SYSTEM_TOPOLOGY);
+ Map stormConf = (Map) workerData.get(Constants.STORM_CONF);
+ Map<Integer, String> taskToComponent = (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT);
+ Map<String, List<Integer>> componentToSortedTasks =
+ (Map<String, List<Integer>>) workerData.get(Constants.COMPONENT_TO_SORTED_TASKS);
+ Map<String, Map<String, Fields>> componentToStreamToFields =
+ (Map<String, Map<String, Fields>>) workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS);
+ String stormId = (String) workerData.get(Constants.STORM_ID);
+ Map conf = (Map) workerData.get(Constants.CONF);
+ Integer port = (Integer) workerData.get(Constants.PORT);
+ String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, stormId));
+ String pidDir = ConfigUtils.workerPidsRoot(conf, stormId);
+ List<Integer> workerTasks = (List<Integer>) workerData.get(Constants.TASK_IDS);
+ Map<String, Object> defaultResources = (Map<String, Object>) workerData.get(Constants.DEFAULT_SHARED_RESOURCES);
+ Map<String, Object> userResources = (Map<String, Object>) workerData.get(Constants.USER_SHARED_RESOURCES);
+ return new WorkerTopologyContext(stormTopology, stormConf, taskToComponent, componentToSortedTasks,
+ componentToStreamToFields, stormId, codeDir, pidDir, port, workerTasks, defaultResources, userResources);
+ } catch (IOException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
new file mode 100644
index 0000000..6316564
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.Config;
+import org.apache.storm.Thrift;
+import org.apache.storm.daemon.metrics.BuiltinMetrics;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.ShellComponent;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class Task {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+ private Executor executor;
+ private WorkerState workerData;
+ private TopologyContext systemTopologyContext;
+ private TopologyContext userTopologyContext;
+ private WorkerTopologyContext workerTopologyContext;
+ private LoadMapping loadMapping;
+ private Integer taskId;
+ private String componentId;
+ private Object taskObject; // Spout/Bolt object
+ private Map stormConf;
+ private Callable<Boolean> emitSampler;
+ private CommonStats executorStats;
+ private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper;
+ private BuiltinMetrics builtInMetrics;
+ private boolean debug;
+
+ public Task(Executor executor, Integer taskId) throws IOException {
+ this.taskId = taskId;
+ this.executor = executor;
+ this.workerData = executor.getWorkerData();
+ this.stormConf = executor.getStormConf();
+ this.componentId = executor.getComponentId();
+ this.streamComponentToGrouper = executor.getStreamToComponentToGrouper();
+ this.executorStats = executor.getStats();
+ this.builtInMetrics = BuiltinMetricsUtil.mkData(executor.getType(), this.executorStats);
+ this.workerTopologyContext = executor.getWorkerTopologyContext();
+ this.emitSampler = ConfigUtils.mkStatsSampler(stormConf);
+ this.loadMapping = workerData.getLoadMapping();
+ this.systemTopologyContext = mkTopologyContext(workerData.getSystemTopology());
+ this.userTopologyContext = mkTopologyContext(workerData.getTopology());
+ this.taskObject = mkTaskObject();
+ this.debug = stormConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) stormConf.get(Config.TOPOLOGY_DEBUG);
+ this.addTaskHooks();
+ }
+
+ public List<Integer> getOutgoingTasks(Integer outTaskId, String stream, List<Object> values) {
+ if (debug) {
+ LOG.info("Emitting direct: {}; {} {} {} ", outTaskId, componentId, stream, values);
+ }
+ String targetComponent = workerTopologyContext.getComponentId(outTaskId);
+ Map<String, LoadAwareCustomStreamGrouping> componentGrouping = streamComponentToGrouper.get(stream);
+ LoadAwareCustomStreamGrouping grouping = componentGrouping.get(targetComponent);
+ if (null == grouping) {
+ outTaskId = null;
+ }
+ if (grouping != null && grouping != GrouperFactory.DIRECT) {
+ throw new IllegalArgumentException("Cannot emitDirect to a task expecting a regular grouping");
+ }
+ new EmitInfo(values, stream, taskId, Collections.singletonList(outTaskId)).applyOn(userTopologyContext);
+ try {
+ if (emitSampler.call()) {
+ executorStats.emittedTuple(stream);
+ if (null != outTaskId) {
+ executorStats.transferredTuples(stream, 1);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (null != outTaskId) {
+ return Collections.singletonList(outTaskId);
+ }
+ return new ArrayList<>(0);
+ }
+
+ public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
+ if (debug) {
+ LOG.info("Emitting: {} {} {}", componentId, stream, values);
+ }
+
+ List<Integer> outTasks = new ArrayList<>();
+ if (!streamComponentToGrouper.containsKey(stream)) {
+ throw new IllegalArgumentException("Unknown stream ID: " + stream);
+ }
+ if (null != streamComponentToGrouper.get(stream)) {
+ // null value for __system
+ for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
+ if (grouper == GrouperFactory.DIRECT) {
+ throw new IllegalArgumentException("Cannot do regular emit to direct stream");
+ }
+ List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
+ outTasks.addAll(compTasks);
+ }
+ }
+ new EmitInfo(values, stream, taskId, outTasks).applyOn(userTopologyContext);
+ try {
+ if (emitSampler.call()) {
+ executorStats.emittedTuple(stream);
+ executorStats.transferredTuples(stream, outTasks.size());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return outTasks;
+ }
+
+ public Tuple getTuple(String stream, List values) {
+ return new TupleImpl(systemTopologyContext, values, systemTopologyContext.getThisTaskId(), stream);
+ }
+
+ public Integer getTaskId() {
+ return taskId;
+ }
+
+ public String getComponentId() {
+ return componentId;
+ }
+
+ public TopologyContext getUserContext() {
+ return userTopologyContext;
+ }
+
+ public Object getTaskObject() {
+ return taskObject;
+ }
+
+ public BuiltinMetrics getBuiltInMetrics() {
+ return builtInMetrics;
+ }
+
+ private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
+ Map conf = workerData.getConf();
+ return new TopologyContext(
+ topology,
+ workerData.getTopologyConf(),
+ workerData.getTaskToComponent(),
+ workerData.getComponentToSortedTasks(),
+ workerData.getComponentToStreamToFields(),
+ workerData.getTopologyId(),
+ ConfigUtils.supervisorStormResourcesPath(
+ ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
+ ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
+ taskId,
+ workerData.getPort(), workerData.getTaskIds(),
+ workerData.getDefaultSharedResources(),
+ workerData.getUserSharedResources(),
+ executor.getSharedExecutorData(),
+ executor.getIntervalToTaskToMetricToRegistry(),
+ executor.getOpenOrPrepareWasCalled());
+ }
+
+ private Object mkTaskObject() {
+ StormTopology topology = systemTopologyContext.getRawTopology();
+ Map<String, SpoutSpec> spouts = topology.get_spouts();
+ Map<String, Bolt> bolts = topology.get_bolts();
+ Map<String, StateSpoutSpec> stateSpouts = topology.get_state_spouts();
+ Object result;
+ ComponentObject componentObject;
+ if (spouts.containsKey(componentId)) {
+ componentObject = spouts.get(componentId).get_spout_object();
+ } else if (bolts.containsKey(componentId)) {
+ componentObject = bolts.get(componentId).get_bolt_object();
+ } else if (stateSpouts.containsKey(componentId)) {
+ componentObject = stateSpouts.get(componentId).get_state_spout_object();
+ } else {
+ throw new RuntimeException("Could not find " + componentId + " in " + topology);
+ }
+ result = Utils.getSetComponentObject(componentObject);
+
+ if (result instanceof ShellComponent) {
+ if (spouts.containsKey(componentId)) {
+ result = new ShellSpout((ShellComponent) result);
+ } else {
+ result = new ShellBolt((ShellComponent) result);
+ }
+ }
+
+ if (result instanceof JavaObject) {
+ result = Thrift.instantiateJavaObject((JavaObject) result);
+ }
+
+ return result;
+ }
+
+ private void addTaskHooks() {
+ List<String> hooksClassList = (List<String>) stormConf.get(Config.TOPOLOGY_AUTO_TASK_HOOKS);
+ if (null != hooksClassList) {
+ for (String hookClass : hooksClassList) {
+ try {
+ userTopologyContext.addTaskHook(((ITaskHook) Class.forName(hookClass).newInstance()));
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+ throw new RuntimeException("Failed to add hook: " + hookClass, e);
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinBoltMetrics.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinBoltMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinBoltMetrics.java
new file mode 100644
index 0000000..b9677f6
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinBoltMetrics.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.metric.internal.MultiCountStatAndMetric;
+import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
+import org.apache.storm.stats.BoltExecutorStats;
+
+public class BuiltinBoltMetrics extends BuiltinMetrics {
+ private final MultiCountStatAndMetric ackCount;
+ private final MultiCountStatAndMetric failCount;
+ private final MultiCountStatAndMetric emitCount;
+ private final MultiCountStatAndMetric executeCount;
+ private final MultiCountStatAndMetric transferCount;
+ private final MultiLatencyStatAndMetric executeLatency;
+ private final MultiLatencyStatAndMetric processLatency;
+
+ public BuiltinBoltMetrics(BoltExecutorStats stats) {
+ this.ackCount = stats.getAcked();
+ this.failCount = stats.getFailed();
+ this.emitCount = stats.getEmitted();
+ this.executeCount = stats.getExecuted();
+ this.transferCount = stats.getTransferred();
+ this.executeLatency = stats.getExecuteLatencies();
+ this.processLatency = stats.getProcessLatencies();
+
+ this.metricMap.put("ack-count", ackCount);
+ this.metricMap.put("fail-count", failCount);
+ this.metricMap.put("emit-count", emitCount);
+ this.metricMap.put("transfer-count", transferCount);
+ this.metricMap.put("execute-count", executeCount);
+ this.metricMap.put("process-latency", processLatency);
+ this.metricMap.put("execute-latency", executeLatency);
+ }
+
+ public MultiCountStatAndMetric getAckCount() {
+ return ackCount;
+ }
+
+ public MultiCountStatAndMetric getFailCount() {
+ return failCount;
+ }
+
+ public MultiCountStatAndMetric getEmitCount() {
+ return emitCount;
+ }
+
+ public MultiCountStatAndMetric getTransferCount() {
+ return transferCount;
+ }
+
+ public MultiCountStatAndMetric getExecuteCount() {
+ return executeCount;
+ }
+
+ public MultiLatencyStatAndMetric getExecuteLatency() {
+ return executeLatency;
+ }
+
+ public MultiLatencyStatAndMetric getProcessLatency() {
+ return processLatency;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java
new file mode 100644
index 0000000..0dacad1
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.task.TopologyContext;
+
+public abstract class BuiltinMetrics {
+ protected final Map<String, IMetric> metricMap = new HashMap<>();
+
+ public void registerAll(Map stormConf, TopologyContext context) {
+ for (Map.Entry<String, IMetric> entry : metricMap.entrySet()) {
+ BuiltinMetricsUtil.registerMetric("__" + entry.getKey(), entry.getValue(), stormConf, context);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
new file mode 100644
index 0000000..2827420
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metric.api.StateMetric;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.TopologyContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class BuiltinMetricsUtil {
+ public static BuiltinMetrics mkData(String type, CommonStats stats) {
+ if (StatsUtil.SPOUT.equals(type)) {
+ return new BuiltinSpoutMetrics((SpoutExecutorStats) stats);
+ } else if (StatsUtil.BOLT.equals(type)) {
+ return new BuiltinBoltMetrics((BoltExecutorStats) stats);
+ }
+ throw new RuntimeException("Invalid component type!");
+ }
+
+ public static void registerIconnectionServerMetric(Object server, Map stormConf, TopologyContext context) {
+ if (server instanceof IStatefulObject) {
+ registerMetric("__recv-iconnection", new StateMetric((IStatefulObject) server), stormConf, context);
+ }
+ }
+
+ public static void registerIconnectionClientMetrics(final Map nodePortToSocket, Map stormConf, TopologyContext context) {
+ IMetric metric = new IMetric() {
+ @Override
+ public Object getValueAndReset() {
+ Map<Object, Object> ret = new HashMap<>();
+ for (Object o : nodePortToSocket.entrySet()) {
+ Map.Entry entry = (Map.Entry) o;
+ Object nodePort = entry.getKey();
+ Object connection = entry.getValue();
+ if (connection instanceof IStatefulObject) {
+ ret.put(nodePort, ((IStatefulObject) connection).getState());
+ }
+ }
+ return ret;
+ }
+ };
+ registerMetric("__send-iconnection", metric, stormConf, context);
+ }
+
+ public static void registerQueueMetrics(Map queues, Map stormConf, TopologyContext context) {
+ for (Object o : queues.entrySet()) {
+ Map.Entry entry = (Map.Entry) o;
+ String name = "__" + entry.getKey();
+ IMetric metric = new StateMetric((IStatefulObject) entry.getValue());
+ registerMetric(name, metric, stormConf, context);
+ }
+ }
+
+ public static void registerMetric(String name, IMetric metric, Map stormConf, TopologyContext context) {
+ int bucketSize = ((Number) stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)).intValue();
+ context.registerMetric(name, metric, bucketSize);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinSpoutMetrics.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinSpoutMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinSpoutMetrics.java
new file mode 100644
index 0000000..5eef4bb
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinSpoutMetrics.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.metric.internal.MultiCountStatAndMetric;
+import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
+import org.apache.storm.stats.SpoutExecutorStats;
+
+public class BuiltinSpoutMetrics extends BuiltinMetrics {
+ private final MultiCountStatAndMetric ackCount;
+ private final MultiCountStatAndMetric failCount;
+ private final MultiCountStatAndMetric emitCount;
+ private final MultiCountStatAndMetric transferCount;
+ private final MultiLatencyStatAndMetric completeLatency;
+
+ public BuiltinSpoutMetrics(SpoutExecutorStats stats) {
+ this.ackCount = stats.getAcked();
+ this.failCount = stats.getFailed();
+ this.emitCount = stats.getEmitted();
+ this.transferCount = stats.getTransferred();
+ this.completeLatency = stats.getCompleteLatencies();
+
+ this.metricMap.put("ack-count", ackCount);
+ this.metricMap.put("fail-count", failCount);
+ this.metricMap.put("emit-count", emitCount);
+ this.metricMap.put("transfer-count", transferCount);
+ this.metricMap.put("complete-latency", completeLatency);
+ }
+
+ public MultiCountStatAndMetric getAckCount() {
+ return ackCount;
+ }
+
+ public MultiCountStatAndMetric getFailCount() {
+ return failCount;
+ }
+
+ public MultiCountStatAndMetric getEmitCount() {
+ return emitCount;
+ }
+
+ public MultiCountStatAndMetric getTransferCount() {
+ return transferCount;
+ }
+
+ public MultiLatencyStatAndMetric getCompleteLatency() {
+ return completeLatency;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
new file mode 100644
index 0000000..3c7f524
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import org.apache.storm.metric.api.CountMetric;
+import org.apache.storm.stats.CommonStats;
+
+public class SpoutThrottlingMetrics extends BuiltinMetrics {
+ private final CountMetric skippedMaxSpout = new CountMetric();
+ private final CountMetric skippedThrottle = new CountMetric();
+ private final CountMetric skippedInactive = new CountMetric();
+
+ public SpoutThrottlingMetrics() {
+ this.metricMap.put("skipped-max-spout", skippedMaxSpout);
+ this.metricMap.put("skipped-throttle", skippedThrottle);
+ this.metricMap.put("skipped-inactive", skippedInactive);
+ }
+
+ public CountMetric getSkippedMaxSpout() {
+ return skippedMaxSpout;
+ }
+
+ public CountMetric getSkippedThrottle() {
+ return skippedThrottle;
+ }
+
+ public CountMetric getSkippedInactive() {
+ return skippedInactive;
+ }
+
+ public void skippedMaxSpout(CommonStats stats) {
+ this.skippedMaxSpout.incrBy(stats.getRate());
+ }
+
+ public void skippedThrottle(CommonStats stats) {
+ this.skippedThrottle.incrBy(stats.getRate());
+ }
+
+ public void skippedInactive(CommonStats stats) {
+ this.skippedInactive.incrBy(stats.getRate());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
new file mode 100644
index 0000000..ba7f9db
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AdvancedFSOps implements IAdvancedFSOps {
+ private static final Logger LOG = LoggerFactory.getLogger(AdvancedFSOps.class);
+
+ /**
+ * Factory to create a new AdvancedFSOps
+ * @param conf the configuration of the process
+ * @return the appropriate instance of the class for this config and environment.
+ */
+ public static AdvancedFSOps make(Map<String, Object> conf) {
+ if (Utils.isOnWindows()) {
+ return new AdvancedWindowsFSOps(conf);
+ }
+ if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ return new AdvancedRunAsUserFSOps(conf);
+ }
+ return new AdvancedFSOps(conf);
+ }
+
+ private static class AdvancedRunAsUserFSOps extends AdvancedFSOps {
+ private final Map<String, Object> _conf;
+
+ public AdvancedRunAsUserFSOps(Map<String, Object> conf) {
+ super(conf);
+ if (Utils.isOnWindows()) {
+ throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
+ }
+ _conf = conf;
+ }
+
+ @Override
+ public void setupBlobPermissions(File path, String user) throws IOException {
+ String logPrefix = "setup blob permissions for " + path;
+ ClientSupervisorUtils.processLauncherAndWait(_conf, user, Arrays.asList("blob", path.toString()), null, logPrefix);
+ }
+
+ @Override
+ public void deleteIfExists(File path, String user, String logPrefix) throws IOException {
+ String absolutePath = path.getAbsolutePath();
+ LOG.info("Deleting path {}", absolutePath);
+ if (user == null) {
+ user = Files.getOwner(path.toPath()).getName();
+ }
+ List<String> commands = new ArrayList<>();
+ commands.add("rmr");
+ commands.add(absolutePath);
+ ClientSupervisorUtils.processLauncherAndWait(_conf, user, commands, null, logPrefix);
+
+ if (Utils.checkFileExists(absolutePath)) {
+ // It's possible that permissions were not set properly on the directory, and
+ // the user who is *supposed* to own the dir does not. In this case, try the
+ // delete as the supervisor user.
+ Utils.forceDelete(absolutePath);
+ if (Utils.checkFileExists(absolutePath)) {
+ throw new RuntimeException(path + " was not deleted.");
+ }
+ }
+ }
+
+ @Override
+ public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException {
+ ClientSupervisorUtils.setupStormCodeDir(_conf, topologyConf, path.getCanonicalPath());
+ }
+
+ @Override
+ public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException {
+ ClientSupervisorUtils.setupWorkerArtifactsDir(_conf, topologyConf, path.getCanonicalPath());
+ }
+ }
+
+ /**
+ * Operations that need to override the default ones when running on Windows
+ *
+ */
+ private static class AdvancedWindowsFSOps extends AdvancedFSOps {
+
+ public AdvancedWindowsFSOps(Map<String, Object> conf) {
+ super(conf);
+ if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ throw new RuntimeException("ERROR: Windows doesn't support running workers as different users yet");
+ }
+ }
+
+ @Override
+ public void restrictDirectoryPermissions(File dir) throws IOException {
+ //NOOP, if windows gets support for run as user we will need to find a way to support this
+ }
+
+ @Override
+ public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException {
+ // Files/move with non-empty directory doesn't work well on Windows
+ // This is not atomic but it does work
+ FileUtils.moveDirectory(fromDir, toDir);
+ }
+
+ @Override
+ public boolean supportsAtomicDirectoryMove() {
+ // Files/move with non-empty directory doesn't work well on Windows
+ // FileUtils.moveDirectory is not atomic
+ return false;
+ }
+ }
+
+ protected final boolean _symlinksDisabled;
+
+ protected AdvancedFSOps(Map<String, Object> conf) {
+ _symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
+ }
+
+ /**
+ * Set directory permissions to (OWNER)RWX (GROUP)R-X (OTHER)---
+ * On some systems that do not support this, it may become a noop
+ * @param dir the directory to change permissions on
+ * @throws IOException on any error
+ */
+ public void restrictDirectoryPermissions(File dir) throws IOException {
+ Set<PosixFilePermission> perms = new HashSet<>(
+ Arrays.asList(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE,
+ PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ,
+ PosixFilePermission.GROUP_EXECUTE));
+ Files.setPosixFilePermissions(dir.toPath(), perms);
+ }
+
+ /**
+ * Move fromDir to toDir, and try to make it an atomic move if possible
+ * @param fromDir what to move
+ * @param toDir where to move it from
+ * @throws IOException on any error
+ */
+ public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException {
+ FileUtils.forceMkdir(toDir);
+ Files.move(fromDir.toPath(), toDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
+ }
+
+ /**
+ * @return true if an atomic directory move works, else false.
+ */
+ public boolean supportsAtomicDirectoryMove() {
+ return true;
+ }
+
+ /**
+ * Copy a directory
+ * @param fromDir from where
+ * @param toDir to where
+ * @throws IOException on any error
+ */
+ public void copyDirectory(File fromDir, File toDir) throws IOException {
+ FileUtils.copyDirectory(fromDir, toDir);
+ }
+
+ /**
+ * Setup permissions properly for an internal blob store path
+ * @param path the path to set the permissions on
+ * @param user the user to change the permissions for
+ * @throws IOException on any error
+ */
+ public void setupBlobPermissions(File path, String user) throws IOException {
+ //Normally this is a NOOP
+ }
+
+ /**
+ * Delete a file or a directory and all of the children. If it exists.
+ * @param path what to delete
+ * @param user who to delete it as if doing it as someone else is supported
+ * @param logPrefix if an external process needs to be launched to delete
+ * the object what prefix to include in the logs
+ * @throws IOException on any error.
+ */
+ public void deleteIfExists(File path, String user, String logPrefix) throws IOException {
+ //by default no need to do this as a different user
+ deleteIfExists(path);
+ }
+
+ /**
+ * Delete a file or a directory and all of the children. If it exists.
+ * @param path what to delete
+ * @throws IOException on any error.
+ */
+ public void deleteIfExists(File path) throws IOException {
+ LOG.info("Deleting path {}", path);
+ Path p = path.toPath();
+ if (Files.exists(p)) {
+ try {
+ FileUtils.forceDelete(path);
+ } catch (FileNotFoundException ignored) {}
+ }
+ }
+
+ /**
+ * Setup the permissions for the storm code dir
+ * @param topologyConf the config of the Topology
+ * @param path the directory to set the permissions on
+ * @throws IOException on any error
+ */
+ public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException {
+ //By default this is a NOOP
+ }
+
+ /**
+ * Setup the permissions for the worker artifacts dirs
+ * @param topologyConf the config of the Topology
+ * @param path the directory to set the permissions on
+ * @throws IOException on any error
+ */
+ public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException {
+ //By default this is a NOOP
+ }
+
+ /**
+ * Sanity check if everything the topology needs is there for it to run.
+ * @param conf the config of the supervisor
+ * @param topologyId the ID of the topology
+ * @return true if everything is there, else false
+ * @throws IOException on any error
+ */
+ public boolean doRequiredTopoFilesExist(Map<String, Object> conf, String topologyId) throws IOException {
+ return ClientSupervisorUtils.doRequiredTopoFilesExist(conf, topologyId);
+ }
+
+ /**
+ * Makes a directory, including any necessary but nonexistent parent
+ * directories.
+ *
+ * @param path the directory to create
+ * @throws IOException on any error
+ */
+ public void forceMkdir(File path) throws IOException {
+ FileUtils.forceMkdir(path);
+ }
+
+ /**
+ * Check if a file exists or not
+ * @param path the path to check
+ * @return true if it exists else false
+ * @throws IOException on any error.
+ */
+ public boolean fileExists(File path) throws IOException {
+ return path.exists();
+ }
+
+ /**
+ * Get a writer for the given location
+ * @param file the file to write to
+ * @return the Writer to use.
+ * @throws IOException on any error
+ */
+ public Writer getWriter(File file) throws IOException {
+ return new FileWriter(file);
+ }
+
+ /**
+ * Get an output stream to write to a given file
+ * @param file the file to write to
+ * @return an OutputStream for that file
+ * @throws IOException on any error
+ */
+ public OutputStream getOutputStream(File file) throws IOException {
+ return new FileOutputStream(file);
+ }
+
+ /**
+ * Dump a string to a file
+ * @param location where to write to
+ * @param data the data to write
+ * @throws IOException on any error
+ */
+ public void dump(File location, String data) throws IOException {
+ File parent = location.getParentFile();
+ if (!parent.exists()) {
+ forceMkdir(parent);
+ }
+ try (Writer w = getWriter(location)) {
+ w.write(data);
+ }
+ }
+
+ /**
+ * Read the contents of a file into a String
+ * @param location the file to read
+ * @return the contents of the file
+ * @throws IOException on any error
+ */
+ @Override
+ public String slurpString(File location) throws IOException {
+ return FileUtils.readFileToString(location, "UTF-8");
+ }
+
+ /**
+ * Read the contents of a file into a byte array.
+ * @param location the file to read
+ * @return the contents of the file
+ * @throws IOException on any error
+ */
+ @Override
+ public byte[] slurp(File location) throws IOException {
+ return FileUtils.readFileToByteArray(location);
+ }
+
+ /**
+ * Create a symbolic link pointing at target
+ * @param link the link to create
+ * @param target where it should point to
+ * @throws IOException on any error.
+ */
+ @Override
+ public void createSymlink(File link, File target) throws IOException {
+ if (_symlinksDisabled) {
+ throw new IOException("Symlinks have been disabled, this should not be called");
+ }
+ Path plink = link.toPath().toAbsolutePath();
+ Path ptarget = target.toPath().toAbsolutePath();
+ LOG.debug("Creating symlink [{}] to [{}]", plink, ptarget);
+ if (Files.exists(plink)) {
+ if (Files.isSameFile(plink, ptarget)) {
+ //It already points where we want it to
+ return;
+ }
+ FileUtils.forceDelete(link);
+ }
+ Files.createSymbolicLink(plink, ptarget);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
new file mode 100644
index 0000000..8cd7261
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class ClientSupervisorUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(ClientSupervisorUtils.class);
+
+ static boolean doRequiredTopoFilesExist(Map<String, Object> conf, String stormId) throws IOException {
+ String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+ String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
+ String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
+ String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot);
+ if (!Utils.checkFileExists(stormroot))
+ return false;
+ if (!Utils.checkFileExists(stormcodepath))
+ return false;
+ if (!Utils.checkFileExists(stormconfpath))
+ return false;
+ if (ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath))
+ return true;
+ return false;
+ }
+
+ public static int processLauncherAndWait(Map<String, Object> conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
+ throws IOException {
+ int ret = 0;
+ Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null);
+ if (StringUtils.isNotBlank(logPreFix))
+ Utils.readAndLogStream(logPreFix, process.getInputStream());
+ try {
+ process.waitFor();
+ } catch (InterruptedException e) {
+ LOG.info("{} interrupted.", logPreFix);
+ }
+ ret = process.exitValue();
+ return ret;
+ }
+
+ static Process processLauncher(Map<String, Object> conf, String user, List<String> commandPrefix, List<String> args, Map<String, String> environment, final String logPreFix,
+ final ExitCodeCallback exitCodeCallback, File dir) throws IOException {
+ if (StringUtils.isBlank(user)) {
+ throw new IllegalArgumentException("User cannot be blank when calling processLauncher.");
+ }
+ String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+ String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+ String wl;
+ if (StringUtils.isNotBlank(wlinitial)) {
+ wl = wlinitial;
+ } else {
+ wl = stormHome + "/bin/worker-launcher";
+ }
+ List<String> commands = new ArrayList<>();
+ if (commandPrefix != null){
+ commands.addAll(commandPrefix);
+ }
+ commands.add(wl);
+ commands.add(user);
+ commands.addAll(args);
+ LOG.info("Running as user: {} command: {}", user, commands);
+ return launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
+ }
+
+ /**
+ * Launch a new process as per {@link ProcessBuilder} with a given
+ * callback.
+ * @param command the command to be executed in the new process
+ * @param environment the environment to be applied to the process. Can be
+ * null.
+ * @param logPrefix a prefix for log entries from the output of the process.
+ * Can be null.
+ * @param exitCodeCallback code to be called passing the exit code value
+ * when the process completes
+ * @param dir the working directory of the new process
+ * @return the new process
+ * @throws IOException
+ * @see ProcessBuilder
+ */
+ public static Process launchProcess(List<String> command,
+ Map<String,String> environment,
+ final String logPrefix,
+ final ExitCodeCallback exitCodeCallback,
+ File dir)
+ throws IOException {
+ ProcessBuilder builder = new ProcessBuilder(command);
+ Map<String,String> procEnv = builder.environment();
+ if (dir != null) {
+ builder.directory(dir);
+ }
+ builder.redirectErrorStream(true);
+ if (environment != null) {
+ procEnv.putAll(environment);
+ }
+ final Process process = builder.start();
+ if (logPrefix != null || exitCodeCallback != null) {
+ Utils.asyncLoop(new Callable<Object>() {
+ public Object call() {
+ if (logPrefix != null ) {
+ Utils.readAndLogStream(logPrefix,
+ process.getInputStream());
+ }
+ if (exitCodeCallback != null) {
+ try {
+ process.waitFor();
+ exitCodeCallback.call(process.exitValue());
+ } catch (InterruptedException ie) {
+ LOG.info("{} interrupted", logPrefix);
+ exitCodeCallback.call(-1);
+ }
+ }
+ return null; // Run only once.
+ }
+ });
+ }
+ return process;
+ }
+
+ public static void setupStormCodeDir(Map<String, Object> conf, Map<String, Object> stormConf, String dir) throws IOException {
+ if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ String logPrefix = "Storm Code Dir Setup for " + dir;
+ List<String> commands = new ArrayList<>();
+ commands.add("code-dir");
+ commands.add(dir);
+ processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+ }
+ }
+
+ public static void setupWorkerArtifactsDir(Map<String, Object> conf, Map<String, Object> stormConf, String dir) throws IOException {
+ if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+ String logPrefix = "Worker Artifacts Setup for " + dir;
+ List<String> commands = new ArrayList<>();
+ commands.add("artifacts-dir");
+ commands.add(dir);
+ processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java
new file mode 100644
index 0000000..082f205
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+/**
+ * A callback that can accept an integer.
+ */
+public interface ExitCodeCallback {
+
+ /**
+ * The process finished
+ * @param exitCode the exit code of the finished process.
+ */
+ public void call(int exitCode);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
new file mode 100644
index 0000000..e5f5db0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.util.Map;
+
+public interface IAdvancedFSOps {
+
+ /**
+ * Set directory permissions to (OWNER)RWX (GROUP)R-X (OTHER)---
+ * On some systems that do not support this, it may become a noop
+ * @param dir the directory to change permissions on
+ * @throws IOException on any error
+ */
+ void restrictDirectoryPermissions(File dir) throws IOException;
+
+ /**
+ * Move fromDir to toDir, and try to make it an atomic move if possible
+ * @param fromDir what to move
+ * @param toDir where to move it from
+ * @throws IOException on any error
+ */
+ void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException;
+
+ /**
+ * @return true if an atomic directory move works, else false.
+ */
+ boolean supportsAtomicDirectoryMove();
+
+ /**
+ * Copy a directory
+ * @param fromDir from where
+ * @param toDir to where
+ * @throws IOException on any error
+ */
+ void copyDirectory(File fromDir, File toDir) throws IOException;
+
+ /**
+ * Setup permissions properly for an internal blob store path
+ * @param path the path to set the permissions on
+ * @param user the user to change the permissions for
+ * @throws IOException on any error
+ */
+ void setupBlobPermissions(File path, String user) throws IOException;
+
+ /**
+ * Delete a file or a directory and all of the children. If it exists.
+ * @param path what to delete
+ * @param user who to delete it as if doing it as someone else is supported
+ * @param logPrefix if an external process needs to be launched to delete
+ * the object what prefix to include in the logs
+ * @throws IOException on any error.
+ */
+ void deleteIfExists(File path, String user, String logPrefix) throws IOException;
+
+ /**
+ * Delete a file or a directory and all of the children. If it exists.
+ * @param path what to delete
+ * @throws IOException on any error.
+ */
+ void deleteIfExists(File path) throws IOException;
+
+ /**
+ * Setup the permissions for the storm code dir
+ * @param topologyConf the config of the Topology
+ * @param path the directory to set the permissions on
+ * @throws IOException on any error
+ */
+ void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException;
+
+ /**
+ * Setup the permissions for the worker artifacts dirs
+ * @param topologyConf the config of the Topology
+ * @param path the directory to set the permissions on
+ * @throws IOException on any error
+ */
+ void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException;
+
+ /**
+ * Sanity check if everything the topology needs is there for it to run.
+ * @param conf the config of the supervisor
+ * @param topologyId the ID of the topology
+ * @return true if everything is there, else false
+ * @throws IOException on any error
+ */
+ boolean doRequiredTopoFilesExist(Map<String, Object> conf, String topologyId) throws IOException;
+
+ /**
+ * Makes a directory, including any necessary but nonexistent parent
+ * directories.
+ *
+ * @param path the directory to create
+ * @throws IOException on any error
+ */
+ void forceMkdir(File path) throws IOException;
+
+ /**
+ * Check if a file exists or not
+ * @param path the path to check
+ * @return true if it exists else false
+ * @throws IOException on any error.
+ */
+ boolean fileExists(File path) throws IOException;
+
+ /**
+ * Get a writer for the given location
+ * @param file the file to write to
+ * @return the Writer to use.
+ * @throws IOException on any error
+ */
+ Writer getWriter(File file) throws IOException;
+
+ /**
+ * Get an output stream to write to a given file
+ * @param file the file to write to
+ * @return an OutputStream for that file
+ * @throws IOException on any error
+ */
+ OutputStream getOutputStream(File file) throws IOException;
+
+ /**
+ * Dump a string to a file
+ * @param location where to write to
+ * @param data the data to write
+ * @throws IOException on any error
+ */
+ void dump(File location, String data) throws IOException;
+
+ /**
+ * Read the contents of a file into a String
+ * @param location the file to read
+ * @return the contents of the file
+ * @throws IOException on any error
+ */
+ String slurpString(File location) throws IOException;
+
+ /**
+ * Read the contents of a file into a byte array.
+ * @param localtion the file to read
+ * @return the contents of the file
+ * @throws IOException on any error
+ */
+ byte[] slurp(File location) throws IOException;
+
+ /**
+ * Create a symbolic link pointing at target
+ * @param link the link to create
+ * @param target where it should point to
+ * @throws IOException on any error.
+ */
+ void createSymlink(File link, File target) throws IOException;
+}