You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2017/05/24 02:39:22 UTC
[05/50] [abbrv] ambari git commit: AMBARI-21048. HDP 3.0 TP - create
service definition for Storm with configs, kerberos, widgets, etc.(vbrodetsky)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py
new file mode 100644
index 0000000..78ec165
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py
@@ -0,0 +1,424 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+import re
+import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
+
+import status_params
+
+from ambari_commons.constants import AMBARI_SUDO_BINARY
+from ambari_commons import yaml_utils
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions.default import default
+from resource_management.libraries.functions.get_bare_principal import get_bare_principal
+from resource_management.libraries.script import Script
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import get_kinit_path
+from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.stack_features import get_stack_feature_version
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.expect import expect
+from resource_management.libraries.functions.setup_atlas_hook import has_atlas_in_cluster
+from resource_management.libraries.functions import is_empty
+from ambari_commons.ambari_metrics_helper import select_metric_collector_hosts_from_hostnames
+from resource_management.libraries.functions.setup_ranger_plugin_xml import get_audit_configs, generate_ranger_service_config
+
+# server configurations
+config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
+stack_root = status_params.stack_root
+sudo = AMBARI_SUDO_BINARY
+
+limits_conf_dir = "/etc/security/limits.d"
+
+# Needed since this is an Atlas Hook service.
+cluster_name = config['clusterName']
+
+stack_name = status_params.stack_name
+# upgrade_direction/version are only present in the command JSON during a
+# stack upgrade; default to None for normal commands.
+upgrade_direction = default("/commandParams/upgrade_direction", None)
+version = default("/commandParams/version", None)
+
+agent_stack_retry_on_unavailability = config['hostLevelParams']['agent_stack_retry_on_unavailability']
+agent_stack_retry_count = expect("/hostLevelParams/agent_stack_retry_count", int)
+
+storm_component_home_dir = status_params.storm_component_home_dir
+conf_dir = status_params.conf_dir
+
+stack_version_unformatted = status_params.stack_version_unformatted
+stack_version_formatted = status_params.stack_version_formatted
+# Feature flags: a falsy stack_version_formatted short-circuits the first
+# three checks to False before check_stack_feature is ever called.
+stack_supports_ru = stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted)
+stack_supports_storm_kerberos = stack_version_formatted and check_stack_feature(StackFeature.STORM_KERBEROS, stack_version_formatted)
+stack_supports_storm_ams = stack_version_formatted and check_stack_feature(StackFeature.STORM_AMS, stack_version_formatted)
+stack_supports_core_site_for_ranger_plugin = check_stack_feature(StackFeature.CORE_SITE_FOR_RANGER_PLUGINS_SUPPORT, stack_version_formatted)
+
+# get the correct version to use for checking stack features
+version_for_stack_feature_checks = get_stack_feature_version(config)
+
+stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks)
+stack_supports_ranger_audit_db = check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks)
+
+# default hadoop params (pre-rolling-upgrade, fixed filesystem layout)
+rest_lib_dir = "/usr/lib/storm/contrib/storm-rest"
+storm_bin_dir = "/usr/bin"
+storm_lib_dir = "/usr/lib/storm/lib/"
+
+# hadoop parameters for 2.2+ — versioned paths under the component home dir
+if stack_supports_ru:
+ rest_lib_dir = format("{storm_component_home_dir}/contrib/storm-rest")
+ storm_bin_dir = format("{storm_component_home_dir}/bin")
+ storm_lib_dir = format("{storm_component_home_dir}/lib")
+ log4j_dir = format("{storm_component_home_dir}/log4j2")
+# Core Storm settings read from storm-env / storm-site.
+storm_user = config['configurations']['storm-env']['storm_user']
+log_dir = config['configurations']['storm-env']['storm_log_dir']
+pid_dir = status_params.pid_dir
+local_dir = config['configurations']['storm-site']['storm.local.dir']
+user_group = config['configurations']['cluster-env']['user_group']
+java64_home = config['hostLevelParams']['java_home']
+jps_binary = format("{java64_home}/bin/jps")
+nimbus_port = config['configurations']['storm-site']['nimbus.thrift.port']
+storm_zookeeper_root_dir = default('/configurations/storm-site/storm.zookeeper.root', None)
+storm_zookeeper_servers = config['configurations']['storm-site']['storm.zookeeper.servers']
+storm_zookeeper_port = config['configurations']['storm-site']['storm.zookeeper.port']
+storm_logs_supported = config['configurations']['storm-env']['storm_logs_supported']
+
+# nimbus.seeds is supported in HDP 2.3.0.0 and higher
+nimbus_seeds_supported = default('/configurations/storm-env/nimbus_seeds_supported', False)
+nimbus_host = default('/configurations/storm-site/nimbus.host', None)
+nimbus_seeds = default('/configurations/storm-site/nimbus.seeds', None)
+default_topology_max_replication_wait_time_sec = default('/configurations/storm-site/topology.max.replication.wait.time.sec.default', -1)
+nimbus_hosts = default("/clusterHostInfo/nimbus_hosts", [])
+default_topology_min_replication_count = default('/configurations/storm-site/topology.min.replication.count.default', 1)
+
+#Calculate topology.max.replication.wait.time.sec and topology.min.replication.count
+if len(nimbus_hosts) > 1:
+ # for HA Nimbus
+ actual_topology_max_replication_wait_time_sec = -1
+ # NOTE(review): '/' is integer (floor) division under Python 2, giving a
+ # quorum majority; under Python 3 it would yield a float — use '//' if ported.
+ actual_topology_min_replication_count = len(nimbus_hosts) / 2 + 1
+else:
+ # for non-HA Nimbus
+ actual_topology_max_replication_wait_time_sec = default_topology_max_replication_wait_time_sec
+ actual_topology_min_replication_count = default_topology_min_replication_count
+
+# Drop the helper ".default" keys from storm-site — presumably so they are not
+# rendered into the generated storm.yaml; verify against the template logic.
+if 'topology.max.replication.wait.time.sec.default' in config['configurations']['storm-site']:
+ del config['configurations']['storm-site']['topology.max.replication.wait.time.sec.default']
+if 'topology.min.replication.count.default' in config['configurations']['storm-site']:
+ del config['configurations']['storm-site']['topology.min.replication.count.default']
+
+# REST API endpoint configuration (legacy storm-rest component).
+rest_api_port = "8745"
+rest_api_admin_port = "8746"
+rest_api_conf_file = format("{conf_dir}/config.yaml")
+storm_env_sh_template = config['configurations']['storm-env']['content']
+jmxremote_port = config['configurations']['storm-env']['jmxremote_port']
+
+# Ganglia sink is enabled only when the cluster actually has a Ganglia server.
+if 'ganglia_server_host' in config['clusterHostInfo'] and len(config['clusterHostInfo']['ganglia_server_host'])>0:
+ ganglia_installed = True
+ ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
+ ganglia_report_interval = 60
+else:
+ ganglia_installed = False
+
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+
+storm_ui_host = default("/clusterHostInfo/storm_ui_server_hosts", [])
+
+# ulimit values for the storm user.
+# NOTE(review): the nproc value is read from key 'storm_user_noproc_limit' —
+# possibly a typo for 'storm_user_nproc_limit'; confirm against storm-env.xml.
+storm_user_nofile_limit = default('/configurations/storm-env/storm_user_nofile_limit', 128000)
+storm_user_nproc_limit = default('/configurations/storm-env/storm_user_noproc_limit', 65536)
+
+if security_enabled:
+ # Kerberos principals: '_HOST' placeholders are substituted with this
+ # host's lowercased hostname.
+ _hostname_lowercase = config['hostname'].lower()
+ _storm_principal_name = config['configurations']['storm-env']['storm_principal_name']
+ storm_jaas_principal = _storm_principal_name.replace('_HOST',_hostname_lowercase)
+ _ambari_principal_name = default('/configurations/cluster-env/ambari_principal_name', None)
+ storm_keytab_path = config['configurations']['storm-env']['storm_keytab']
+
+ if stack_supports_storm_kerberos:
+ storm_ui_keytab_path = config['configurations']['storm-env']['storm_ui_keytab']
+ _storm_ui_jaas_principal_name = config['configurations']['storm-env']['storm_ui_principal_name']
+ storm_ui_jaas_principal = _storm_ui_jaas_principal_name.replace('_HOST',_hostname_lowercase)
+ storm_bare_jaas_principal = get_bare_principal(_storm_principal_name)
+ if _ambari_principal_name:
+ ambari_bare_jaas_principal = get_bare_principal(_ambari_principal_name)
+ _nimbus_principal_name = config['configurations']['storm-env']['nimbus_principal_name']
+ nimbus_jaas_principal = _nimbus_principal_name.replace('_HOST', _hostname_lowercase)
+ nimbus_bare_jaas_principal = get_bare_principal(_nimbus_principal_name)
+ nimbus_keytab_path = config['configurations']['storm-env']['nimbus_keytab']
+
+# Thrift transport selection depends on whether the cluster is kerberized.
+kafka_bare_jaas_principal = None
+if stack_supports_storm_kerberos:
+ if security_enabled:
+ storm_thrift_transport = config['configurations']['storm-site']['_storm.thrift.secure.transport']
+ # generate KafkaClient jaas config if kafka is kerberoized
+ _kafka_principal_name = default("/configurations/kafka-env/kafka_principal_name", None)
+ kafka_bare_jaas_principal = get_bare_principal(_kafka_principal_name)
+ else:
+ storm_thrift_transport = config['configurations']['storm-site']['_storm.thrift.nonsecure.transport']
+
+# Ambari Metrics (AMS) sink configuration: an externally-managed collector
+# (metrics_collector_external_hosts) takes precedence over in-cluster hosts.
+set_instanceId = "false"
+if 'cluster-env' in config['configurations'] and \
+ 'metrics_collector_external_hosts' in config['configurations']['cluster-env']:
+ ams_collector_hosts = config['configurations']['cluster-env']['metrics_collector_external_hosts']
+ set_instanceId = "true"
+else:
+ ams_collector_hosts = ",".join(default("/clusterHostInfo/metrics_collector_hosts", []))
+has_metric_collector = not len(ams_collector_hosts) == 0
+metric_collector_port = None
+if has_metric_collector:
+ if 'cluster-env' in config['configurations'] and \
+ 'metrics_collector_external_port' in config['configurations']['cluster-env']:
+ metric_collector_port = config['configurations']['cluster-env']['metrics_collector_external_port']
+ else:
+ # Derive the port from the collector webapp address, falling back to 6188.
+ metric_collector_web_address = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:6188")
+ if metric_collector_web_address.find(':') != -1:
+ metric_collector_port = metric_collector_web_address.split(':')[1]
+ else:
+ metric_collector_port = '6188'
+
+ metric_collector_report_interval = 60
+ metric_collector_app_id = "nimbus"
+ if default("/configurations/ams-site/timeline.metrics.service.http.policy", "HTTP_ONLY") == "HTTPS_ONLY":
+ metric_collector_protocol = 'https'
+ else:
+ metric_collector_protocol = 'http'
+ metric_truststore_path= default("/configurations/ams-ssl-client/ssl.client.truststore.location", "")
+ metric_truststore_type= default("/configurations/ams-ssl-client/ssl.client.truststore.type", "")
+ metric_truststore_password= default("/configurations/ams-ssl-client/ssl.client.truststore.password", "")
+ pass
+metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
+metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-with-common-*.jar"
+metric_collector_legacy_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar"
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
+
+# Cluster Zookeeper quorum, rendered as "host1:port,host2:port,...".
+zookeeper_quorum = ""
+if storm_zookeeper_servers:
+ storm_zookeeper_servers_list = yaml_utils.get_values_from_yaml_array(storm_zookeeper_servers)
+ zookeeper_quorum = (":" + storm_zookeeper_port + ",").join(storm_zookeeper_servers_list)
+ zookeeper_quorum += ":" + storm_zookeeper_port
+
+# Extra JVM options appended when submitting topologies (built up below).
+jar_jvm_opts = ''
+
+########################################################
+############# Atlas related params #####################
+########################################################
+#region Atlas Hooks
+storm_atlas_application_properties = default('/configurations/storm-atlas-application.properties', {})
+enable_atlas_hook = default('/configurations/storm-env/storm.atlas.hook', False)
+atlas_hook_filename = default('/configurations/atlas-env/metadata_conf_file', 'atlas-application.properties')
+
+if enable_atlas_hook:
+ # Only append /etc/atlas/conf to classpath if on HDP 2.4.*
+ if check_stack_feature(StackFeature.ATLAS_CONF_DIR_IN_PATH, stack_version_formatted):
+ atlas_conf_dir = format('{stack_root}/current/atlas-server/conf')
+ jar_jvm_opts += '-Datlas.conf=' + atlas_conf_dir
+#endregion
+
+storm_ui_port = config['configurations']['storm-site']['ui.port']
+
+#Storm log4j properties: appender max file sizes and backup (rollover) counts.
+storm_a1_maxfilesize = default('/configurations/storm-cluster-log4j/storm_a1_maxfilesize', 100)
+storm_a1_maxbackupindex = default('/configurations/storm-cluster-log4j/storm_a1_maxbackupindex', 9)
+storm_wrkr_a1_maxfilesize = default('/configurations/storm-worker-log4j/storm_wrkr_a1_maxfilesize', 100)
+storm_wrkr_a1_maxbackupindex = default('/configurations/storm-worker-log4j/storm_wrkr_a1_maxbackupindex', 9)
+storm_wrkr_out_maxfilesize = default('/configurations/storm-worker-log4j/storm_wrkr_out_maxfilesize', 100)
+storm_wrkr_out_maxbackupindex = default('/configurations/storm-worker-log4j/storm_wrkr_out_maxbackupindex', 4)
+storm_wrkr_err_maxfilesize = default('/configurations/storm-worker-log4j/storm_wrkr_err_maxfilesize', 100)
+storm_wrkr_err_maxbackupindex = default('/configurations/storm-worker-log4j/storm_wrkr_err_maxbackupindex', 4)
+
+storm_cluster_log4j_content = config['configurations']['storm-cluster-log4j']['content']
+storm_worker_log4j_content = config['configurations']['storm-worker-log4j']['content']
+
+# some commands may need to supply the JAAS location when running as storm
+storm_jaas_file = format("{conf_dir}/storm_jaas.conf")
+
+# for curl command in ranger plugin to get db connector
+jdk_location = config['hostLevelParams']['jdk_location']
+
+# ranger storm plugin start section
+
+# ranger host
+ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])
+has_ranger_admin = not len(ranger_admin_hosts) == 0
+
+# ranger support xml_configuration flag, instead of depending on ranger xml_configurations_supported/ranger-env, using stack feature
+xml_configurations_supported = check_stack_feature(StackFeature.RANGER_XML_CONFIGURATION, version_for_stack_feature_checks)
+
+# ambari-server hostname
+ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0]
+
+# ranger storm plugin enabled property ("Yes"/"No" string normalized to bool)
+enable_ranger_storm = default("/configurations/ranger-storm-plugin-properties/ranger-storm-plugin-enabled", "No")
+enable_ranger_storm = True if enable_ranger_storm.lower() == 'yes' else False
+
+# ranger storm properties
+if enable_ranger_storm:
+ # get ranger policy url
+ policymgr_mgr_url = config['configurations']['admin-properties']['policymgr_external_url']
+ if xml_configurations_supported:
+ policymgr_mgr_url = config['configurations']['ranger-storm-security']['ranger.plugin.storm.policy.rest.url']
+
+ # Normalize: strip any trailing '/' from the policy manager URL.
+ if not is_empty(policymgr_mgr_url) and policymgr_mgr_url.endswith('/'):
+ policymgr_mgr_url = policymgr_mgr_url.rstrip('/')
+
+ # ranger audit db user
+ xa_audit_db_user = default('/configurations/admin-properties/audit_db_user', 'rangerlogger')
+
+ # ranger storm service name: "<cluster>_storm" unless explicitly overridden
+ repo_name = str(config['clusterName']) + '_storm'
+ repo_name_value = config['configurations']['ranger-storm-security']['ranger.plugin.storm.service.name']
+ if not is_empty(repo_name_value) and repo_name_value != "{{repo_name}}":
+ repo_name = repo_name_value
+
+ common_name_for_certificate = config['configurations']['ranger-storm-plugin-properties']['common.name.for.certificate']
+ repo_config_username = config['configurations']['ranger-storm-plugin-properties']['REPOSITORY_CONFIG_USERNAME']
+
+ # ranger-env config
+ ranger_env = config['configurations']['ranger-env']
+
+ # create ranger-env config having external ranger credential properties
+ # (used when Ranger admin is managed outside this cluster)
+ if not has_ranger_admin and enable_ranger_storm:
+ external_admin_username = default('/configurations/ranger-storm-plugin-properties/external_admin_username', 'admin')
+ external_admin_password = default('/configurations/ranger-storm-plugin-properties/external_admin_password', 'admin')
+ external_ranger_admin_username = default('/configurations/ranger-storm-plugin-properties/external_ranger_admin_username', 'amb_ranger_admin')
+ external_ranger_admin_password = default('/configurations/ranger-storm-plugin-properties/external_ranger_admin_password', 'amb_ranger_admin')
+ ranger_env = {}
+ ranger_env['admin_username'] = external_admin_username
+ ranger_env['admin_password'] = external_admin_password
+ ranger_env['ranger_admin_username'] = external_ranger_admin_username
+ ranger_env['ranger_admin_password'] = external_ranger_admin_password
+
+ ranger_plugin_properties = config['configurations']['ranger-storm-plugin-properties']
+ policy_user = storm_user
+ repo_config_password = config['configurations']['ranger-storm-plugin-properties']['REPOSITORY_CONFIG_PASSWORD']
+
+ xa_audit_db_password = ''
+ if not is_empty(config['configurations']['admin-properties']['audit_db_password']) and stack_supports_ranger_audit_db and has_ranger_admin:
+ xa_audit_db_password = config['configurations']['admin-properties']['audit_db_password']
+
+ # NOTE(review): duplicates the repo_config_password assignment above — harmless but redundant.
+ repo_config_password = config['configurations']['ranger-storm-plugin-properties']['REPOSITORY_CONFIG_PASSWORD']
+
+ downloaded_custom_connector = None
+ previous_jdbc_jar_name = None
+ driver_curl_source = None
+ driver_curl_target = None
+ previous_jdbc_jar = None
+
+ # JDBC connector download/copy locations for DB-based Ranger auditing.
+ if has_ranger_admin and stack_supports_ranger_audit_db:
+ xa_audit_db_flavor = config['configurations']['admin-properties']['DB_FLAVOR']
+ jdbc_jar_name, previous_jdbc_jar_name, audit_jdbc_url, jdbc_driver = get_audit_configs(config)
+
+ downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None
+ driver_curl_source = format("{jdk_location}/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None
+ driver_curl_target = format("{storm_component_home_dir}/lib/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None
+ previous_jdbc_jar = format("{storm_component_home_dir}/lib/{previous_jdbc_jar_name}") if stack_supports_ranger_audit_db else None
+ sql_connector_jar = ''
+
+ # Repository definition posted to Ranger admin (legacy, pre-xml-config API).
+ storm_ranger_plugin_config = {
+ 'username': repo_config_username,
+ 'password': repo_config_password,
+ 'nimbus.url': 'http://' + storm_ui_host[0].lower() + ':' + str(storm_ui_port),
+ 'commonNameForCertificate': common_name_for_certificate
+ }
+
+ storm_ranger_plugin_repo = {
+ 'isActive': 'true',
+ 'config': json.dumps(storm_ranger_plugin_config),
+ 'description': 'storm repo',
+ 'name': repo_name,
+ 'repositoryType': 'storm',
+ 'assetType': '6'
+ }
+
+ custom_ranger_service_config = generate_ranger_service_config(ranger_plugin_properties)
+ if len(custom_ranger_service_config) > 0:
+ storm_ranger_plugin_config.update(custom_ranger_service_config)
+
+ # With kerberized Ranger, rebuild the repo definition in the newer format.
+ if stack_supports_ranger_kerberos and security_enabled:
+ policy_user = format('{storm_user},{storm_bare_jaas_principal}')
+ storm_ranger_plugin_config['policy.download.auth.users'] = policy_user
+ storm_ranger_plugin_config['tag.download.auth.users'] = policy_user
+ storm_ranger_plugin_config['ambari.service.check.user'] = policy_user
+
+ storm_ranger_plugin_repo = {
+ 'isEnabled': 'true',
+ 'configs': storm_ranger_plugin_config,
+ 'description': 'storm repo',
+ 'name': repo_name,
+ 'type': 'storm'
+ }
+
+ ranger_storm_principal = None
+ ranger_storm_keytab = None
+ if stack_supports_ranger_kerberos and security_enabled:
+ ranger_storm_principal = storm_jaas_principal
+ ranger_storm_keytab = storm_keytab_path
+
+ xa_audit_db_is_enabled = False
+ if xml_configurations_supported and stack_supports_ranger_audit_db:
+ xa_audit_db_is_enabled = config['configurations']['ranger-storm-audit']['xasecure.audit.destination.db']
+
+ xa_audit_hdfs_is_enabled = default('/configurations/ranger-storm-audit/xasecure.audit.destination.hdfs', False)
+ ssl_keystore_password = config['configurations']['ranger-storm-policymgr-ssl']['xasecure.policymgr.clientssl.keystore.password'] if xml_configurations_supported else None
+ ssl_truststore_password = config['configurations']['ranger-storm-policymgr-ssl']['xasecure.policymgr.clientssl.truststore.password'] if xml_configurations_supported else None
+ credential_file = format('/etc/ranger/{repo_name}/cred.jceks')
+
+ # for SQLA explicitly disable audit to DB for Ranger
+ if has_ranger_admin and stack_supports_ranger_audit_db and xa_audit_db_flavor.lower() == 'sqla':
+ xa_audit_db_is_enabled = False
+
+# ranger storm plugin end section
+
+# HDFS-related params are only meaningful when the cluster has a NameNode.
+namenode_hosts = default("/clusterHostInfo/namenode_host", [])
+has_namenode = not len(namenode_hosts) == 0
+
+availableServices = config['availableServices']
+
+hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] if has_namenode else None
+hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] if has_namenode else None
+hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name'] if has_namenode else None
+hdfs_site = config['configurations']['hdfs-site'] if has_namenode else None
+default_fs = config['configurations']['core-site']['fs.defaultFS'] if has_namenode else None
+hadoop_bin_dir = stack_select.get_hadoop_dir("bin") if has_namenode else None
+hadoop_conf_dir = conf_select.get_hadoop_conf_dir() if has_namenode else None
+kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+
+import functools
+#create partial functions with common arguments for every HdfsResource call
+#to create/delete hdfs directory/file/copyfromlocal we need to call params.HdfsResource in code
+HdfsResource = functools.partial(
+ HdfsResource,
+ user=hdfs_user,
+ hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
+ security_enabled = security_enabled,
+ keytab = hdfs_user_keytab,
+ kinit_path_local = kinit_path_local,
+ hadoop_bin_dir = hadoop_bin_dir,
+ hadoop_conf_dir = hadoop_conf_dir,
+ principal_name = hdfs_principal_name,
+ hdfs_site = hdfs_site,
+ default_fs = default_fs,
+ immutable_paths = get_not_managed_resources()
+)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_windows.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_windows.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_windows.py
new file mode 100644
index 0000000..a758375
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_windows.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from status_params import *
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.default import default
+
+# server configurations
+config = Script.get_config()
+
+stack_is_hdp23_or_further = Script.is_stack_greater_or_equal("2.3")
+
+# NOTE(review): 'os' (and the *_win_service_name values used below) are not
+# imported here directly — they are expected to come from the star-import of
+# status_params; confirm they are defined there.
+stack_root = os.path.abspath(os.path.join(os.environ["HADOOP_HOME"],".."))
+conf_dir = os.environ["STORM_CONF_DIR"]
+hadoop_user = config["configurations"]["cluster-env"]["hadoop.user.name"]
+storm_user = hadoop_user
+
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+default_topology_max_replication_wait_time_sec = default('/configurations/storm-site/topology.max.replication.wait.time.sec.default', -1)
+nimbus_hosts = default("/clusterHostInfo/nimbus_hosts", [])
+default_topology_min_replication_count = default('/configurations/storm-site/topology.min.replication.count.default', 1)
+
+#Calculate topology.max.replication.wait.time.sec and topology.min.replication.count
+if len(nimbus_hosts) > 1:
+ # for HA Nimbus
+ actual_topology_max_replication_wait_time_sec = -1
+ # NOTE(review): '/' is integer division under Python 2 (quorum majority);
+ # would be float division under Python 3 — use '//' if ever ported.
+ actual_topology_min_replication_count = len(nimbus_hosts) / 2 + 1
+else:
+ # for non-HA Nimbus
+ actual_topology_max_replication_wait_time_sec = default_topology_max_replication_wait_time_sec
+ actual_topology_min_replication_count = default_topology_min_replication_count
+
+if stack_is_hdp23_or_further:
+ if security_enabled:
+ storm_thrift_transport = config['configurations']['storm-site']['_storm.thrift.secure.transport']
+ else:
+ storm_thrift_transport = config['configurations']['storm-site']['_storm.thrift.nonsecure.transport']
+
+# Map of component name -> Windows service name (names from status_params).
+service_map = {
+ "nimbus" : nimbus_win_service_name,
+ "supervisor" : supervisor_win_service_name,
+ "ui" : ui_win_service_name
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/rest_api.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/rest_api.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/rest_api.py
new file mode 100644
index 0000000..f9b3b80
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/rest_api.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from resource_management.libraries.functions import check_process_status
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import format
+from resource_management.core.resources.system import Execute
+
+from storm import storm
+from service import service
+from service_check import ServiceCheck
+
+
+class StormRestApi(Script):
+ """
+ Storm REST API.
+ It was available in HDP 2.0 and 2.1.
+ In HDP 2.2, it was removed since the functionality was moved to Storm UI Server.
+ """
+
+ def get_component_name(self):
+ # Component name reported to the stack tooling for this script.
+ return "storm-client"
+
+ def install(self, env):
+ # Install packages, then write out configuration.
+ self.install_packages(env)
+ self.configure(env)
+
+ def configure(self, env):
+ # Render Storm configuration via the storm() helper.
+ import params
+ env.set_params(params)
+
+ storm()
+
+ def start(self, env, upgrade_type=None):
+ # Re-configure before every start, then launch the rest_api daemon.
+ import params
+ env.set_params(params)
+ self.configure(env)
+
+ service("rest_api", action="start")
+
+ def stop(self, env, upgrade_type=None):
+ import params
+ env.set_params(params)
+
+ service("rest_api", action="stop")
+
+ def status(self, env):
+ # Raises (via check_process_status) when the pid file's process is gone.
+ import status_params
+ env.set_params(status_params)
+ check_process_status(status_params.pid_rest_api)
+
+ def get_log_folder(self):
+ import params
+ return params.log_dir
+
+ def get_user(self):
+ import params
+ return params.storm_user
+
+ def get_pid_files(self):
+ import status_params
+ return [status_params.pid_rest_api]
+
+if __name__ == "__main__":
+ StormRestApi().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service.py
new file mode 100644
index 0000000..b5e5cd5
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+
+from resource_management.core.resources import Execute
+from resource_management.core.resources import File
+from resource_management.core.shell import as_user
+from resource_management.core import shell
+from resource_management.core.logger import Logger
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions import get_user_call_output
+from resource_management.libraries.functions.show_logs import show_logs
+import time
+
+
def service(name, action='start'):
  """
  Starts or stops a Storm daemon.

  :param name: daemon name; must be a key of status_params.pid_files
               (e.g. "nimbus", "supervisor", "ui", "drpc", "logviewer", "rest_api")
  :param action: "start" or "stop"
  """
  import params
  import status_params

  pid_file = status_params.pid_files[name]
  # Succeeds only when the pid file exists AND the recorded process is alive.
  no_op_test = as_user(format(
    "ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1"), user=params.storm_user)

  # Environment prefix sourced before launching any daemon so JAVA_HOME and
  # storm-env.sh settings are visible to the forked process.
  storm_env = format(
    "source {conf_dir}/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH")

  if action == "start":
    if name == "rest_api":
      process_cmd = format(
        "{storm_env} ; java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server")
      cmd = format(
        "{process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log 2>&1")
    else:
      # Storm start script gets forked into actual storm java process.
      # Which means we can use the pid of start script as a pid of start component
      cmd = format("{storm_env} ; storm {name} > {log_dir}/{name}.out 2>&1")

    # Background the daemon and record the launching shell's child pid.
    cmd = format("{cmd} &\n echo $! > {pid_file}")

    Execute(cmd,
      not_if=no_op_test,
      user=params.storm_user,
      path=params.storm_bin_dir,
    )

    File(pid_file,
      owner=params.storm_user,
      group=params.user_group
    )
  elif action == "stop":
    process_dont_exist = format("! ({no_op_test})")
    if os.path.exists(pid_file):
      pid = get_user_call_output.get_user_call_output(format("! test -f {pid_file} || cat {pid_file}"), user=params.storm_user)[1]

      # if multiple processes are running (for example user can start logviewer from console)
      # there can be more than one id
      pid = pid.replace("\n", " ")

      Execute(format("{sudo} kill {pid}"),
        not_if=process_dont_exist)

      # Escalate to SIGKILL only if the process survives the graceful kill;
      # the sleeps give the daemon time to exit cleanly first.
      Execute(format("{sudo} kill -9 {pid}"),
        not_if=format(
          "sleep 2; {process_dont_exist} || sleep 20; {process_dont_exist}"),
        ignore_failures=True)

      File(pid_file, action="delete")
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service_check.py
new file mode 100644
index 0000000..80ea0f5
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/service_check.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import os
+
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions import get_unique_id_and_date
+from resource_management.core.resources import File
+from resource_management.core.resources import Execute
+from resource_management.libraries.script import Script
+from resource_management.core.source import StaticFile
+from ambari_commons import OSCheck, OSConst
+from ambari_commons.os_family_impl import OsFamilyImpl
+
class ServiceCheck(Script):
  """
  Base class for the Storm service check; the concrete OS-specific
  implementation is selected at runtime via OsFamilyImpl decorators.
  """
  pass
+
+
@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class ServiceCheckWindows(ServiceCheck):
  def service_check(self, env):
    """Runs the stack-provided Run-SmokeTests.cmd batch file for STORM as the storm user."""
    import params
    env.set_params(params)
    smoke_runner = os.path.join(params.stack_root, "Run-SmokeTests.cmd")
    checked_service = "STORM"
    smoke_command = format("cmd /C {smoke_cmd} {service}",
                           smoke_cmd=smoke_runner, service=checked_service)
    Execute(smoke_command, user=params.storm_user, logoutput=True)
+
+
@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class ServiceCheckDefault(ServiceCheck):
  def service_check(self, env):
    """
    Smoke test: submits the sample WordCount topology under a unique name,
    then kills it. Fails fast if no nimbus location can be determined.
    """
    import params
    env.set_params(params)

    unique = get_unique_id_and_date()

    # Stage the sample topology jar where the storm user can read it.
    File("/tmp/wordCount.jar",
         content=StaticFile("wordCount.jar"),
         owner=params.storm_user
    )

    if params.nimbus_seeds_supported:
      # Because this command is guaranteed to run on one of the hosts with storm client, there is no need
      # to specify "-c nimbus.seeds={nimbus_seeds}"
      cmd = format("storm jar /tmp/wordCount.jar storm.starter.WordCountTopology WordCount{unique}")
    elif params.nimbus_host is not None:
      cmd = format("storm jar /tmp/wordCount.jar storm.starter.WordCountTopology WordCount{unique} -c nimbus.host={nimbus_host}")
    else:
      # Previously an empty command string was executed here, which failed
      # with an opaque shell error; raise an explicit failure instead.
      from resource_management.core.exceptions import Fail
      raise Fail("Could not determine the nimbus location: neither nimbus seeds nor nimbus host is available")

    Execute(cmd,
            logoutput=True,
            path=params.storm_bin_dir,
            user=params.storm_user
    )

    Execute(format("storm kill WordCount{unique}"),
            path=params.storm_bin_dir,
            user=params.storm_user
    )

if __name__ == "__main__":
  ServiceCheck().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/setup_ranger_storm.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/setup_ranger_storm.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/setup_ranger_storm.py
new file mode 100644
index 0000000..c04496e
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/setup_ranger_storm.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management.core.logger import Logger
+from resource_management.libraries.functions.setup_ranger_plugin_xml import setup_core_site_for_required_plugins
+from resource_management.libraries.resources.xml_config import XmlConfig
+from resource_management.libraries.functions.format import format
+from resource_management.core.resources import File, Directory
+
+def setup_ranger_storm(upgrade_type=None):
+ """
+ :param upgrade_type: Upgrade Type such as "rolling" or "nonrolling"
+ """
+ import params
+ if params.enable_ranger_storm and params.security_enabled:
+
+ stack_version = None
+ if upgrade_type is not None:
+ stack_version = params.version
+
+ if params.retryAble:
+ Logger.info("Storm: Setup ranger: command retry enables thus retrying if ranger admin is down !")
+ else:
+ Logger.info("Storm: Setup ranger: command retry not enabled thus skipping if ranger admin is down !")
+
+ if params.xml_configurations_supported and params.enable_ranger_storm and params.xa_audit_hdfs_is_enabled:
+ if params.has_namenode:
+ params.HdfsResource("/ranger/audit",
+ type="directory",
+ action="create_on_execute",
+ owner=params.hdfs_user,
+ group=params.hdfs_user,
+ mode=0755,
+ recursive_chmod=True
+ )
+ params.HdfsResource("/ranger/audit/storm",
+ type="directory",
+ action="create_on_execute",
+ owner=params.storm_user,
+ group=params.storm_user,
+ mode=0700,
+ recursive_chmod=True
+ )
+ params.HdfsResource(None, action="execute")
+
+ if params.xml_configurations_supported:
+ api_version=None
+ if params.stack_supports_ranger_kerberos:
+ api_version='v2'
+ from resource_management.libraries.functions.setup_ranger_plugin_xml import setup_ranger_plugin
+ setup_ranger_plugin('storm-nimbus', 'storm', params.previous_jdbc_jar,
+ params.downloaded_custom_connector, params.driver_curl_source,
+ params.driver_curl_target, params.java64_home,
+ params.repo_name, params.storm_ranger_plugin_repo,
+ params.ranger_env, params.ranger_plugin_properties,
+ params.policy_user, params.policymgr_mgr_url,
+ params.enable_ranger_storm, conf_dict=params.conf_dir,
+ component_user=params.storm_user, component_group=params.user_group, cache_service_list=['storm'],
+ plugin_audit_properties=params.config['configurations']['ranger-storm-audit'], plugin_audit_attributes=params.config['configuration_attributes']['ranger-storm-audit'],
+ plugin_security_properties=params.config['configurations']['ranger-storm-security'], plugin_security_attributes=params.config['configuration_attributes']['ranger-storm-security'],
+ plugin_policymgr_ssl_properties=params.config['configurations']['ranger-storm-policymgr-ssl'], plugin_policymgr_ssl_attributes=params.config['configuration_attributes']['ranger-storm-policymgr-ssl'],
+ component_list=['storm-client', 'storm-nimbus'], audit_db_is_enabled=params.xa_audit_db_is_enabled,
+ credential_file=params.credential_file, xa_audit_db_password=params.xa_audit_db_password,
+ ssl_truststore_password=params.ssl_truststore_password, ssl_keystore_password=params.ssl_keystore_password,
+ stack_version_override = stack_version, skip_if_rangeradmin_down= not params.retryAble,api_version=api_version,
+ is_security_enabled = params.security_enabled,
+ is_stack_supports_ranger_kerberos = params.stack_supports_ranger_kerberos,
+ component_user_principal=params.ranger_storm_principal if params.security_enabled else None,
+ component_user_keytab=params.ranger_storm_keytab if params.security_enabled else None)
+ else:
+ from resource_management.libraries.functions.setup_ranger_plugin import setup_ranger_plugin
+ setup_ranger_plugin('storm-nimbus', 'storm', params.previous_jdbc_jar,
+ params.downloaded_custom_connector, params.driver_curl_source,
+ params.driver_curl_target, params.java64_home,
+ params.repo_name, params.storm_ranger_plugin_repo,
+ params.ranger_env, params.ranger_plugin_properties,
+ params.policy_user, params.policymgr_mgr_url,
+ params.enable_ranger_storm, conf_dict=params.conf_dir,
+ component_user=params.storm_user, component_group=params.user_group, cache_service_list=['storm'],
+ plugin_audit_properties=params.config['configurations']['ranger-storm-audit'], plugin_audit_attributes=params.config['configuration_attributes']['ranger-storm-audit'],
+ plugin_security_properties=params.config['configurations']['ranger-storm-security'], plugin_security_attributes=params.config['configuration_attributes']['ranger-storm-security'],
+ plugin_policymgr_ssl_properties=params.config['configurations']['ranger-storm-policymgr-ssl'], plugin_policymgr_ssl_attributes=params.config['configuration_attributes']['ranger-storm-policymgr-ssl'],
+ component_list=['storm-client', 'storm-nimbus'], audit_db_is_enabled=params.xa_audit_db_is_enabled,
+ credential_file=params.credential_file, xa_audit_db_password=params.xa_audit_db_password,
+ ssl_truststore_password=params.ssl_truststore_password, ssl_keystore_password=params.ssl_keystore_password,
+ stack_version_override = stack_version, skip_if_rangeradmin_down= not params.retryAble)
+
+
+ site_files_create_path = format('{storm_component_home_dir}/extlib-daemon/ranger-storm-plugin-impl/conf')
+ Directory(site_files_create_path,
+ owner = params.storm_user,
+ group = params.user_group,
+ mode=0775,
+ create_parents = True,
+ cd_access = 'a'
+ )
+
+ if params.stack_supports_core_site_for_ranger_plugin and params.enable_ranger_storm and params.has_namenode and params.security_enabled:
+ Logger.info("Stack supports core-site.xml creation for Ranger plugin, creating create core-site.xml from namenode configuraitions")
+ setup_core_site_for_required_plugins(component_user=params.storm_user,component_group=params.user_group,create_core_site_path = site_files_create_path, config = params.config)
+ if len(params.namenode_hosts) > 1:
+ Logger.info('Ranger Storm plugin is enabled along with security and NameNode is HA , creating hdfs-site.xml')
+ XmlConfig("hdfs-site.xml",
+ conf_dir=site_files_create_path,
+ configurations=params.config['configurations']['hdfs-site'],
+ configuration_attributes=params.config['configuration_attributes']['hdfs-site'],
+ owner=params.storm_user,
+ group=params.user_group,
+ mode=0644
+ )
+ else:
+ Logger.info('Ranger Storm plugin is not enabled or security is disabled, removing hdfs-site.xml')
+ File(format('{site_files_create_path}/hdfs-site.xml'), action="delete")
+ else:
+ Logger.info("Stack does not support core-site.xml creation for Ranger plugin, skipping core-site.xml configurations")
+ else:
+ Logger.info('Ranger Storm plugin is not enabled')
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/status_params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/status_params.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/status_params.py
new file mode 100644
index 0000000..d84b095
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/status_params.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions import get_kinit_path
+from resource_management.libraries.functions import default, format
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from ambari_commons import OSCheck
+
# a map of the Ambari role to the component name
# for use with <stack-root>/current/<component>
SERVER_ROLE_DIRECTORY_MAP = {
  'NIMBUS' : 'storm-nimbus',
  'SUPERVISOR' : 'storm-supervisor',
  'STORM_UI_SERVER' : 'storm-client',
  'DRPC_SERVER' : 'storm-client',
  'STORM_SERVICE_CHECK' : 'storm-client'
}

# Resolve the component directory for the role this command runs as,
# defaulting to the service-check (client) component.
component_directory = Script.get_component_from_role(SERVER_ROLE_DIRECTORY_MAP, "STORM_SERVICE_CHECK")

config = Script.get_config()
stack_root = Script.get_stack_root()
stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
stack_version_formatted = format_stack_version(stack_version_unformatted)

if OSCheck.is_windows_family():
  # On Windows the Storm daemons are managed as Windows services.
  nimbus_win_service_name = "nimbus"
  supervisor_win_service_name = "supervisor"
  ui_win_service_name = "ui"
else:
  # Linux: daemon pid files live under the configured pid directory.
  pid_dir = config['configurations']['storm-env']['storm_pid_dir']
  pid_nimbus = format("{pid_dir}/nimbus.pid")
  pid_supervisor = format("{pid_dir}/supervisor.pid")
  pid_drpc = format("{pid_dir}/drpc.pid")
  pid_ui = format("{pid_dir}/ui.pid")
  pid_logviewer = format("{pid_dir}/logviewer.pid")
  pid_rest_api = format("{pid_dir}/restapi.pid")

  # daemon name -> pid file; consumed by service.py for start/stop/status
  pid_files = {
    "logviewer":pid_logviewer,
    "ui": pid_ui,
    "nimbus": pid_nimbus,
    "supervisor": pid_supervisor,
    "drpc": pid_drpc,
    "rest_api": pid_rest_api
  }

  # Security related/required params
  hostname = config['hostname']
  security_enabled = config['configurations']['cluster-env']['security_enabled']
  kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
  tmp_dir = Script.get_tmp_dir()

  # Fixed locations are used on stacks that predate rolling upgrade;
  # rolling-upgrade-capable stacks use the versioned <stack-root>/current links.
  storm_component_home_dir = "/usr/lib/storm"
  conf_dir = "/etc/storm/conf"
  if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
    storm_component_home_dir = format("{stack_root}/current/{component_directory}")
    conf_dir = format("{stack_root}/current/{component_directory}/conf")

  storm_user = config['configurations']['storm-env']['storm_user']
  storm_ui_principal = default('/configurations/storm-env/storm_ui_principal_name', None)
  storm_ui_keytab = default('/configurations/storm-env/storm_ui_keytab', None)

stack_name = default("/hostLevelParams/stack_name", None)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm.py
new file mode 100644
index 0000000..99579d2
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.core.exceptions import Fail
+from resource_management.core.resources.service import ServiceConfig
+from resource_management.core.resources.system import Directory, Execute, File, Link
+from resource_management.core.source import InlineTemplate
+from resource_management.libraries.resources.template_config import TemplateConfig
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.script.script import Script
+from resource_management.core.source import Template
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from storm_yaml_utils import yaml_config_template, yaml_config
+from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
+from ambari_commons import OSConst
+from resource_management.libraries.functions.setup_atlas_hook import has_atlas_in_cluster, setup_atlas_hook, setup_atlas_jar_symlinks
+from ambari_commons.constants import SERVICE
+
+
@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
def storm(name=None):
  """
  Windows implementation: renders storm.yaml from storm-site and, when the
  component maps to a Windows service, switches that service to run as the
  storm user.
  :param name: component name used to look up the Windows service
  """
  import params
  yaml_config("storm.yaml",
              conf_dir=params.conf_dir,
              configurations=params.config['configurations']['storm-site'],
              owner=params.storm_user
  )

  # `in` instead of the deprecated dict.has_key (same semantics, Py3-safe).
  if name in params.service_map:
    service_name = params.service_map[name]
    ServiceConfig(service_name,
                  action="change_user",
                  username=params.storm_user,
                  password=Script.get_password(params.storm_user))
+
+
@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
def storm(name=None):
  """
  Default (Linux) implementation: lays down all Storm directories and
  configuration files — log/pid/local dirs, storm.yaml, storm-env.sh, ulimit
  config, the Atlas hook, the AMS metrics sink jar link, log4j files, and
  (when security is enabled) JAAS configs plus worker-launcher.cfg.
  :param name: component name (not used on this code path)
  """
  import params
  import os

  # World-writable (0777) so workers launched as other users can log here.
  Directory(params.log_dir,
            owner=params.storm_user,
            group=params.user_group,
            mode=0777,
            create_parents = True,
            cd_access="a",
  )

  Directory([params.pid_dir, params.local_dir],
            owner=params.storm_user,
            group=params.user_group,
            create_parents = True,
            cd_access="a",
            mode=0755,
  )

  Directory(params.conf_dir,
            group=params.user_group,
            create_parents = True,
            cd_access="a",
  )

  # ulimit settings for the storm user.
  File(format("{limits_conf_dir}/storm.conf"),
       owner='root',
       group='root',
       mode=0644,
       content=Template("storm.conf.j2")
  )

  File(format("{conf_dir}/config.yaml"),
       content=Template("config.yaml.j2"),
       owner=params.storm_user,
       group=params.user_group
  )

  configurations = params.config['configurations']['storm-site']

  # storm.yaml is rendered from storm-site via the YAML template helper.
  File(format("{conf_dir}/storm.yaml"),
       content=yaml_config_template(configurations),
       owner=params.storm_user,
       group=params.user_group
  )

  File(format("{conf_dir}/storm-env.sh"),
       owner=params.storm_user,
       content=InlineTemplate(params.storm_env_sh_template)
  )

  # Generate atlas-application.properties.xml file and symlink the hook jars
  if params.enable_atlas_hook:
    atlas_hook_filepath = os.path.join(params.conf_dir, params.atlas_hook_filename)
    setup_atlas_hook(SERVICE.STORM, params.storm_atlas_application_properties, atlas_hook_filepath, params.storm_user, params.user_group)
    storm_extlib_dir = os.path.join(params.storm_component_home_dir, "extlib")
    setup_atlas_jar_symlinks("storm", storm_extlib_dir)

  if params.has_metric_collector:
    File(format("{conf_dir}/storm-metrics2.properties"),
         owner=params.storm_user,
         group=params.user_group,
         content=Template("storm-metrics2.properties.j2")
    )

    # Remove symlinks. They can be there, if you doing upgrade from HDP < 2.2 to HDP >= 2.2
    Link(format("{storm_lib_dir}/ambari-metrics-storm-sink.jar"),
         action="delete")
    # On old HDP 2.1 versions, this symlink may also exist and break EU to newer versions
    Link("/usr/lib/storm/lib/ambari-metrics-storm-sink.jar", action="delete")

    # Pick the sink jar variant matching the stack's metrics class packaging.
    if check_stack_feature(StackFeature.STORM_METRICS_APACHE_CLASSES, params.version_for_stack_feature_checks):
      sink_jar = params.metric_collector_sink_jar
    else:
      sink_jar = params.metric_collector_legacy_sink_jar

    # Link the sink jar into Storm's lib dir; skip if already linked or the
    # jar is missing on this host.
    Execute(format("{sudo} ln -s {sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
            not_if=format("ls {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
            only_if=format("ls {sink_jar}")
    )

  if params.storm_logs_supported:
    Directory(params.log4j_dir,
              owner=params.storm_user,
              group=params.user_group,
              mode=0755,
              create_parents = True
    )

    File(format("{log4j_dir}/cluster.xml"),
         owner=params.storm_user,
         content=InlineTemplate(params.storm_cluster_log4j_content)
    )
    File(format("{log4j_dir}/worker.xml"),
         owner=params.storm_user,
         content=InlineTemplate(params.storm_worker_log4j_content)
    )

  if params.security_enabled:
    TemplateConfig(format("{conf_dir}/storm_jaas.conf"),
                   owner=params.storm_user
    )
    if params.stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.stack_version_formatted):
      TemplateConfig(format("{conf_dir}/client_jaas.conf"),
                     owner=params.storm_user
      )
      # _storm.min.ruid (if present in storm-site) overrides the minimum real
      # UID allowed to run workers; otherwise it is read from /etc/login.defs.
      minRuid = configurations['_storm.min.ruid'] if configurations.has_key('_storm.min.ruid') else ''

      min_user_ruid = int(minRuid) if minRuid.isdigit() else _find_real_user_min_uid()

      File(format("{conf_dir}/worker-launcher.cfg"),
           content=Template("worker-launcher.cfg.j2", min_user_ruid = min_user_ruid),
           owner='root',
           group=params.user_group
      )
+
+
def _find_real_user_min_uid():
  """
  Finds the minimal real user UID by parsing the UID_MIN entry in
  /etc/login.defs.

  (The description previously lived in a stray module-level string that was
  never attached to the function; it is now a proper docstring.)

  :return: the integer UID_MIN value
  :raises Fail: if no well-formed 'UID_MIN <number>' line is present
  """
  with open('/etc/login.defs') as f:
    for line in f:
      tokens = line.split()
      # Accept only lines of exactly the form: UID_MIN <digits>
      if line.strip().startswith('UID_MIN') and len(tokens) == 2 and tokens[1].isdigit():
        return int(tokens[1])
  raise Fail("Unable to find UID_MIN in file /etc/login.defs. Expecting format e.g.: 'UID_MIN 500'")
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_upgrade.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_upgrade.py
new file mode 100644
index 0000000..bc245c4
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_upgrade.py
@@ -0,0 +1,177 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
+import os
+
+from ambari_commons import yaml_utils
+from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
+from resource_management.core.resources.system import Directory
+from resource_management.core.resources.system import File
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.default import default
+from resource_management.libraries.functions.format import format
+
+class StormUpgrade(Script):
+ """
+ Applies to Rolling/Express Upgrade from HDP 2.1 or 2.2 to 2.3 or higher.
+
+ Requirements: Needs to run from a host with ZooKeeper Client.
+
+ This class helps perform some of the upgrade tasks needed for Storm during
+ a Rolling or Express upgrade. Storm writes data to disk locally and to ZooKeeper.
+ If any HDP 2.1 or 2.2 bits exist in these directories when an HDP 2.3 instance
+ starts up, it will fail to start properly. Because the upgrade framework in
+ Ambari doesn't yet have a mechanism to say "stop all" before starting to
+ upgrade each component, we need to rely on a Storm trick to bring down
+ running daemons. By removing the ZooKeeper data with running daemons, those
+ daemons will die.
+ """
+
+ def delete_storm_zookeeper_data(self, env):
+ """
+ Deletes the Storm data from ZooKeeper, effectively bringing down all
+ Storm daemons.
+ :return:
+ """
+ import params
+
+ Logger.info('Clearing Storm data from ZooKeeper')
+
+ storm_zookeeper_root_dir = params.storm_zookeeper_root_dir
+ if storm_zookeeper_root_dir is None:
+ raise Fail("The storm ZooKeeper directory specified by storm-site/storm.zookeeper.root must be specified")
+
+ # The zookeeper client must be given a zookeeper host to contact. Guaranteed to have at least one host.
+ storm_zookeeper_server_list = yaml_utils.get_values_from_yaml_array(params.storm_zookeeper_servers)
+ if storm_zookeeper_server_list is None:
+ Logger.info("Unable to extract ZooKeeper hosts from '{0}', assuming localhost").format(params.storm_zookeeper_servers)
+ storm_zookeeper_server_list = ["localhost"]
+
+ # For every zk server, try to remove /storm
+ zookeeper_data_cleared = False
+ for storm_zookeeper_server in storm_zookeeper_server_list:
+ # Determine where the zkCli.sh shell script is
+ zk_command_location = os.path.join(params.stack_root, "current", "zookeeper-client", "bin", "zkCli.sh")
+ if params.version is not None:
+ zk_command_location = os.path.join(params.stack_root, params.version, "zookeeper", "bin", "zkCli.sh")
+
+ # create the ZooKeeper delete command
+ command = "{0} -server {1}:{2} rmr /storm".format(
+ zk_command_location, storm_zookeeper_server, params.storm_zookeeper_port)
+
+ # clean out ZK
+ try:
+ # the ZK client requires Java to run; ensure it's on the path
+ env_map = {
+ 'JAVA_HOME': params.java64_home
+ }
+
+ # AMBARI-12094: if security is enabled, then we need to tell zookeeper where the
+ # JAAS file is located since we don't use kinit directly with STORM
+ if params.security_enabled:
+ env_map['JVMFLAGS'] = "-Djava.security.auth.login.config={0}".format(params.storm_jaas_file)
+
+ Execute(command, user=params.storm_user, environment=env_map,
+ logoutput=True, tries=1)
+
+ zookeeper_data_cleared = True
+ break
+ except:
+ # the command failed, try a different ZK server
+ pass
+
+ # fail if the ZK data could not be cleared
+ if not zookeeper_data_cleared:
+ raise Fail("Unable to clear ZooKeeper Storm data on any of the following ZooKeeper hosts: {0}".format(
+ storm_zookeeper_server_list))
+
+
+ def delete_storm_local_data(self, env):
+ """
+ Deletes Storm data from local directories. This will create a marker file
+ with JSON data representing the upgrade stack and request/stage ID. This
+ will prevent multiple Storm components on the same host from removing
+ the local directories more than once.
+ :return:
+ """
+ import params
+
+ Logger.info('Clearing Storm data from local directories...')
+
+ storm_local_directory = params.local_dir
+ if storm_local_directory is None:
+ raise Fail("The storm local directory specified by storm-site/storm.local.dir must be specified")
+
+ request_id = default("/requestId", None)
+
+ stack_name = params.stack_name
+ stack_version = params.version
+ upgrade_direction = params.upgrade_direction
+
+ json_map = {}
+ json_map["requestId"] = request_id
+ json_map["stackName"] = stack_name
+ json_map["stackVersion"] = stack_version
+ json_map["direction"] = upgrade_direction
+
+ temp_directory = params.tmp_dir
+ marker_file = os.path.join(temp_directory, "storm-upgrade-{0}.json".format(stack_version))
+ Logger.info("Marker file for upgrade/downgrade of Storm, {0}".format(marker_file))
+
+ if os.path.exists(marker_file):
+ Logger.info("The marker file exists.")
+ try:
+ with open(marker_file) as file_pointer:
+ existing_json_map = json.load(file_pointer)
+
+ if cmp(json_map, existing_json_map) == 0:
+ Logger.info("The storm upgrade has already removed the local directories for {0}-{1} for "
+ "request {2} and direction {3}. Nothing else to do.".format(stack_name, stack_version, request_id, upgrade_direction))
+
+ # Nothing else to do here for this as it appears to have already been
+ # removed by another component being upgraded
+ return
+ else:
+ Logger.info("The marker file differs from the new value. Will proceed to delete Storm local dir, "
+ "and generate new file. Current marker file: {0}".format(str(existing_json_map)))
+ except Exception, e:
+ Logger.error("The marker file {0} appears to be corrupt; removing it. Error: {1}".format(marker_file, str(e)))
+ File(marker_file, action="delete")
+ else:
+ Logger.info('The marker file {0} does not exist; will attempt to delete local Storm directory if it exists.'.format(marker_file))
+
+ # Delete from local directory
+ if os.path.isdir(storm_local_directory):
+ Logger.info("Deleting storm local directory, {0}".format(storm_local_directory))
+ Directory(storm_local_directory, action="delete", create_parents = True)
+
+ # Recreate storm local directory
+ Logger.info("Recreating storm local directory, {0}".format(storm_local_directory))
+ Directory(storm_local_directory, mode=0755, owner=params.storm_user,
+ group=params.user_group, create_parents = True)
+
+ # The file doesn't exist, so create it
+ Logger.info("Saving marker file to {0} with contents: {1}".format(marker_file, str(json_map)))
+ with open(marker_file, 'w') as file_pointer:
+ json.dump(json_map, file_pointer, indent=2)
+
+if __name__ == "__main__":
+ StormUpgrade().execute()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_yaml_utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_yaml_utils.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_yaml_utils.py
new file mode 100644
index 0000000..9d78e71
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/storm_yaml_utils.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+import resource_management
+
+from ambari_commons.yaml_utils import escape_yaml_property
+from resource_management.core.source import InlineTemplate
+from resource_management.core.resources.system import File
+
def replace_jaas_placeholder(name, security_enabled, conf_dir):
  """
  Expand the _JAAS_PLACEHOLDER token inside a JVM option string.

  When security is enabled the token is replaced with a
  -Djava.security.auth.login.config option pointing at storm_jaas.conf under
  conf_dir; otherwise the token is removed. Strings that do not contain the
  token are returned unchanged.
  """
  if '_JAAS_PLACEHOLDER' not in name:
    return name
  if security_enabled:
    replacement = '-Djava.security.auth.login.config=' + conf_dir + '/storm_jaas.conf'
  else:
    replacement = ''
  return name.replace('_JAAS_PLACEHOLDER', replacement)
+
# Jinja template that renders storm.yaml content: one "key : value" line per
# property, sorted by key, skipping internal keys that start with '_'. Each
# value is first expanded as an InlineTemplate, has its _JAAS_PLACEHOLDER
# token resolved, and is finally YAML-escaped. (Runtime template text —
# must not be altered.)
storm_yaml_template = """{% for key, value in configurations|dictsort if not key.startswith('_') %}{{key}} : {{ escape_yaml_property(replace_jaas_placeholder(resource_management.core.source.InlineTemplate(value).get_content().strip(), security_enabled, conf_dir)) }}
{% endfor %}"""

def yaml_config_template(configurations):
  """
  Build an InlineTemplate that renders ``configurations`` as storm.yaml text.

  The helper names referenced inside the template (escape_yaml_property,
  replace_jaas_placeholder, resource_management.*) are handed to Jinja via
  extra_imports so they resolve at render time. ``security_enabled`` and
  ``conf_dir`` are presumably injected by the caller's template context —
  TODO confirm against the caller in storm.py.
  """
  return InlineTemplate(storm_yaml_template, configurations=configurations,
                        extra_imports=[escape_yaml_property, replace_jaas_placeholder, resource_management,
                                       resource_management.core, resource_management.core.source])
+
+def yaml_config(filename, configurations = None, conf_dir = None, owner = None, group = None):
+ import params
+ config_content = InlineTemplate('''{% for key, value in configurations_dict|dictsort %}{{ key }}: {{ escape_yaml_property(resource_management.core.source.InlineTemplate(value).get_content()) }}
+{% endfor %}''', configurations_dict=configurations, extra_imports=[escape_yaml_property, resource_management, resource_management.core, resource_management.core.source])
+
+ File (os.path.join(params.conf_dir, filename),
+ content = config_content,
+ owner = owner,
+ mode = "f"
+ )
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor.py
new file mode 100644
index 0000000..ec3f533
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from resource_management.libraries.functions import check_process_status
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import format
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from storm import storm
+from service import service
+from ambari_commons import OSConst
+from ambari_commons.os_family_impl import OsFamilyImpl
+from resource_management.core.resources.service import Service
+
+
class Supervisor(Script):
  """
  Ambari lifecycle script for the Storm Supervisor component. This is the
  OS-agnostic base; start/stop/status live in the @OsFamilyImpl subclasses
  below.
  """

  def get_component_name(self):
    # Stack-select component name used for version registration during upgrades.
    return "storm-supervisor"

  def install(self, env):
    """Install packages and lay down the initial configuration."""
    self.install_packages(env)
    self.configure(env)

  def configure(self, env):
    """Write the Storm configuration for the supervisor role."""
    import params
    env.set_params(params)
    storm("supervisor")
+
+
@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class SupervisorWindows(Supervisor):
  """Windows implementation: the supervisor runs as a Windows service."""

  def start(self, env):
    """Configure, then start the Windows service."""
    import status_params
    env.set_params(status_params)
    self.configure(env)
    Service(status_params.supervisor_win_service_name, action="start")

  def stop(self, env):
    """Stop the Windows service."""
    import status_params
    env.set_params(status_params)
    Service(status_params.supervisor_win_service_name, action="stop")

  def status(self, env):
    # Raises if the Windows service is not running.
    import status_params
    from resource_management.libraries.functions.windows_service_utils import check_windows_service_status
    env.set_params(status_params)
    check_windows_service_status(status_params.supervisor_win_service_name)
+
+
@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class SupervisorDefault(Supervisor):
  """Linux implementation of the Storm Supervisor lifecycle."""

  def pre_upgrade_restart(self, env, upgrade_type=None):
    """Repoint conf/stack symlinks at the target version before a rolling-upgrade restart."""
    import params
    env.set_params(params)

    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
      # conf_select must run before stack_select so the new binaries see the
      # versioned configuration directory.
      conf_select.select(params.stack_name, "storm", params.version)
      stack_select.select("storm-client", params.version)
      stack_select.select("storm-supervisor", params.version)

  def start(self, env, upgrade_type=None):
    """Configure, then start the supervisor daemon and its colocated logviewer."""
    import params
    env.set_params(params)
    self.configure(env)

    service("supervisor", action="start")
    service("logviewer", action="start")

  def stop(self, env, upgrade_type=None):
    """Stop both the supervisor daemon and the logviewer."""
    import params
    env.set_params(params)

    service("supervisor", action="stop")
    service("logviewer", action="stop")

  def status(self, env):
    # Health is judged on the supervisor pid file only; the logviewer is
    # intentionally not part of the status check.
    import status_params
    env.set_params(status_params)
    check_process_status(status_params.pid_supervisor)

  def get_log_folder(self):
    import params
    return params.log_dir

  def get_user(self):
    import params
    return params.storm_user

  def get_pid_files(self):
    import status_params
    return [status_params.pid_supervisor]
+
# Entry point: Ambari agent invokes this script with the command to execute.
if __name__ == "__main__":
  Supervisor().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor_prod.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor_prod.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor_prod.py
new file mode 100644
index 0000000..d6c3545
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisor_prod.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from storm import storm
+from service import service
+from supervisord_service import supervisord_service, supervisord_check_status
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import format
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+
+
class Supervisor(Script):
  """
  Storm Supervisor lifecycle script for deployments where the daemon is
  managed by supervisord ("prod" layout): the supervisor itself is driven
  through supervisorctl while the logviewer uses the regular storm service
  wrapper.
  """

  def get_component_name(self):
    # Stack-select component name used for version registration during upgrades.
    return "storm-supervisor"

  def install(self, env):
    """Install packages and lay down the initial configuration."""
    self.install_packages(env)
    self.configure(env)

  def configure(self, env):
    """Write the Storm configuration files."""
    import params
    env.set_params(params)
    storm()

  def pre_upgrade_restart(self, env, upgrade_type=None):
    """Repoint conf/stack symlinks at the target version before a rolling-upgrade restart."""
    import params
    env.set_params(params)

    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
      conf_select.select(params.stack_name, "storm", params.version)
      stack_select.select("storm-client", params.version)
      stack_select.select("storm-supervisor", params.version)

  def start(self, env, upgrade_type=None):
    """Configure, then start the supervisor (via supervisord) and the logviewer."""
    import params
    env.set_params(params)
    self.configure(env)

    supervisord_service("supervisor", action="start")
    service("logviewer", action="start")

  def stop(self, env, upgrade_type=None):
    """Stop the supervisor (via supervisord) and the logviewer."""
    import params
    env.set_params(params)

    supervisord_service("supervisor", action="stop")
    service("logviewer", action="stop")

  def status(self, env):
    # Delegates to supervisorctl; raises ComponentIsNotRunning when the
    # process is not reported as RUNNING.
    supervisord_check_status("supervisor")

  def get_log_folder(self):
    import params
    return params.log_dir

  def get_user(self):
    import params
    return params.storm_user

# Entry point: Ambari agent invokes this script with the command to execute.
if __name__ == "__main__":
  Supervisor().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisord_service.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisord_service.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisord_service.py
new file mode 100644
index 0000000..6ff9f9c
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/supervisord_service.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
from resource_management.core.exceptions import ComponentIsNotRunning, Fail
from resource_management.core.resources.system import Execute
from resource_management.libraries.functions.format import format
+
def supervisord_service(component_name, action):
  """
  Ask supervisord to start/stop the storm-<component_name> program.

  wait_for_finish=False makes this fire-and-forget: supervisorctl is launched
  but its completion is not awaited.
  """
  Execute(format("supervisorctl {action} storm-{component_name}"),
    wait_for_finish=False
  )
+
def supervisord_check_status(component_name):
  """
  Check that supervisord reports storm-<component_name> as RUNNING.

  Raises ComponentIsNotRunning when the grep on the supervisorctl status
  output fails, which is the contract Ambari's status command expects.

  BUG FIX: the original referenced Fail and ComponentIsNotRunning without
  importing them, so any non-running component produced a NameError instead
  of the expected ComponentIsNotRunning. The names are now imported at the
  top of the file (resource_management.core.exceptions).
  """
  try:
    Execute(format("supervisorctl status storm-{component_name} | grep RUNNING"))
  except Fail:
    raise ComponentIsNotRunning()
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/ui_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/ui_server.py b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/ui_server.py
new file mode 100644
index 0000000..e257ef9
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/ui_server.py
@@ -0,0 +1,137 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+from storm import storm
+from service import service
+from service_check import ServiceCheck
+from resource_management.libraries.functions import check_process_status
+from resource_management.libraries.script import Script
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import format
+from resource_management.core.resources.system import Link
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.security_commons import build_expectations, \
+ cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
+ FILE_TYPE_JAAS_CONF
+from setup_ranger_storm import setup_ranger_storm
+from ambari_commons import OSConst
+from ambari_commons.os_family_impl import OsFamilyImpl
+from resource_management.core.resources.service import Service
+
+
class UiServer(Script):
  """
  Ambari lifecycle script for the Storm UI server. This is the OS-agnostic
  base; start/stop/status live in the @OsFamilyImpl subclasses below.
  """

  def get_component_name(self):
    # The UI server is versioned together with the storm client bits.
    return "storm-client"

  def install(self, env):
    """Install packages and lay down the initial configuration."""
    self.install_packages(env)
    self.configure(env)

  def configure(self, env):
    """Write the Storm configuration for the UI role."""
    import params
    env.set_params(params)
    storm("ui")
+
@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class UiServerWindows(UiServer):
  """Windows implementation: the UI server runs as a Windows service."""

  def start(self, env):
    """Configure, then start the Windows service."""
    import status_params
    env.set_params(status_params)
    self.configure(env)
    Service(status_params.ui_win_service_name, action="start")

  def stop(self, env):
    """Stop the Windows service."""
    import status_params
    env.set_params(status_params)
    Service(status_params.ui_win_service_name, action="stop")

  def status(self, env):
    # Raises if the Windows service is not running.
    import status_params
    env.set_params(status_params)
    from resource_management.libraries.functions.windows_service_utils import check_windows_service_status
    check_windows_service_status(status_params.ui_win_service_name)
+
+
@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class UiServerDefault(UiServer):
  """Linux implementation of the Storm UI server lifecycle."""

  def pre_upgrade_restart(self, env, upgrade_type=None):
    """Repoint conf/stack symlinks at the target version before a rolling-upgrade restart."""
    import params
    env.set_params(params)
    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
      conf_select.select(params.stack_name, "storm", params.version)
      stack_select.select("storm-client", params.version)

  def link_metrics_sink_jar(self):
    """Symlink the Ambari Metrics storm sink JAR into the UI server classpath."""
    import params
    # Add storm metrics reporter JAR to storm-ui-server classpath.
    # Remove symlinks. They can be there, if you doing upgrade from HDP < 2.2 to HDP >= 2.2
    Link(format("{storm_lib_dir}/ambari-metrics-storm-sink.jar"),
         action="delete")
    # On old HDP 2.1 versions, this symlink may also exist and break EU to newer versions
    Link("/usr/lib/storm/lib/ambari-metrics-storm-sink.jar", action="delete")

    # Pick the sink JAR built against Apache storm classes when the stack
    # supports it, otherwise fall back to the legacy (hortonworks-packaged) one.
    if check_stack_feature(StackFeature.STORM_METRICS_APACHE_CLASSES, params.version_for_stack_feature_checks):
      sink_jar = params.metric_collector_sink_jar
    else:
      sink_jar = params.metric_collector_legacy_sink_jar

    # {sudo} is presumably resolved from params by format() — TODO confirm.
    # not_if/only_if keep the ln idempotent and skip it when the jar is absent.
    Execute(format("{sudo} ln -s {sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
            not_if=format("ls {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
            only_if=format("ls {sink_jar}")
            )

  def start(self, env, upgrade_type=None):
    """Configure, wire the metrics sink, apply Ranger integration, then start the UI."""
    import params
    env.set_params(params)
    self.configure(env)
    self.link_metrics_sink_jar()
    setup_ranger_storm(upgrade_type=upgrade_type)
    service("ui", action="start")

  def stop(self, env, upgrade_type=None):
    """Stop the UI daemon."""
    import params
    env.set_params(params)
    service("ui", action="stop")

  def status(self, env):
    # Health is judged on the UI pid file.
    import status_params
    env.set_params(status_params)
    check_process_status(status_params.pid_ui)

  def get_log_folder(self):
    import params
    return params.log_dir

  def get_user(self):
    import params
    return params.storm_user

  def get_pid_files(self):
    import status_params
    return [status_params.pid_ui]

# Entry point: Ambari agent invokes this script with the command to execute.
if __name__ == "__main__":
  UiServer().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ab4d28a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/templates/client_jaas.conf.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/templates/client_jaas.conf.j2 b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/templates/client_jaas.conf.j2
new file mode 100644
index 0000000..b061cd1
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/templates/client_jaas.conf.j2
@@ -0,0 +1,33 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+StormClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useTicketCache=true
+ renewTicket=true
+ serviceName="{{nimbus_bare_jaas_principal}}";
+};
+
+{% if kafka_bare_jaas_principal %}
+KafkaClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useTicketCache=true
+ renewTicket=true
+ serviceName="{{kafka_bare_jaas_principal}}";
+};
+{% endif %}