You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2017/05/19 03:35:30 UTC

[07/13] ambari git commit: AMBARI-21048. HDP 3.0 TP - create service definition for Storm with configs, kerberos, widgets, etc.(vbrodetsky)

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py
new file mode 100644
index 0000000..78ec165
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py
@@ -0,0 +1,424 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+import re
+import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
+
+import status_params
+
+from ambari_commons.constants import AMBARI_SUDO_BINARY
+from ambari_commons import yaml_utils
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions.default import default
+from resource_management.libraries.functions.get_bare_principal import get_bare_principal
+from resource_management.libraries.script import Script
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import get_kinit_path
+from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.stack_features import get_stack_feature_version
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.expect import expect
+from resource_management.libraries.functions.setup_atlas_hook import has_atlas_in_cluster
+from resource_management.libraries.functions import is_empty
+from ambari_commons.ambari_metrics_helper import select_metric_collector_hosts_from_hostnames
+from resource_management.libraries.functions.setup_ranger_plugin_xml import get_audit_configs, generate_ranger_service_config
+
+# server configurations
+config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
+stack_root = status_params.stack_root
+sudo = AMBARI_SUDO_BINARY
+
+limits_conf_dir = "/etc/security/limits.d"
+
+# Needed since this is an Atlas Hook service.
+cluster_name = config['clusterName']
+
+stack_name = status_params.stack_name
+upgrade_direction = default("/commandParams/upgrade_direction", None)
+version = default("/commandParams/version", None)
+
+agent_stack_retry_on_unavailability = config['hostLevelParams']['agent_stack_retry_on_unavailability']
+agent_stack_retry_count = expect("/hostLevelParams/agent_stack_retry_count", int)
+
+storm_component_home_dir = status_params.storm_component_home_dir
+conf_dir = status_params.conf_dir
+
+stack_version_unformatted = status_params.stack_version_unformatted
+stack_version_formatted = status_params.stack_version_formatted
+stack_supports_ru = stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted)
+stack_supports_storm_kerberos = stack_version_formatted and check_stack_feature(StackFeature.STORM_KERBEROS, stack_version_formatted)
+stack_supports_storm_ams = stack_version_formatted and check_stack_feature(StackFeature.STORM_AMS, stack_version_formatted)
+stack_supports_core_site_for_ranger_plugin = check_stack_feature(StackFeature.CORE_SITE_FOR_RANGER_PLUGINS_SUPPORT, stack_version_formatted)
+
+# get the correct version to use for checking stack features
+version_for_stack_feature_checks = get_stack_feature_version(config)
+
+stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks)
+stack_supports_ranger_audit_db = check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks)
+
+# default hadoop params
+rest_lib_dir = "/usr/lib/storm/contrib/storm-rest"
+storm_bin_dir = "/usr/bin"
+storm_lib_dir = "/usr/lib/storm/lib/"
+
+# hadoop parameters for 2.2+
+if stack_supports_ru:
+  rest_lib_dir = format("{storm_component_home_dir}/contrib/storm-rest")
+  storm_bin_dir = format("{storm_component_home_dir}/bin")
+  storm_lib_dir = format("{storm_component_home_dir}/lib")
+  log4j_dir = format("{storm_component_home_dir}/log4j2")
+
+storm_user = config['configurations']['storm-env']['storm_user']
+log_dir = config['configurations']['storm-env']['storm_log_dir']
+pid_dir = status_params.pid_dir
+local_dir = config['configurations']['storm-site']['storm.local.dir']
+user_group = config['configurations']['cluster-env']['user_group']
+java64_home = config['hostLevelParams']['java_home']
+jps_binary = format("{java64_home}/bin/jps")
+nimbus_port = config['configurations']['storm-site']['nimbus.thrift.port']
+storm_zookeeper_root_dir = default('/configurations/storm-site/storm.zookeeper.root', None)
+storm_zookeeper_servers = config['configurations']['storm-site']['storm.zookeeper.servers']
+storm_zookeeper_port = config['configurations']['storm-site']['storm.zookeeper.port']
+storm_logs_supported = config['configurations']['storm-env']['storm_logs_supported']
+
+# nimbus.seeds is supported in HDP 2.3.0.0 and higher
+nimbus_seeds_supported = default('/configurations/storm-env/nimbus_seeds_supported', False)
+nimbus_host = default('/configurations/storm-site/nimbus.host', None)
+nimbus_seeds = default('/configurations/storm-site/nimbus.seeds', None)
+default_topology_max_replication_wait_time_sec = default('/configurations/storm-site/topology.max.replication.wait.time.sec.default', -1)
+nimbus_hosts = default("/clusterHostInfo/nimbus_hosts", [])
+default_topology_min_replication_count = default('/configurations/storm-site/topology.min.replication.count.default', 1)
+
+# Calculate topology.max.replication.wait.time.sec and topology.min.replication.count
+if len(nimbus_hosts) > 1:
+  # for HA Nimbus: require a strict majority of the Nimbus hosts
+  actual_topology_max_replication_wait_time_sec = -1
+  actual_topology_min_replication_count = len(nimbus_hosts) // 2 + 1  # '//' keeps this an int on Python 2 and 3
+else:
+  # for non-HA Nimbus: fall back to the configured defaults
+  actual_topology_max_replication_wait_time_sec = default_topology_max_replication_wait_time_sec
+  actual_topology_min_replication_count = default_topology_min_replication_count
+
+if 'topology.max.replication.wait.time.sec.default' in config['configurations']['storm-site']:
+  del config['configurations']['storm-site']['topology.max.replication.wait.time.sec.default']
+if 'topology.min.replication.count.default' in config['configurations']['storm-site']:
+  del config['configurations']['storm-site']['topology.min.replication.count.default']
+
+rest_api_port = "8745"
+rest_api_admin_port = "8746"
+rest_api_conf_file = format("{conf_dir}/config.yaml")
+storm_env_sh_template = config['configurations']['storm-env']['content']
+jmxremote_port = config['configurations']['storm-env']['jmxremote_port']
+
+if 'ganglia_server_host' in config['clusterHostInfo'] and len(config['clusterHostInfo']['ganglia_server_host'])>0:
+  ganglia_installed = True
+  ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
+  ganglia_report_interval = 60
+else:
+  ganglia_installed = False
+
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+
+storm_ui_host = default("/clusterHostInfo/storm_ui_server_hosts", [])
+
+storm_user_nofile_limit = default('/configurations/storm-env/storm_user_nofile_limit', 128000)
+storm_user_nproc_limit = default('/configurations/storm-env/storm_user_noproc_limit', 65536)
+
+if security_enabled:
+  _hostname_lowercase = config['hostname'].lower()
+  _storm_principal_name = config['configurations']['storm-env']['storm_principal_name']
+  storm_jaas_principal = _storm_principal_name.replace('_HOST',_hostname_lowercase)
+  _ambari_principal_name = default('/configurations/cluster-env/ambari_principal_name', None)
+  storm_keytab_path = config['configurations']['storm-env']['storm_keytab']
+
+  if stack_supports_storm_kerberos:
+    storm_ui_keytab_path = config['configurations']['storm-env']['storm_ui_keytab']
+    _storm_ui_jaas_principal_name = config['configurations']['storm-env']['storm_ui_principal_name']
+    storm_ui_jaas_principal = _storm_ui_jaas_principal_name.replace('_HOST',_hostname_lowercase)
+    storm_bare_jaas_principal = get_bare_principal(_storm_principal_name)
+    if _ambari_principal_name:
+      ambari_bare_jaas_principal = get_bare_principal(_ambari_principal_name)
+    _nimbus_principal_name = config['configurations']['storm-env']['nimbus_principal_name']
+    nimbus_jaas_principal = _nimbus_principal_name.replace('_HOST', _hostname_lowercase)
+    nimbus_bare_jaas_principal = get_bare_principal(_nimbus_principal_name)
+    nimbus_keytab_path = config['configurations']['storm-env']['nimbus_keytab']
+
+kafka_bare_jaas_principal = None  # only resolved below when the stack supports Storm Kerberos and security is enabled
+if stack_supports_storm_kerberos:
+  if security_enabled:
+    storm_thrift_transport = config['configurations']['storm-site']['_storm.thrift.secure.transport']
+    # generate KafkaClient jaas config if kafka is kerberized
+    _kafka_principal_name = default("/configurations/kafka-env/kafka_principal_name", None)
+    kafka_bare_jaas_principal = get_bare_principal(_kafka_principal_name)
+  else:
+    storm_thrift_transport = config['configurations']['storm-site']['_storm.thrift.nonsecure.transport']
+
+set_instanceId = "false"
+if 'cluster-env' in config['configurations'] and \
+        'metrics_collector_external_hosts' in config['configurations']['cluster-env']:
+  ams_collector_hosts = config['configurations']['cluster-env']['metrics_collector_external_hosts']
+  set_instanceId = "true"
+else:
+  ams_collector_hosts = ",".join(default("/clusterHostInfo/metrics_collector_hosts", []))
+has_metric_collector = not len(ams_collector_hosts) == 0
+metric_collector_port = None
+if has_metric_collector:
+  if 'cluster-env' in config['configurations'] and \
+      'metrics_collector_external_port' in config['configurations']['cluster-env']:
+    metric_collector_port = config['configurations']['cluster-env']['metrics_collector_external_port']
+  else:
+    metric_collector_web_address = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:6188")
+    if metric_collector_web_address.find(':') != -1:
+      metric_collector_port = metric_collector_web_address.split(':')[1]
+    else:
+      metric_collector_port = '6188'
+
+  metric_collector_report_interval = 60
+  metric_collector_app_id = "nimbus"
+  if default("/configurations/ams-site/timeline.metrics.service.http.policy", "HTTP_ONLY") == "HTTPS_ONLY":
+    metric_collector_protocol = 'https'
+  else:
+    metric_collector_protocol = 'http'
+  metric_truststore_path= default("/configurations/ams-ssl-client/ssl.client.truststore.location", "")
+  metric_truststore_type= default("/configurations/ams-ssl-client/ssl.client.truststore.type", "")
+  metric_truststore_password= default("/configurations/ams-ssl-client/ssl.client.truststore.password", "")
+  pass
+metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
+metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-with-common-*.jar"
+metric_collector_legacy_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar"
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
+
+# Cluster Zookeeper quorum as "host1:port,host2:port"; stays "" when no servers are configured
+zookeeper_quorum = ""
+if storm_zookeeper_servers:
+  storm_zookeeper_servers_list = yaml_utils.get_values_from_yaml_array(storm_zookeeper_servers)
+  _zk_port = str(storm_zookeeper_port)  # str() guards against a numeric port value, as done for storm_ui_port
+  zookeeper_quorum = ",".join([_zk_host + ":" + _zk_port for _zk_host in storm_zookeeper_servers_list])
+
+jar_jvm_opts = ''
+
+########################################################
+############# Atlas related params #####################
+########################################################
+#region Atlas Hooks
+storm_atlas_application_properties = default('/configurations/storm-atlas-application.properties', {})
+enable_atlas_hook = default('/configurations/storm-env/storm.atlas.hook', False)
+atlas_hook_filename = default('/configurations/atlas-env/metadata_conf_file', 'atlas-application.properties')
+
+if enable_atlas_hook:
+  # Only append /etc/atlas/conf to classpath if on HDP 2.4.*
+  if check_stack_feature(StackFeature.ATLAS_CONF_DIR_IN_PATH, stack_version_formatted):
+    atlas_conf_dir = format('{stack_root}/current/atlas-server/conf')
+    jar_jvm_opts += '-Datlas.conf=' + atlas_conf_dir
+#endregion
+
+storm_ui_port = config['configurations']['storm-site']['ui.port']
+
+#Storm log4j properties
+storm_a1_maxfilesize = default('/configurations/storm-cluster-log4j/storm_a1_maxfilesize', 100)
+storm_a1_maxbackupindex = default('/configurations/storm-cluster-log4j/storm_a1_maxbackupindex', 9)
+storm_wrkr_a1_maxfilesize = default('/configurations/storm-worker-log4j/storm_wrkr_a1_maxfilesize', 100)
+storm_wrkr_a1_maxbackupindex = default('/configurations/storm-worker-log4j/storm_wrkr_a1_maxbackupindex', 9)
+storm_wrkr_out_maxfilesize = default('/configurations/storm-worker-log4j/storm_wrkr_out_maxfilesize', 100)
+storm_wrkr_out_maxbackupindex = default('/configurations/storm-worker-log4j/storm_wrkr_out_maxbackupindex', 4)
+storm_wrkr_err_maxfilesize = default('/configurations/storm-worker-log4j/storm_wrkr_err_maxfilesize', 100)
+storm_wrkr_err_maxbackupindex = default('/configurations/storm-worker-log4j/storm_wrkr_err_maxbackupindex', 4)
+
+storm_cluster_log4j_content = config['configurations']['storm-cluster-log4j']['content']
+storm_worker_log4j_content = config['configurations']['storm-worker-log4j']['content']
+
+# some commands may need to supply the JAAS location when running as storm
+storm_jaas_file = format("{conf_dir}/storm_jaas.conf")
+
+# for curl command in ranger plugin to get db connector
+jdk_location = config['hostLevelParams']['jdk_location']
+
+# ranger storm plugin start section
+
+# ranger host
+ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])
+has_ranger_admin = not len(ranger_admin_hosts) == 0
+
+# ranger support xml_configuration flag, instead of depending on ranger xml_configurations_supported/ranger-env, using stack feature
+xml_configurations_supported = check_stack_feature(StackFeature.RANGER_XML_CONFIGURATION, version_for_stack_feature_checks)
+
+# ambari-server hostname
+ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0]
+
+# ranger storm plugin enabled property: a "Yes"/"No" string, compared case-insensitively
+enable_ranger_storm = default("/configurations/ranger-storm-plugin-properties/ranger-storm-plugin-enabled", "No")
+enable_ranger_storm = enable_ranger_storm.lower() == 'yes'
+
+# ranger storm properties
+if enable_ranger_storm:
+  # get ranger policy url
+  policymgr_mgr_url = config['configurations']['admin-properties']['policymgr_external_url']
+  if xml_configurations_supported:
+    policymgr_mgr_url = config['configurations']['ranger-storm-security']['ranger.plugin.storm.policy.rest.url']
+
+  if not is_empty(policymgr_mgr_url) and policymgr_mgr_url.endswith('/'):
+    policymgr_mgr_url = policymgr_mgr_url.rstrip('/')
+
+  # ranger audit db user
+  xa_audit_db_user = default('/configurations/admin-properties/audit_db_user', 'rangerlogger')
+
+  # ranger storm service name
+  repo_name = str(config['clusterName']) + '_storm'
+  repo_name_value = config['configurations']['ranger-storm-security']['ranger.plugin.storm.service.name']
+  if not is_empty(repo_name_value) and repo_name_value != "{{repo_name}}":
+    repo_name = repo_name_value
+
+  common_name_for_certificate = config['configurations']['ranger-storm-plugin-properties']['common.name.for.certificate']
+  repo_config_username = config['configurations']['ranger-storm-plugin-properties']['REPOSITORY_CONFIG_USERNAME']
+
+  # ranger-env config
+  ranger_env = config['configurations']['ranger-env']
+
+  # no ranger admin host in this cluster: rebuild ranger-env from the plugin's external credential properties
+  if not has_ranger_admin:  # enable_ranger_storm is already guaranteed by the enclosing 'if'
+    external_admin_username = default('/configurations/ranger-storm-plugin-properties/external_admin_username', 'admin')
+    external_admin_password = default('/configurations/ranger-storm-plugin-properties/external_admin_password', 'admin')
+    external_ranger_admin_username = default('/configurations/ranger-storm-plugin-properties/external_ranger_admin_username', 'amb_ranger_admin')
+    external_ranger_admin_password = default('/configurations/ranger-storm-plugin-properties/external_ranger_admin_password', 'amb_ranger_admin')
+    ranger_env = {}
+    ranger_env['admin_username'] = external_admin_username
+    ranger_env['admin_password'] = external_admin_password
+    ranger_env['ranger_admin_username'] = external_ranger_admin_username
+    ranger_env['ranger_admin_password'] = external_ranger_admin_password
+
+  ranger_plugin_properties = config['configurations']['ranger-storm-plugin-properties']
+  policy_user = storm_user
+  repo_config_password = config['configurations']['ranger-storm-plugin-properties']['REPOSITORY_CONFIG_PASSWORD']
+
+  xa_audit_db_password = ''
+  if not is_empty(config['configurations']['admin-properties']['audit_db_password']) and stack_supports_ranger_audit_db and has_ranger_admin:
+    xa_audit_db_password = config['configurations']['admin-properties']['audit_db_password']
+
+  repo_config_password = config['configurations']['ranger-storm-plugin-properties']['REPOSITORY_CONFIG_PASSWORD']
+
+  downloaded_custom_connector = None
+  previous_jdbc_jar_name = None
+  driver_curl_source = None
+  driver_curl_target = None
+  previous_jdbc_jar = None
+
+  if has_ranger_admin and stack_supports_ranger_audit_db:
+    # audit-to-DB support is guaranteed by the condition above, so the JDBC paths are built unconditionally
+    xa_audit_db_flavor = config['configurations']['admin-properties']['DB_FLAVOR']
+    jdbc_jar_name, previous_jdbc_jar_name, audit_jdbc_url, jdbc_driver = get_audit_configs(config)
+    downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}")
+    driver_curl_source = format("{jdk_location}/{jdbc_jar_name}")
+    driver_curl_target = format("{storm_component_home_dir}/lib/{jdbc_jar_name}")
+    previous_jdbc_jar = format("{storm_component_home_dir}/lib/{previous_jdbc_jar_name}")
+    sql_connector_jar = ''
+
+  storm_ranger_plugin_config = {
+    'username': repo_config_username,
+    'password': repo_config_password,
+    'nimbus.url': 'http://' + storm_ui_host[0].lower() + ':' + str(storm_ui_port),
+    'commonNameForCertificate': common_name_for_certificate
+  }
+
+  storm_ranger_plugin_repo = {
+    'isActive': 'true',
+    'config': json.dumps(storm_ranger_plugin_config),
+    'description': 'storm repo',
+    'name': repo_name,
+    'repositoryType': 'storm',
+    'assetType': '6'
+  }
+
+  custom_ranger_service_config = generate_ranger_service_config(ranger_plugin_properties)
+  if len(custom_ranger_service_config) > 0:
+    storm_ranger_plugin_config.update(custom_ranger_service_config)
+
+  if stack_supports_ranger_kerberos and security_enabled:
+    policy_user = format('{storm_user},{storm_bare_jaas_principal}')
+    storm_ranger_plugin_config['policy.download.auth.users'] = policy_user
+    storm_ranger_plugin_config['tag.download.auth.users'] = policy_user
+    storm_ranger_plugin_config['ambari.service.check.user'] = policy_user
+
+    storm_ranger_plugin_repo = {
+      'isEnabled': 'true',
+      'configs': storm_ranger_plugin_config,
+      'description': 'storm repo',
+      'name': repo_name,
+      'type': 'storm'
+    }
+
+  ranger_storm_principal = None
+  ranger_storm_keytab = None
+  if stack_supports_ranger_kerberos and security_enabled:
+    ranger_storm_principal = storm_jaas_principal
+    ranger_storm_keytab = storm_keytab_path
+
+  xa_audit_db_is_enabled = False
+  if xml_configurations_supported and stack_supports_ranger_audit_db:
+    xa_audit_db_is_enabled = config['configurations']['ranger-storm-audit']['xasecure.audit.destination.db']
+
+  xa_audit_hdfs_is_enabled = default('/configurations/ranger-storm-audit/xasecure.audit.destination.hdfs', False)
+  ssl_keystore_password = config['configurations']['ranger-storm-policymgr-ssl']['xasecure.policymgr.clientssl.keystore.password'] if xml_configurations_supported else None
+  ssl_truststore_password = config['configurations']['ranger-storm-policymgr-ssl']['xasecure.policymgr.clientssl.truststore.password'] if xml_configurations_supported else None
+  credential_file = format('/etc/ranger/{repo_name}/cred.jceks')
+
+  # for SQLA explicitly disable audit to DB for Ranger
+  if has_ranger_admin and stack_supports_ranger_audit_db and xa_audit_db_flavor.lower() == 'sqla':
+    xa_audit_db_is_enabled = False
+
+# ranger storm plugin end section
+
+namenode_hosts = default("/clusterHostInfo/namenode_host", [])
+has_namenode = not len(namenode_hosts) == 0
+
+availableServices = config['availableServices']
+
+hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] if has_namenode else None
+hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] if has_namenode else None
+hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name'] if has_namenode else None
+hdfs_site = config['configurations']['hdfs-site'] if has_namenode else None
+default_fs = config['configurations']['core-site']['fs.defaultFS'] if has_namenode else None
+hadoop_bin_dir = stack_select.get_hadoop_dir("bin") if has_namenode else None
+hadoop_conf_dir = conf_select.get_hadoop_conf_dir() if has_namenode else None
+kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+
+import functools
+#create partial functions with common arguments for every HdfsResource call
+#to create/delete hdfs directory/file/copyfromlocal we need to call params.HdfsResource in code
+HdfsResource = functools.partial(
+  HdfsResource,
+  user=hdfs_user,
+  hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
+  security_enabled = security_enabled,
+  keytab = hdfs_user_keytab,
+  kinit_path_local = kinit_path_local,
+  hadoop_bin_dir = hadoop_bin_dir,
+  hadoop_conf_dir = hadoop_conf_dir,
+  principal_name = hdfs_principal_name,
+  hdfs_site = hdfs_site,
+  default_fs = default_fs,
+  immutable_paths = get_not_managed_resources()
+)

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_windows.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_windows.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_windows.py
new file mode 100644
index 0000000..a758375
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_windows.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from status_params import *
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.default import default
+
+# server configurations
+config = Script.get_config()
+
+stack_is_hdp23_or_further = Script.is_stack_greater_or_equal("2.3")
+
+stack_root = os.path.abspath(os.path.join(os.environ["HADOOP_HOME"],".."))
+conf_dir = os.environ["STORM_CONF_DIR"]
+hadoop_user = config["configurations"]["cluster-env"]["hadoop.user.name"]
+storm_user = hadoop_user
+
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+default_topology_max_replication_wait_time_sec = default('/configurations/storm-site/topology.max.replication.wait.time.sec.default', -1)
+nimbus_hosts = default("/clusterHostInfo/nimbus_hosts", [])
+default_topology_min_replication_count = default('/configurations/storm-site/topology.min.replication.count.default', 1)
+
+# Calculate topology.max.replication.wait.time.sec and topology.min.replication.count
+if len(nimbus_hosts) > 1:
+  # for HA Nimbus: require a strict majority of the Nimbus hosts
+  actual_topology_max_replication_wait_time_sec = -1
+  actual_topology_min_replication_count = len(nimbus_hosts) // 2 + 1  # '//' keeps this an int on Python 2 and 3
+else:
+  # for non-HA Nimbus: fall back to the configured defaults
+  actual_topology_max_replication_wait_time_sec = default_topology_max_replication_wait_time_sec
+  actual_topology_min_replication_count = default_topology_min_replication_count
+
+if stack_is_hdp23_or_further:
+  if security_enabled:
+    storm_thrift_transport = config['configurations']['storm-site']['_storm.thrift.secure.transport']
+  else:
+    storm_thrift_transport = config['configurations']['storm-site']['_storm.thrift.nonsecure.transport']
+
+service_map = {
+  "nimbus" : nimbus_win_service_name,
+  "supervisor" : supervisor_win_service_name,
+  "ui" : ui_win_service_name
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/rest_api.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/rest_api.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/rest_api.py
new file mode 100644
index 0000000..f9b3b80
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/rest_api.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from resource_management.libraries.functions import check_process_status
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import format
+from resource_management.core.resources.system import Execute
+
+from storm import storm
+from service import service
+from service_check import ServiceCheck
+
+
+class StormRestApi(Script):
+  """
+  Storm REST API.
+  It was available in HDP 2.0 and 2.1.
+  In HDP 2.2, it was removed since the functionality was moved to Storm UI Server.
+  """
+
+  def get_component_name(self):  # component name this script reports
+    return "storm-client"
+
+  def install(self, env):  # install packages, then run configure()
+    self.install_packages(env)
+    self.configure(env)
+
+  def configure(self, env):
+    import params  # imported inside the handler rather than at module load
+    env.set_params(params)
+    # delegate to the storm() helper imported from the storm module
+    storm()
+
+  def start(self, env, upgrade_type=None):  # upgrade_type is accepted but not used here
+    import params
+    env.set_params(params)
+    self.configure(env)
+    # configure() has run; now start the "rest_api" daemon through the service() helper
+    service("rest_api", action="start")
+
+  def stop(self, env, upgrade_type=None):  # upgrade_type is accepted but not used here
+    import params
+    env.set_params(params)
+    # stop the "rest_api" daemon through the service() helper
+    service("rest_api", action="stop")
+
+  def status(self, env):
+    import status_params  # status uses status_params, not the full params module
+    env.set_params(status_params)
+    check_process_status(status_params.pid_rest_api)  # checked against the pid_rest_api entry of status_params
+
+  def get_log_folder(self):  # returns params.log_dir
+    import params
+    return params.log_dir
+  
+  def get_user(self):  # returns params.storm_user
+    import params
+    return params.storm_user
+
+  def get_pid_files(self):  # single-element list holding the rest_api pid entry
+    import status_params
+    return [status_params.pid_rest_api]
+  
+if __name__ == "__main__":
+  StormRestApi().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service.py
new file mode 100644
index 0000000..b5e5cd5
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+
+from resource_management.core.resources import Execute
+from resource_management.core.resources import File
+from resource_management.core.shell import as_user
+from resource_management.core import shell
+from resource_management.core.logger import Logger
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions import get_user_call_output
+from resource_management.libraries.functions.show_logs import show_logs
+import time
+
+
+def service(name, action = 'start'):
+  """
+  Start or stop a single Storm daemon on this host.
+
+  :param name: daemon name, used as the key into status_params.pid_files and,
+               for everything except "rest_api", as the "storm <name>" sub-command
+  :param action: "start" or "stop"; any other value results in no action
+  """
+  import params
+  import status_params
+
+  pid_file = status_params.pid_files[name]
+  # Shell test that succeeds only when the pid file exists and the pid stored in it
+  # belongs to a live process; wrapped via as_user for the Storm user.
+  no_op_test = as_user(format(
+    "ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1"), user=params.storm_user)
+
+  # Pattern (anchored at end of line) for locating the daemon in the jps listing.
+  if name == 'ui':
+    process_grep = "storm.ui.core$"
+  elif name == "rest_api":
+    process_grep = format("{rest_lib_dir}/storm-rest-.*\.jar$")
+  else:
+    process_grep = format("storm.daemon.{name}$")
+
+  # NOTE(review): find_proc, write_pid and crt_pid_cmd are built here but are not
+  # referenced again anywhere in this function.
+  find_proc = format("{jps_binary} -l  | grep {process_grep}")
+  write_pid = format("{find_proc} | awk {{'print $1'}} > {pid_file}")
+  crt_pid_cmd = format("{find_proc} && {write_pid}")
+  # Shell prefix: load storm-env.sh and put $JAVA_HOME/bin first on PATH.
+  storm_env = format(
+    "source {conf_dir}/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH")
+
+  if action == "start":
+    if name == "rest_api":
+      # The REST API is launched directly with "java -jar" on the versioned
+      # storm-rest jar found in rest_lib_dir, with output going to restapi.log.
+      process_cmd = format(
+        "{storm_env} ; java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server")
+      cmd = format(
+        "{process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log 2>&1")
+    else:
+      # Storm start script gets forked into actual storm java process.
+      # Which means we can use the pid of start script as a pid of start component
+      cmd = format("{storm_env} ; storm {name} > {log_dir}/{name}.out 2>&1")
+
+    # Run the command in the background and record the background job's pid ($!).
+    cmd = format("{cmd} &\n echo $! > {pid_file}")
+    
+    # not_if guard: no_op_test succeeds when the daemon is already running.
+    Execute(cmd,
+      not_if = no_op_test,
+      user = params.storm_user,
+      path = params.storm_bin_dir,
+    )
+    
+    # Set ownership of the pid file to the Storm user and group.
+    File(pid_file,
+         owner = params.storm_user,
+         group = params.user_group
+    )
+  elif action == "stop":
+    # Negation of no_op_test: succeeds when the recorded process is gone.
+    process_dont_exist = format("! ({no_op_test})")
+    # Nothing at all is done when the pid file is absent.
+    if os.path.exists(pid_file):
+      # Element [1] of the returned tuple is used as the pid text
+      # (presumably the command's stdout - TODO confirm against get_user_call_output).
+      pid = get_user_call_output.get_user_call_output(format("! test -f {pid_file} ||  cat {pid_file}"), user=params.storm_user)[1]
+
+      # if multiple processes are running (for example user can start logviewer from console)
+      # there can be more than one id
+      pid = pid.replace("\n", " ")
+
+      # First attempt: plain kill, guarded by the process-is-gone test.
+      Execute(format("{sudo} kill {pid}"),
+        not_if = process_dont_exist)
+
+      # Second attempt: kill -9. The guard re-tests after 2 seconds and, if the
+      # process is still present, once more after a further 20 seconds.
+      Execute(format("{sudo} kill -9 {pid}"),
+        not_if = format(
+          "sleep 2; {process_dont_exist} || sleep 20; {process_dont_exist}"),
+        ignore_failures = True)
+
+      File(pid_file, action = "delete")

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service_check.py
new file mode 100644
index 0000000..80ea0f5
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service_check.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions import get_unique_id_and_date
+from resource_management.core.resources import File
+from resource_management.core.resources import Execute
+from resource_management.libraries.script import Script
+from resource_management.core.source import StaticFile
+from ambari_commons import OSCheck, OSConst
+from ambari_commons.os_family_impl import OsFamilyImpl
+
+class ServiceCheck(Script):
+  # Common base for the OS-specific Storm service checks declared below with
+  # the OsFamilyImpl decorator; it adds nothing of its own to Script.
+  pass
+
+
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class ServiceCheckWindows(ServiceCheck):
+  def service_check(self, env):
+    import params
+    env.set_params(params)
+    smoke_cmd = os.path.join(params.stack_root,"Run-SmokeTests.cmd")
+    service = "STORM"
+    Execute(format("cmd /C {smoke_cmd} {service}", smoke_cmd=smoke_cmd, service=service), user=params.storm_user, logoutput=True)
+
+
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class ServiceCheckDefault(ServiceCheck):
+  """
+  Storm service check for the default OS family: submits a WordCount topology
+  and then kills it.
+  """
+
+  def service_check(self, env):
+    """
+    Copy wordCount.jar to /tmp, submit it with "storm jar" under a unique
+    topology name, then remove the topology with "storm kill".
+    """
+    import params
+    env.set_params(params)
+
+    # Suffix that makes the topology name unique for this run.
+    unique = get_unique_id_and_date()
+
+    File("/tmp/wordCount.jar",
+         content=StaticFile("wordCount.jar"),
+         owner=params.storm_user
+    )
+
+    # NOTE(review): cmd stays "" when nimbus seeds are not supported and
+    # params.nimbus_host is None; Execute is then called with an empty command.
+    cmd = ""
+    if params.nimbus_seeds_supported:
+      # Because this command is guaranteed to run on one of the hosts with storm client, there is no need
+      # to specify "-c nimbus.seeds={nimbus_seeds}"
+      cmd = format("storm jar /tmp/wordCount.jar storm.starter.WordCountTopology WordCount{unique}")
+    elif params.nimbus_host is not None:
+      # Without nimbus seeds support the Nimbus host is passed explicitly.
+      cmd = format("storm jar /tmp/wordCount.jar storm.starter.WordCountTopology WordCount{unique} -c nimbus.host={nimbus_host}")
+
+    Execute(cmd,
+            logoutput=True,
+            path=params.storm_bin_dir,
+            user=params.storm_user
+    )
+
+    # Clean up the topology submitted above.
+    Execute(format("storm kill WordCount{unique}"),
+            path=params.storm_bin_dir,
+            user=params.storm_user
+    )
+
+# Entry point when the file is run as a script. The base ServiceCheck class is
+# instantiated here (presumably OsFamilyImpl selects the OS-specific subclass - TODO confirm).
+if __name__ == "__main__":
+  ServiceCheck().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/setup_ranger_storm.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/setup_ranger_storm.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/setup_ranger_storm.py
new file mode 100644
index 0000000..c04496e
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/setup_ranger_storm.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management.core.logger import Logger
+from resource_management.libraries.functions.setup_ranger_plugin_xml import setup_core_site_for_required_plugins
+from resource_management.libraries.resources.xml_config import XmlConfig
+from resource_management.libraries.functions.format import format
+from resource_management.core.resources import File, Directory
+
+def setup_ranger_storm(upgrade_type=None):
+  """
+  :param upgrade_type: Upgrade Type such as "rolling" or "nonrolling"
+  """
+  import params
+  if params.enable_ranger_storm and params.security_enabled:
+
+    stack_version = None
+    if upgrade_type is not None:
+      stack_version = params.version
+
+    if params.retryAble:
+      Logger.info("Storm: Setup ranger: command retry enables thus retrying if ranger admin is down !")
+    else:
+      Logger.info("Storm: Setup ranger: command retry not enabled thus skipping if ranger admin is down !")
+
+    if params.xml_configurations_supported and params.enable_ranger_storm and params.xa_audit_hdfs_is_enabled:
+      if params.has_namenode:
+        params.HdfsResource("/ranger/audit",
+                           type="directory",
+                           action="create_on_execute",
+                           owner=params.hdfs_user,
+                           group=params.hdfs_user,
+                           mode=0755,
+                           recursive_chmod=True
+        )
+        params.HdfsResource("/ranger/audit/storm",
+                           type="directory",
+                           action="create_on_execute",
+                           owner=params.storm_user,
+                           group=params.storm_user,
+                           mode=0700,
+                           recursive_chmod=True
+        )
+        params.HdfsResource(None, action="execute")
+
+    if params.xml_configurations_supported:
+      api_version=None
+      if params.stack_supports_ranger_kerberos:
+        api_version='v2'
+      from resource_management.libraries.functions.setup_ranger_plugin_xml import setup_ranger_plugin
+      setup_ranger_plugin('storm-nimbus', 'storm', params.previous_jdbc_jar,
+                          params.downloaded_custom_connector, params.driver_curl_source,
+                          params.driver_curl_target, params.java64_home,
+                          params.repo_name, params.storm_ranger_plugin_repo,
+                          params.ranger_env, params.ranger_plugin_properties,
+                          params.policy_user, params.policymgr_mgr_url,
+                          params.enable_ranger_storm, conf_dict=params.conf_dir,
+                          component_user=params.storm_user, component_group=params.user_group, cache_service_list=['storm'],
+                          plugin_audit_properties=params.config['configurations']['ranger-storm-audit'], plugin_audit_attributes=params.config['configuration_attributes']['ranger-storm-audit'],
+                          plugin_security_properties=params.config['configurations']['ranger-storm-security'], plugin_security_attributes=params.config['configuration_attributes']['ranger-storm-security'],
+                          plugin_policymgr_ssl_properties=params.config['configurations']['ranger-storm-policymgr-ssl'], plugin_policymgr_ssl_attributes=params.config['configuration_attributes']['ranger-storm-policymgr-ssl'],
+                          component_list=['storm-client', 'storm-nimbus'], audit_db_is_enabled=params.xa_audit_db_is_enabled,
+                          credential_file=params.credential_file, xa_audit_db_password=params.xa_audit_db_password,
+                          ssl_truststore_password=params.ssl_truststore_password, ssl_keystore_password=params.ssl_keystore_password,
+                          stack_version_override = stack_version, skip_if_rangeradmin_down= not params.retryAble,api_version=api_version,
+                          is_security_enabled = params.security_enabled,
+                          is_stack_supports_ranger_kerberos = params.stack_supports_ranger_kerberos,
+                          component_user_principal=params.ranger_storm_principal if params.security_enabled else None,
+                          component_user_keytab=params.ranger_storm_keytab if params.security_enabled else None)
+    else:
+      from resource_management.libraries.functions.setup_ranger_plugin import setup_ranger_plugin
+      setup_ranger_plugin('storm-nimbus', 'storm', params.previous_jdbc_jar,
+                        params.downloaded_custom_connector, params.driver_curl_source,
+                        params.driver_curl_target, params.java64_home,
+                        params.repo_name, params.storm_ranger_plugin_repo,
+                        params.ranger_env, params.ranger_plugin_properties,
+                        params.policy_user, params.policymgr_mgr_url,
+                        params.enable_ranger_storm, conf_dict=params.conf_dir,
+                        component_user=params.storm_user, component_group=params.user_group, cache_service_list=['storm'],
+                        plugin_audit_properties=params.config['configurations']['ranger-storm-audit'], plugin_audit_attributes=params.config['configuration_attributes']['ranger-storm-audit'],
+                        plugin_security_properties=params.config['configurations']['ranger-storm-security'], plugin_security_attributes=params.config['configuration_attributes']['ranger-storm-security'],
+                        plugin_policymgr_ssl_properties=params.config['configurations']['ranger-storm-policymgr-ssl'], plugin_policymgr_ssl_attributes=params.config['configuration_attributes']['ranger-storm-policymgr-ssl'],
+                        component_list=['storm-client', 'storm-nimbus'], audit_db_is_enabled=params.xa_audit_db_is_enabled,
+                        credential_file=params.credential_file, xa_audit_db_password=params.xa_audit_db_password,
+                        ssl_truststore_password=params.ssl_truststore_password, ssl_keystore_password=params.ssl_keystore_password,
+                        stack_version_override = stack_version, skip_if_rangeradmin_down= not params.retryAble)
+
+
+    site_files_create_path = format('{storm_component_home_dir}/extlib-daemon/ranger-storm-plugin-impl/conf')
+    Directory(site_files_create_path,
+            owner = params.storm_user,
+            group = params.user_group,
+            mode=0775,
+            create_parents = True,
+            cd_access = 'a'
+            )
+
+    if params.stack_supports_core_site_for_ranger_plugin and params.enable_ranger_storm and params.has_namenode and params.security_enabled:
+      Logger.info("Stack supports core-site.xml creation for Ranger plugin, creating create core-site.xml from namenode configuraitions")
+      setup_core_site_for_required_plugins(component_user=params.storm_user,component_group=params.user_group,create_core_site_path = site_files_create_path, config = params.config)
+      if len(params.namenode_hosts) > 1:
+        Logger.info('Ranger Storm plugin is enabled along with security and NameNode is HA , creating hdfs-site.xml')
+        XmlConfig("hdfs-site.xml",
+          conf_dir=site_files_create_path,
+          configurations=params.config['configurations']['hdfs-site'],
+          configuration_attributes=params.config['configuration_attributes']['hdfs-site'],
+          owner=params.storm_user,
+          group=params.user_group,
+          mode=0644
+        )
+      else:
+        Logger.info('Ranger Storm plugin is not enabled or security is disabled, removing hdfs-site.xml')
+        File(format('{site_files_create_path}/hdfs-site.xml'), action="delete")
+    else:
+      Logger.info("Stack does not support core-site.xml creation for Ranger plugin, skipping core-site.xml configurations")
+  else:
+    Logger.info('Ranger Storm plugin is not enabled')

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/status_params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/status_params.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/status_params.py
new file mode 100644
index 0000000..d84b095
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/status_params.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions import get_kinit_path
+from resource_management.libraries.functions import default, format
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from ambari_commons import OSCheck
+
+# a map of the Ambari role to the component name
+# for use with <stack-root>/current/<component>
+SERVER_ROLE_DIRECTORY_MAP = {
+  'NIMBUS' : 'storm-nimbus',
+  'SUPERVISOR' : 'storm-supervisor',
+  'STORM_UI_SERVER' : 'storm-client',
+  'DRPC_SERVER' : 'storm-client',
+  'STORM_SERVICE_CHECK' : 'storm-client'
+}
+
+# Component directory name for the current role; "STORM_SERVICE_CHECK" is passed as the
+# second argument (presumably the fallback role - TODO confirm against Script).
+component_directory = Script.get_component_from_role(SERVER_ROLE_DIRECTORY_MAP, "STORM_SERVICE_CHECK")
+
+config = Script.get_config()
+stack_root = Script.get_stack_root()
+stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
+stack_version_formatted = format_stack_version(stack_version_unformatted)
+
+if OSCheck.is_windows_family():
+  # On Windows only the service names are defined; no pid files or security params.
+  nimbus_win_service_name = "nimbus"
+  supervisor_win_service_name = "supervisor"
+  ui_win_service_name = "ui"
+else:
+  # One pid file per daemon, all under the configured storm_pid_dir.
+  pid_dir = config['configurations']['storm-env']['storm_pid_dir']
+  pid_nimbus = format("{pid_dir}/nimbus.pid")
+  pid_supervisor = format("{pid_dir}/supervisor.pid")
+  pid_drpc = format("{pid_dir}/drpc.pid")
+  pid_ui = format("{pid_dir}/ui.pid")
+  pid_logviewer = format("{pid_dir}/logviewer.pid")
+  pid_rest_api = format("{pid_dir}/restapi.pid")
+
+  # Lookup of daemon name -> pid file path.
+  pid_files = {
+    "logviewer":pid_logviewer,
+    "ui": pid_ui,
+    "nimbus": pid_nimbus,
+    "supervisor": pid_supervisor,
+    "drpc": pid_drpc,
+    "rest_api": pid_rest_api
+  }
+
+  # Security related/required params
+  hostname = config['hostname']
+  security_enabled = config['configurations']['cluster-env']['security_enabled']
+  kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+  tmp_dir = Script.get_tmp_dir()
+
+  # Default locations; replaced by <stack_root>/current/<component> paths when the
+  # stack version supports the ROLLING_UPGRADE feature.
+  storm_component_home_dir = "/usr/lib/storm"
+  conf_dir = "/etc/storm/conf"
+  if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
+    storm_component_home_dir = format("{stack_root}/current/{component_directory}")
+    conf_dir = format("{stack_root}/current/{component_directory}/conf")
+
+  storm_user = config['configurations']['storm-env']['storm_user']
+  # Optional settings: None when absent from the configuration.
+  storm_ui_principal = default('/configurations/storm-env/storm_ui_principal_name', None)
+  storm_ui_keytab = default('/configurations/storm-env/storm_ui_keytab', None)
+
+# Defined for every OS family; None when the command carries no stack name.
+stack_name = default("/hostLevelParams/stack_name", None)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm.py
new file mode 100644
index 0000000..99579d2
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.core.exceptions import Fail
+from resource_management.core.resources.service import ServiceConfig
+from resource_management.core.resources.system import Directory, Execute, File, Link
+from resource_management.core.source import InlineTemplate
+from resource_management.libraries.resources.template_config import TemplateConfig
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.script.script import Script
+from resource_management.core.source import Template
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from storm_yaml_utils import yaml_config_template, yaml_config
+from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
+from ambari_commons import OSConst
+from resource_management.libraries.functions.setup_atlas_hook import has_atlas_in_cluster, setup_atlas_hook, setup_atlas_jar_symlinks
+from ambari_commons.constants import SERVICE
+
+
+@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
+def storm(name=None):
+  import params
+  yaml_config("storm.yaml",
+              conf_dir=params.conf_dir,
+              configurations=params.config['configurations']['storm-site'],
+              owner=params.storm_user
+  )
+
+  if params.service_map.has_key(name):
+    service_name = params.service_map[name]
+    ServiceConfig(service_name,
+                  action="change_user",
+                  username = params.storm_user,
+                  password = Script.get_password(params.storm_user))
+
+
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
+def storm(name=None):
+  """
+  Default-OS implementation: create the Storm directories and write its
+  configuration files (storm.yaml, config.yaml, storm-env.sh, limits, and,
+  depending on params, Atlas hook, metrics sink, log4j and JAAS files).
+
+  :param name: not used by this implementation
+  """
+  import params
+  import os
+
+  # Log directory is created with mode 0777.
+  Directory(params.log_dir,
+            owner=params.storm_user,
+            group=params.user_group,
+            mode=0777,
+            create_parents = True,
+            cd_access="a",
+  )
+
+  Directory([params.pid_dir, params.local_dir],
+            owner=params.storm_user,
+            group=params.user_group,
+            create_parents = True,
+            cd_access="a",
+            mode=0755,
+  )
+
+  # No owner is specified for the conf directory, only the group.
+  Directory(params.conf_dir,
+            group=params.user_group,
+            create_parents = True,
+            cd_access="a",
+  )
+
+  # Limits file rendered from storm.conf.j2, owned by root.
+  File(format("{limits_conf_dir}/storm.conf"),
+       owner='root',
+       group='root',
+       mode=0644,
+       content=Template("storm.conf.j2")
+  )
+
+  File(format("{conf_dir}/config.yaml"),
+       content=Template("config.yaml.j2"),
+       owner=params.storm_user,
+       group=params.user_group
+  )
+
+  # storm-site properties; also consulted below for '_storm.min.ruid'.
+  configurations = params.config['configurations']['storm-site']
+
+  File(format("{conf_dir}/storm.yaml"),
+       content=yaml_config_template(configurations),
+       owner=params.storm_user,
+       group=params.user_group
+  )
+
+  File(format("{conf_dir}/storm-env.sh"),
+       owner=params.storm_user,
+       content=InlineTemplate(params.storm_env_sh_template)
+  )
+
+  # Generate atlas-application.properties.xml file and symlink the hook jars
+  if params.enable_atlas_hook:
+    atlas_hook_filepath = os.path.join(params.conf_dir, params.atlas_hook_filename)
+    setup_atlas_hook(SERVICE.STORM, params.storm_atlas_application_properties, atlas_hook_filepath, params.storm_user, params.user_group)
+    storm_extlib_dir = os.path.join(params.storm_component_home_dir, "extlib")
+    setup_atlas_jar_symlinks("storm", storm_extlib_dir)
+
+  if params.has_metric_collector:
+    File(format("{conf_dir}/storm-metrics2.properties"),
+        owner=params.storm_user,
+        group=params.user_group,
+        content=Template("storm-metrics2.properties.j2")
+    )
+
+    # Remove symlinks. They can be there if you are doing an upgrade from HDP < 2.2 to HDP >= 2.2
+    Link(format("{storm_lib_dir}/ambari-metrics-storm-sink.jar"),
+         action="delete")
+    # On old HDP 2.1 versions, this symlink may also exist and break EU to newer versions
+    Link("/usr/lib/storm/lib/ambari-metrics-storm-sink.jar", action="delete")
+
+    # Pick the sink jar according to the STORM_METRICS_APACHE_CLASSES stack feature.
+    if check_stack_feature(StackFeature.STORM_METRICS_APACHE_CLASSES, params.version_for_stack_feature_checks):
+      sink_jar = params.metric_collector_sink_jar
+    else:
+      sink_jar = params.metric_collector_legacy_sink_jar
+
+    # Re-create the symlink: guarded so it runs only when the sink jar exists
+    # and the link is not already present.
+    Execute(format("{sudo} ln -s {sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
+            not_if=format("ls {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
+            only_if=format("ls {sink_jar}")
+    )
+
+  if params.storm_logs_supported:
+    Directory(params.log4j_dir,
+              owner=params.storm_user,
+              group=params.user_group,
+              mode=0755,
+              create_parents = True
+    )
+    
+    File(format("{log4j_dir}/cluster.xml"),
+      owner=params.storm_user,
+      content=InlineTemplate(params.storm_cluster_log4j_content)
+    )
+    File(format("{log4j_dir}/worker.xml"),
+      owner=params.storm_user,
+      content=InlineTemplate(params.storm_worker_log4j_content)
+    )
+
+  if params.security_enabled:
+    TemplateConfig(format("{conf_dir}/storm_jaas.conf"),
+                   owner=params.storm_user
+    )
+    if params.stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.stack_version_formatted):
+      TemplateConfig(format("{conf_dir}/client_jaas.conf"),
+                     owner=params.storm_user
+      )
+      # '_storm.min.ruid' from storm-site, or '' when the key is absent.
+      # NOTE(review): .isdigit() below assumes the value is a string - confirm.
+      minRuid = configurations['_storm.min.ruid'] if configurations.has_key('_storm.min.ruid') else ''
+      
+      # Fall back to UID_MIN from /etc/login.defs when no numeric value is configured.
+      min_user_ruid = int(minRuid) if minRuid.isdigit() else _find_real_user_min_uid()
+      
+      File(format("{conf_dir}/worker-launcher.cfg"),
+           content=Template("worker-launcher.cfg.j2", min_user_ruid = min_user_ruid),
+           owner='root',
+           group=params.user_group
+      )
+
+
+'''
+Finds minimal real user UID
+'''
+def _find_real_user_min_uid():
+  with open('/etc/login.defs') as f:
+    for line in f:
+      if line.strip().startswith('UID_MIN') and len(line.split()) == 2 and line.split()[1].isdigit():
+        return int(line.split()[1])
+  raise Fail("Unable to find UID_MIN in file /etc/login.defs. Expecting format e.g.: 'UID_MIN    500'")  

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_upgrade.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_upgrade.py
new file mode 100644
index 0000000..bc245c4
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_upgrade.py
@@ -0,0 +1,177 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
+import os
+
+from ambari_commons import yaml_utils
+from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
+from resource_management.core.resources.system import Directory
+from resource_management.core.resources.system import File
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.default import default
+from resource_management.libraries.functions.format import format
+
+class StormUpgrade(Script):
+  """
+  Applies to Rolling/Express Upgrade from HDP 2.1 or 2.2 to 2.3 or higher.
+
+  Requirements: Needs to run from a host with ZooKeeper Client.
+
+  This class helps perform some of the upgrade tasks needed for Storm during
+  a Rolling or Express upgrade. Storm writes data to disk locally and to ZooKeeper.
+  If any HDP 2.1 or 2.2 bits exist in these directories when an HDP 2.3 instance
+  starts up, it will fail to start properly. Because the upgrade framework in
+  Ambari doesn't yet have a mechanism to say "stop all" before starting to
+  upgrade each component, we need to rely on a Storm trick to bring down
+  running daemons. By removing the ZooKeeper data with running daemons, those
+  daemons will die.
+  """
+
+  def delete_storm_zookeeper_data(self, env):
+    """
+    Deletes the Storm data from ZooKeeper, effectively bringing down all
+    Storm daemons. The configured ZooKeeper servers are tried one at a time
+    and the method stops at the first server on which the delete succeeds.
+
+    :param env: execution environment handed in by the caller (not used here)
+    :raises Fail: when storm-site/storm.zookeeper.root is not set, or when the
+                  delete command failed against every ZooKeeper server
+    :return:
+    """
+    import params
+
+    Logger.info('Clearing Storm data from ZooKeeper')
+
+    # NOTE(review): the root dir is only validated here; the delete command
+    # below targets the literal "/storm" znode - confirm this is intended.
+    storm_zookeeper_root_dir = params.storm_zookeeper_root_dir
+    if storm_zookeeper_root_dir is None:
+      raise Fail("The storm ZooKeeper directory specified by storm-site/storm.zookeeper.root must be specified")
+
+    # The zookeeper client must be given a zookeeper host to contact. Guaranteed to have at least one host.
+    storm_zookeeper_server_list = yaml_utils.get_values_from_yaml_array(params.storm_zookeeper_servers)
+    if storm_zookeeper_server_list is None:
+      # the message is formatted first and the finished string is handed to the logger
+      Logger.info("Unable to extract ZooKeeper hosts from '{0}', assuming localhost".format(params.storm_zookeeper_servers))
+      storm_zookeeper_server_list = ["localhost"]
+
+    # Determine where the zkCli.sh shell script is; it does not depend on the
+    # server being contacted, so it is resolved once before the loop
+    if params.version is not None:
+      zk_command_location = os.path.join(params.stack_root, params.version, "zookeeper", "bin", "zkCli.sh")
+    else:
+      zk_command_location = os.path.join(params.stack_root, "current", "zookeeper-client", "bin", "zkCli.sh")
+
+    # For every zk server, try to remove /storm
+    zookeeper_data_cleared = False
+    for storm_zookeeper_server in storm_zookeeper_server_list:
+      # create the ZooKeeper delete command
+      command = "{0} -server {1}:{2} rmr /storm".format(
+        zk_command_location, storm_zookeeper_server, params.storm_zookeeper_port)
+
+      # clean out ZK
+      try:
+        # the ZK client requires Java to run; ensure it's on the path
+        env_map = {
+          'JAVA_HOME': params.java64_home
+        }
+
+        # AMBARI-12094: if security is enabled, then we need to tell zookeeper where the
+        # JAAS file is located since we don't use kinit directly with STORM
+        if params.security_enabled:
+          env_map['JVMFLAGS'] = "-Djava.security.auth.login.config={0}".format(params.storm_jaas_file)
+
+        Execute(command, user=params.storm_user, environment=env_map,
+          logoutput=True, tries=1)
+
+        zookeeper_data_cleared = True
+        break
+      except Exception as e:
+        # the command failed; record why and try a different ZK server.
+        # KeyboardInterrupt/SystemExit are deliberately not caught here.
+        Logger.error("Unable to clear Storm data using ZooKeeper host {0}: {1}".format(storm_zookeeper_server, str(e)))
+
+    # fail if the ZK data could not be cleared
+    if not zookeeper_data_cleared:
+      raise Fail("Unable to clear ZooKeeper Storm data on any of the following ZooKeeper hosts: {0}".format(
+        storm_zookeeper_server_list))
+
+
+  def delete_storm_local_data(self, env):
+    """
+    Deletes Storm data from local directories. This will create a marker file
+    with JSON data representing the upgrade stack and request/stage ID. This
+    will prevent multiple Storm components on the same host from removing
+    the local directories more than once.
+
+    :param env: execution environment handed in by the caller (not used here)
+    :raises Fail: when storm-site/storm.local.dir is not set
+    :return:
+    """
+    import params
+
+    Logger.info('Clearing Storm data from local directories...')
+
+    storm_local_directory = params.local_dir
+    if storm_local_directory is None:
+      raise Fail("The storm local directory specified by storm-site/storm.local.dir must be specified")
+
+    request_id = default("/requestId", None)
+
+    stack_name = params.stack_name
+    stack_version = params.version
+    upgrade_direction = params.upgrade_direction
+
+    # identity of this upgrade/downgrade request; compared against the marker file
+    json_map = {}
+    json_map["requestId"] = request_id
+    json_map["stackName"] = stack_name
+    json_map["stackVersion"] = stack_version
+    json_map["direction"] = upgrade_direction
+
+    temp_directory = params.tmp_dir
+    marker_file = os.path.join(temp_directory, "storm-upgrade-{0}.json".format(stack_version))
+    Logger.info("Marker file for upgrade/downgrade of Storm, {0}".format(marker_file))
+
+    if os.path.exists(marker_file):
+      Logger.info("The marker file exists.")
+      try:
+        with open(marker_file) as file_pointer:
+          existing_json_map = json.load(file_pointer)
+
+        # plain equality is the same test as cmp(...) == 0
+        if json_map == existing_json_map:
+          Logger.info("The storm upgrade has already removed the local directories for {0}-{1} for "
+                      "request {2} and direction {3}. Nothing else to do.".format(stack_name, stack_version, request_id, upgrade_direction))
+
+          # Nothing else to do here for this as it appears to have already been
+          # removed by another component being upgraded
+          return
+        else:
+          Logger.info("The marker file differs from the new value. Will proceed to delete Storm local dir, "
+                      "and generate new file. Current marker file: {0}".format(str(existing_json_map)))
+      except Exception as e:
+        Logger.error("The marker file {0} appears to be corrupt; removing it. Error: {1}".format(marker_file, str(e)))
+        File(marker_file, action="delete")
+    else:
+      Logger.info('The marker file {0} does not exist; will attempt to delete local Storm directory if it exists.'.format(marker_file))
+
+    # Delete from local directory
+    if os.path.isdir(storm_local_directory):
+      Logger.info("Deleting storm local directory, {0}".format(storm_local_directory))
+      Directory(storm_local_directory, action="delete", create_parents = True)
+
+    # Recreate storm local directory
+    Logger.info("Recreating storm local directory, {0}".format(storm_local_directory))
+    Directory(storm_local_directory, mode=0o755, owner=params.storm_user,
+      group=params.user_group, create_parents = True)
+
+    # Write (or overwrite) the marker so later components on this host skip the delete
+    Logger.info("Saving marker file to {0} with contents: {1}".format(marker_file, str(json_map)))
+    with open(marker_file, 'w') as file_pointer:
+      json.dump(json_map, file_pointer, indent=2)
+
+if __name__ == "__main__":
+  StormUpgrade().execute()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_yaml_utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_yaml_utils.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_yaml_utils.py
new file mode 100644
index 0000000..9d78e71
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_yaml_utils.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+import resource_management
+
+from ambari_commons.yaml_utils import escape_yaml_property
+from resource_management.core.source import InlineTemplate
+from resource_management.core.resources.system import File
+
+def replace_jaas_placeholder(name, security_enabled, conf_dir):
+  """
+  Substitutes every '_JAAS_PLACEHOLDER' occurrence in `name`.
+
+  With security enabled the placeholder becomes the JVM option pointing at
+  <conf_dir>/storm_jaas.conf; otherwise it is replaced by an empty string.
+  A value without the placeholder is returned untouched.
+  """
+  placeholder = '_JAAS_PLACEHOLDER'
+  if placeholder not in name:
+    return name
+
+  if security_enabled:
+    substitute = '-Djava.security.auth.login.config=' + conf_dir + '/storm_jaas.conf'
+  else:
+    substitute = ''
+  return name.replace(placeholder, substitute)
+
+# Jinja template for the Storm YAML configuration: one "key : value" line per
+# configuration entry, sorted by key; keys starting with '_' are skipped. Each value
+# is rendered as an InlineTemplate, stripped, has its JAAS placeholder substituted
+# and is finally escaped with escape_yaml_property.
+storm_yaml_template = """{% for key, value in configurations|dictsort if not key.startswith('_') %}{{key}} : {{ escape_yaml_property(replace_jaas_placeholder(resource_management.core.source.InlineTemplate(value).get_content().strip(), security_enabled, conf_dir)) }}
+{% endfor %}"""
+
+def yaml_config_template(configurations):
+  """
+  Builds the InlineTemplate that renders `configurations` through storm_yaml_template.
+
+  The helper functions and resource_management modules the template refers to are
+  handed over through extra_imports.
+
+  NOTE(review): security_enabled and conf_dir are used inside the template but are
+  not passed here - presumably they are resolved from the parameters registered on
+  the execution environment; confirm against InlineTemplate.
+
+  :param configurations: dictionary of configuration properties to render
+  :return: the InlineTemplate instance
+  """
+  return InlineTemplate(storm_yaml_template, configurations=configurations,
+                        extra_imports=[escape_yaml_property, replace_jaas_placeholder, resource_management,
+                                       resource_management.core, resource_management.core.source])
+
+def yaml_config(filename, configurations = None, conf_dir = None, owner = None, group = None):
+  """
+  Declares a File resource named `filename` under params.conf_dir whose content is
+  `configurations` rendered as one "key: value" line per entry, sorted by key.
+
+  :param filename: name of the file to create inside params.conf_dir
+  :param configurations: dictionary of properties to render
+  :param conf_dir: not used by this function; the directory is taken from params.conf_dir
+  :param owner: owner passed to the File resource
+  :param group: not used by this function
+  """
+  import params
+  # every value is rendered as an InlineTemplate of its own and then escaped for YAML
+  config_content = InlineTemplate('''{% for key, value in configurations_dict|dictsort %}{{ key }}: {{ escape_yaml_property(resource_management.core.source.InlineTemplate(value).get_content()) }}
+{% endfor %}''', configurations_dict=configurations, extra_imports=[escape_yaml_property, resource_management, resource_management.core, resource_management.core.source])
+
+  # NOTE(review): mode is the string "f" rather than a numeric permission mask - confirm this is intended
+  File (os.path.join(params.conf_dir, filename),
+        content = config_content,
+        owner = owner,
+        mode = "f"
+  )

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor.py
new file mode 100644
index 0000000..ec3f533
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from resource_management.libraries.functions import check_process_status
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import format
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from storm import storm
+from service import service
+from ambari_commons import OSConst
+from ambari_commons.os_family_impl import OsFamilyImpl
+from resource_management.core.resources.service import Service
+
+
+class Supervisor(Script):
+  """
+  Common part of the Storm supervisor command script: component name, install
+  and configure. Start/stop/status are provided by the OS-specific subclasses.
+  """
+  def get_component_name(self):
+    """Returns the component name "storm-supervisor"."""
+    return "storm-supervisor"
+
+  def install(self, env):
+    """Installs the packages and then writes the configuration."""
+    self.install_packages(env)
+    self.configure(env)
+
+  def configure(self, env):
+    """Registers params on the environment and runs storm() for the supervisor."""
+    import params
+    env.set_params(params)
+    storm("supervisor")
+
+
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class SupervisorWindows(Supervisor):
+  """
+  Supervisor commands for the Windows server OS family; the supervisor is
+  driven through the Service resource named by status_params.supervisor_win_service_name.
+  """
+  def start(self, env):
+    """Configures the component and starts the Windows service."""
+    import status_params
+    env.set_params(status_params)
+    self.configure(env)
+    Service(status_params.supervisor_win_service_name, action="start")
+
+  def stop(self, env):
+    """Stops the Windows service."""
+    import status_params
+    env.set_params(status_params)
+    Service(status_params.supervisor_win_service_name, action="stop")
+
+  def status(self, env):
+    """Checks the Windows service through check_windows_service_status."""
+    import status_params
+    # imported here rather than at module level - presumably because it is Windows-only; confirm
+    from resource_management.libraries.functions.windows_service_utils import check_windows_service_status
+    env.set_params(status_params)
+    check_windows_service_status(status_params.supervisor_win_service_name)
+
+
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class SupervisorDefault(Supervisor):
+  """
+  Supervisor commands for the default OS family. Starting and stopping the
+  supervisor also starts and stops the logviewer.
+  """
+
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    """
+    Selects the storm configuration and the storm-client / storm-supervisor stack
+    components for params.version, when a version is set and the stack supports
+    the ROLLING_UPGRADE feature.
+    """
+    import params
+    env.set_params(params)
+
+    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
+      conf_select.select(params.stack_name, "storm", params.version)
+      stack_select.select("storm-client", params.version)
+      stack_select.select("storm-supervisor", params.version)
+
+  def start(self, env, upgrade_type=None):
+    """Configures the component, then starts the supervisor and the logviewer."""
+    import params
+    env.set_params(params)
+    self.configure(env)
+
+    service("supervisor", action="start")
+    service("logviewer", action="start")
+
+  def stop(self, env, upgrade_type=None):
+    """Stops the supervisor and the logviewer."""
+    import params
+    env.set_params(params)
+
+    service("supervisor", action="stop")
+    service("logviewer", action="stop")
+
+  def status(self, env):
+    """Checks the supervisor process through its pid file."""
+    import status_params
+    env.set_params(status_params)
+    check_process_status(status_params.pid_supervisor)
+
+  def get_log_folder(self):
+    """Returns params.log_dir."""
+    import params
+    return params.log_dir
+  
+  def get_user(self):
+    """Returns params.storm_user."""
+    import params
+    return params.storm_user
+
+  def get_pid_files(self):
+    """Returns a list holding the supervisor pid file path."""
+    import status_params
+    return [status_params.pid_supervisor]
+
+# Run the command script when this file is executed directly
+if __name__ == "__main__":
+  Supervisor().execute()
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor_prod.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor_prod.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor_prod.py
new file mode 100644
index 0000000..d6c3545
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor_prod.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from storm import storm
+from service import service
+from supervisord_service import supervisord_service, supervisord_check_status
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import format
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+
+
+class Supervisor(Script):
+  """
+  Storm supervisor command script variant in which the supervisor process is
+  controlled through supervisorctl (see supervisord_service), while the
+  logviewer is controlled through service().
+  """
+
+  def get_component_name(self):
+    """Returns the component name "storm-supervisor"."""
+    return "storm-supervisor"
+
+  def install(self, env):
+    """Installs the packages and then writes the configuration."""
+    self.install_packages(env)
+    self.configure(env)
+
+  def configure(self, env):
+    """Registers params on the environment and runs storm()."""
+    import params
+    env.set_params(params)
+    # NOTE(review): storm() is called without a component name here - presumably
+    # its argument is optional; confirm against storm.py
+    storm()
+
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    """
+    Selects the storm configuration and the storm-client / storm-supervisor stack
+    components for params.version, when a version is set and the stack supports
+    the ROLLING_UPGRADE feature.
+    """
+    import params
+    env.set_params(params)
+
+    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
+      conf_select.select(params.stack_name, "storm", params.version)
+      stack_select.select("storm-client", params.version)
+      stack_select.select("storm-supervisor", params.version)
+
+  def start(self, env, upgrade_type=None):
+    """Configures the component, starts the supervisor via supervisorctl and starts the logviewer."""
+    import params
+    env.set_params(params)
+    self.configure(env)
+
+    supervisord_service("supervisor", action="start")
+    service("logviewer", action="start")
+
+  def stop(self, env, upgrade_type=None):
+    """Stops the supervisor via supervisorctl and stops the logviewer."""
+    import params
+    env.set_params(params)
+
+    supervisord_service("supervisor", action="stop")
+    service("logviewer", action="stop")
+
+  def status(self, env):
+    """Checks the supervisor through supervisord_check_status."""
+    supervisord_check_status("supervisor")
+    
+  def get_log_folder(self):
+    """Returns params.log_dir."""
+    import params
+    return params.log_dir
+  
+  def get_user(self):
+    """Returns params.storm_user."""
+    import params
+    return params.storm_user
+
+# Run the command script when this file is executed directly
+if __name__ == "__main__":
+  Supervisor().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisord_service.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisord_service.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisord_service.py
new file mode 100644
index 0000000..6ff9f9c
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisord_service.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions.format import format
+
+def supervisord_service(component_name, action):
+  """
+  Runs "supervisorctl <action> storm-<component_name>" without waiting for the
+  command to finish.
+
+  :param component_name: Storm component suffix, e.g. "supervisor"
+  :param action: supervisorctl action, e.g. "start" or "stop"
+  """
+  supervisorctl_command = format("supervisorctl {action} storm-{component_name}")
+  Execute(supervisorctl_command, wait_for_finish=False)
+
+def supervisord_check_status(component_name):
+  """
+  Checks that supervisorctl reports storm-<component_name> as RUNNING.
+
+  :param component_name: Storm component suffix, e.g. "supervisor"
+  :raises ComponentIsNotRunning: when the status command fails with Fail, i.e. the
+                                 RUNNING state was not found in the supervisorctl output
+  """
+  # Imported locally because the module-level imports of this file do not bring
+  # these exception classes into scope; without them the except clause itself
+  # would fail with a NameError.
+  from resource_management.core.exceptions import ComponentIsNotRunning, Fail
+
+  try:
+    Execute(format("supervisorctl status storm-{component_name} | grep RUNNING"))
+  except Fail:
+    raise ComponentIsNotRunning()

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/ui_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/ui_server.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/ui_server.py
new file mode 100644
index 0000000..e257ef9
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/ui_server.py
@@ -0,0 +1,137 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from storm import storm
+from service import service
+from service_check import ServiceCheck
+from resource_management.libraries.functions import check_process_status
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import format
+from resource_management.core.resources.system import Link
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.security_commons import build_expectations, \
+  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
+  FILE_TYPE_JAAS_CONF
+from setup_ranger_storm import setup_ranger_storm
+from ambari_commons import OSConst
+from ambari_commons.os_family_impl import OsFamilyImpl
+from resource_management.core.resources.service import Service
+
+
+class UiServer(Script):
+  """
+  Common part of the Storm UI server command script: component name, install
+  and configure. Start/stop/status are provided by the OS-specific subclasses.
+  """
+
+  def get_component_name(self):
+    """Returns the component name "storm-client"."""
+    return "storm-client"
+
+  def install(self, env):
+    """Installs the packages and then writes the configuration."""
+    self.install_packages(env)
+    self.configure(env)
+
+  def configure(self, env):
+    """Registers params on the environment and runs storm() for the UI."""
+    import params
+    env.set_params(params)
+    storm("ui")
+
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class UiServerWindows(UiServer):
+  """
+  UI server commands for the Windows server OS family; the UI is driven through
+  the Service resource named by status_params.ui_win_service_name.
+  """
+  def start(self, env):
+    """Configures the component and starts the Windows service."""
+    import status_params
+    env.set_params(status_params)
+    self.configure(env)
+    Service(status_params.ui_win_service_name, action="start")
+
+  def stop(self, env):
+    """Stops the Windows service."""
+    import status_params
+    env.set_params(status_params)
+    Service(status_params.ui_win_service_name, action="stop")
+
+  def status(self, env):
+    """Checks the Windows service through check_windows_service_status."""
+    import status_params
+    env.set_params(status_params)
+    # imported here rather than at module level - presumably because it is Windows-only; confirm
+    from resource_management.libraries.functions.windows_service_utils import check_windows_service_status
+    check_windows_service_status(status_params.ui_win_service_name)
+
+
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class UiServerDefault(UiServer):
+  """
+  UI server commands for the default OS family.
+  """
+
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    """
+    Selects the storm configuration and the storm-client stack component for
+    params.version, when a version is set and the stack supports the
+    ROLLING_UPGRADE feature.
+    """
+    import params
+    env.set_params(params)
+    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
+      conf_select.select(params.stack_name, "storm", params.version)
+      stack_select.select("storm-client", params.version)
+
+  def link_metrics_sink_jar(self):
+    """
+    Replaces the ambari-metrics-storm-sink.jar symlink in the Storm lib directory
+    with a link to the sink jar matching the stack: the regular jar when the
+    STORM_METRICS_APACHE_CLASSES feature is available, the legacy jar otherwise.
+    """
+    import params
+    # Add storm metrics reporter JAR to storm-ui-server classpath.
+    # Remove symlinks. They can be there, if you doing upgrade from HDP < 2.2 to HDP >= 2.2
+    Link(format("{storm_lib_dir}/ambari-metrics-storm-sink.jar"),
+         action="delete")
+    # On old HDP 2.1 versions, this symlink may also exist and break EU to newer versions
+    Link("/usr/lib/storm/lib/ambari-metrics-storm-sink.jar", action="delete")
+
+    if check_stack_feature(StackFeature.STORM_METRICS_APACHE_CLASSES, params.version_for_stack_feature_checks):
+      sink_jar = params.metric_collector_sink_jar
+    else:
+      sink_jar = params.metric_collector_legacy_sink_jar
+
+    # link only when the sink jar exists and the link target name is still free
+    Execute(format("{sudo} ln -s {sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
+            not_if=format("ls {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
+            only_if=format("ls {sink_jar}")
+            )
+
+  def start(self, env, upgrade_type=None):
+    """
+    Configures the component, links the metrics sink jar, sets up the Ranger
+    Storm integration and starts the UI.
+    """
+    import params
+    env.set_params(params)
+    self.configure(env)
+    self.link_metrics_sink_jar()
+    setup_ranger_storm(upgrade_type=upgrade_type)
+    service("ui", action="start")
+
+  def stop(self, env, upgrade_type=None):
+    """Stops the UI."""
+    import params
+    env.set_params(params)
+    service("ui", action="stop")
+
+  def status(self, env):
+    """Checks the UI process through its pid file."""
+    import status_params
+    env.set_params(status_params)
+    check_process_status(status_params.pid_ui)
+      
+  def get_log_folder(self):
+    """Returns params.log_dir."""
+    import params
+    return params.log_dir
+  
+  def get_user(self):
+    """Returns params.storm_user."""
+    import params
+    return params.storm_user
+
+  def get_pid_files(self):
+    """Returns a list holding the UI pid file path."""
+    import status_params
+    return [status_params.pid_ui]
+
+# Run the command script when this file is executed directly
+if __name__ == "__main__":
+  UiServer().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/templates/client_jaas.conf.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/templates/client_jaas.conf.j2 b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/templates/client_jaas.conf.j2
new file mode 100644
index 0000000..b061cd1
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/templates/client_jaas.conf.j2
@@ -0,0 +1,33 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Client-side JAAS configuration template.
+# The StormClient section is always rendered, with nimbus_bare_jaas_principal
+# as its serviceName. The KafkaClient section is rendered only when
+# kafka_bare_jaas_principal is set to a non-empty value.
+#}
+
+StormClient {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useTicketCache=true
+   renewTicket=true
+   serviceName="{{nimbus_bare_jaas_principal}}";
+};
+
+{% if kafka_bare_jaas_principal %}
+KafkaClient {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useTicketCache=true
+   renewTicket=true
+   serviceName="{{kafka_bare_jaas_principal}}";
+};
+{% endif %}