You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/11/06 12:01:04 UTC
ambari git commit: AMBARI-22341. A bunch of alerts fails due to
absence of configs (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-perf c0a57a724 -> 4656f1d4e
AMBARI-22341. A bunch of alerts fails due to absence of configs (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4656f1d4
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4656f1d4
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4656f1d4
Branch: refs/heads/branch-3.0-perf
Commit: 4656f1d4e9f3d8ee7ebbce7606fa45e3231e3aa6
Parents: c0a57a7
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Mon Nov 6 14:00:36 2017 +0200
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Mon Nov 6 14:00:36 2017 +0200
----------------------------------------------------------------------
.../ambari_agent/AlertSchedulerHandler.py | 5 +-
.../main/python/ambari_agent/ClusterCache.py | 2 +-
.../ambari_agent/ClusterConfigurationCache.py | 22 ---
.../python/ambari_agent/ConfigurationBuilder.py | 80 +++++++++++
.../ambari_agent/CustomServiceOrchestrator.py | 56 +-------
.../python/ambari_agent/InitializerModule.py | 4 +-
.../python/ambari_agent/alerts/base_alert.py | 133 +++++++++++--------
.../python/ambari_agent/alerts/metric_alert.py | 52 ++++----
.../python/ambari_agent/alerts/port_alert.py | 3 +-
.../python/ambari_agent/alerts/script_alert.py | 8 +-
.../python/ambari_agent/alerts/web_alert.py | 16 ++-
11 files changed, 216 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/4656f1d4/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
index 3e2a55b..cf2fe2e 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
@@ -56,6 +56,7 @@ class AlertSchedulerHandler():
self.common_services_dir = initializer_module.config.common_services_dir
self.extensions_dir = initializer_module.config.extensions_dir
self.host_scripts_dir = initializer_module.config.host_scripts_dir
+ self.configuration_builder = initializer_module.configuration_builder
self._cluster_configuration = initializer_module.configurations_cache
self.alert_definitions_cache = initializer_module.alert_definitions_cache
@@ -260,7 +261,7 @@ class AlertSchedulerHandler():
if alert is None:
continue
- alert.set_helpers(self._collector, self._cluster_configuration)
+ alert.set_helpers(self._collector, self._cluster_configuration, self.configuration_builder)
definitions.append(alert)
@@ -376,7 +377,7 @@ class AlertSchedulerHandler():
logger.info("[AlertScheduler] Executing on-demand alert {0} ({1})".format(alert.get_name(),
alert.get_uuid()))
- alert.set_helpers(self._collector, self._cluster_configuration)
+ alert.set_helpers(self._collector, self._cluster_configuration, self.configuration_builder)
alert.collect()
except:
logger.exception("[AlertScheduler] Unable to execute the alert outside of the job scheduler")
http://git-wip-us.apache.org/repos/asf/ambari/blob/4656f1d4/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
index b924420..09e01fe 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -146,7 +146,7 @@ class ClusterCache(dict):
try:
return super(ClusterCache, self).__getitem__(key)
except KeyError:
- raise KeyError("{0} for cluster_id={1} are missing. Check if server sent it.".format(self.get_cache_name().title(), key))
+ raise KeyError("{0} for cluster_id={1} is missing. Check if server sent it.".format(self.get_cache_name().title(), key))
def on_cache_update(self):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/4656f1d4/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
index 77ca4c1..677fff2 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
@@ -40,25 +40,3 @@ class ClusterConfigurationCache(ClusterCache):
def get_cache_name(self):
return 'configurations'
-
- def get_configuration_value(self, cluster_id, key):
- """
- Gets a value from the cluster configuration map for the given cluster and
- key. The key is expected to be of the form 'foo-bar/baz' or
- 'foo-bar/bar-baz/foobarbaz' where every / denotes a new mapping
- :param key: a lookup key, like 'foo-bar/baz'
- :return: the value, or None if not found
- """
- self._cache_lock.acquire()
- try:
- dictionary = self[str(cluster_id)]['configurations']
- for layer_key in key.split('/'):
- dictionary = dictionary[layer_key]
-
- return dictionary
-
- except KeyError:
- logger.debug("Cache miss for configuration property {0} in cluster {1}".format(key, cluster_id))
- return None
- finally:
- self._cache_lock.release()
http://git-wip-us.apache.org/repos/asf/ambari/blob/4656f1d4/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py b/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py
new file mode 100644
index 0000000..f8bdb42
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import hostname
+
+class ConfigurationBuilder:
+  """
+  Assembles the configuration dictionary that is handed to commands and alerts
+  from the agent's metadata, topology, host-level-params and configurations
+  caches.
+  """
+
+  def __init__(self, initializer_module):
+    """
+    :param initializer_module: object exposing ``config``, ``metadata_cache``,
+      ``topology_cache``, ``host_level_params_cache`` and
+      ``configurations_cache``; references to them are kept, nothing is copied
+    """
+    self.config = initializer_module.config
+    self.metadata_cache = initializer_module.metadata_cache
+    self.topology_cache = initializer_module.topology_cache
+    self.host_level_params_cache = initializer_module.host_level_params_cache
+    self.configurations_cache = initializer_module.configurations_cache
+
+  def get_configuration(self, cluster_id, service_name, component_name):
+    """
+    Builds the configuration dictionary for a cluster/service/component.
+
+    :param cluster_id: key of the cluster in the caches; when falsy only the
+      ambari-level and agent-level parameters are returned
+    :param service_name: service whose 'serviceLevelParams' are added; None or
+      the string 'null' skips them
+    :param component_name: component used to look up 'repositoryFile' and the
+      component-level parameters
+    :return: dictionary with the '*LevelParams' sections plus, for a cluster,
+      every top-level entry of that cluster's configurations cache
+    """
+    if cluster_id:
+      # NOTE(review): presumably these lookups raise KeyError when the server
+      # has not sent data for cluster_id yet - confirm against the cache classes
+      metadata_cache = self.metadata_cache[cluster_id]
+      configurations_cache = self.configurations_cache[cluster_id]
+      host_level_params_cache = self.host_level_params_cache[cluster_id]
+
+      command_dict = {
+        'clusterLevelParams': metadata_cache.clusterLevelParams,
+        'hostLevelParams': host_level_params_cache,
+        'clusterHostInfo': self.topology_cache.get_cluster_host_info(cluster_id),
+        'localComponents': self.topology_cache.get_cluster_local_components(cluster_id),
+        'agentLevelParams': {'hostname': self.topology_cache.get_current_host_info(cluster_id)['hostName']}
+      }
+
+      if service_name is not None and service_name != 'null':
+        command_dict['serviceLevelParams'] = metadata_cache.serviceLevelParams[service_name]
+
+      # attach the repository file only when this component has one assigned
+      host_repos = host_level_params_cache.hostRepositories
+      if component_name in host_repos.componentRepos:
+        repo_version_id = host_repos.componentRepos[component_name]
+        command_dict['repositoryFile'] = host_repos.commandRepos[str(repo_version_id)]
+
+      component_dict = self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name)
+      if component_dict is not None:
+        command_dict.update({
+          'componentLevelParams': component_dict.componentLevelParams,
+          'commandParams': component_dict.commandParams
+        })
+
+      # merged last, so entries of the configurations cache override any
+      # same-named key set above
+      command_dict.update(configurations_cache)
+    else:
+      command_dict = {'agentLevelParams': {}}
+
+    command_dict['ambariLevelParams'] = self.metadata_cache.get_cluster_indepedent_data().clusterLevelParams
+
+    command_dict['agentLevelParams'].update({
+      'public_hostname': self.public_fqdn,
+      'agentCacheDir': self.config.get('agent', 'cache_dir'),
+    })
+    command_dict['agentLevelParams']["agentConfigParams"] = {
+      "agent": {
+        "parallel_execution": self.config.get_parallel_exec_option(),
+        "use_system_proxy_settings": self.config.use_system_proxy_setting()
+      }
+    }
+    return command_dict
+
+  @property
+  def public_fqdn(self):
+    """
+    :return: the value of hostname.public_hostname() for the agent config
+    """
+    # the value has to be returned; without it the property evaluates to None
+    # and 'public_hostname' in agentLevelParams is never populated
+    return hostname.public_hostname(self.config)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/4656f1d4/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 1cf02d1..0debb1b 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -25,7 +25,6 @@ import sys
from ambari_commons import shell
import threading
-from FileCache import FileCache
from AgentException import AgentException
from PythonExecutor import PythonExecutor
from resource_management.libraries.functions.log_process_information import log_process_information
@@ -33,7 +32,6 @@ from resource_management.core.utils import PasswordString
from ambari_agent.Utils import Utils
import subprocess
import Constants
-import hostname
logger = logging.getLogger()
@@ -78,9 +76,7 @@ class CustomServiceOrchestrator():
CREDENTIAL_STORE_CLASS_PATH_NAME = 'credentialStoreClassPath'
def __init__(self, initializer_module):
- self.metadata_cache = initializer_module.metadata_cache
- self.topology_cache = initializer_module.topology_cache
- self.configurations_cache = initializer_module.configurations_cache
+ self.configuration_builder = initializer_module.configuration_builder
self.host_level_params_cache = initializer_module.host_level_params_cache
self.config = initializer_module.config
self.tmp_dir = self.config.get('agent', 'prefix')
@@ -92,7 +88,6 @@ class CustomServiceOrchestrator():
'status_command_stdout.txt')
self.status_commands_stderr = os.path.join(self.tmp_dir,
'status_command_stderr.txt')
- self.public_fqdn = hostname.public_hostname(self.config)
# Construct the hadoop credential lib JARs path
self.credential_shell_lib_path = os.path.join(self.config.get('security', 'credential_lib_dir',
@@ -400,7 +395,7 @@ class CustomServiceOrchestrator():
log_info_on_failure = not command_name in self.DONT_DEBUG_FAILURES_FOR_COMMANDS
script_params = [command_name, json_path, current_base_dir, tmpstrucoutfile, logger_level, self.exec_tmp_dir,
self.force_https_protocol, self.ca_cert_file_path]
-
+
if log_out_files:
script_params.append("-o")
@@ -461,51 +456,12 @@ class CustomServiceOrchestrator():
if cluster_id != '-1' and cluster_id != 'null':
service_name = command_header['serviceName']
component_name = command_header['role']
-
- metadata_cache = self.metadata_cache[cluster_id]
- configurations_cache = self.configurations_cache[cluster_id]
- host_level_params_cache = self.host_level_params_cache[cluster_id]
-
- command_dict = {
- 'clusterLevelParams': metadata_cache.clusterLevelParams,
- 'hostLevelParams': host_level_params_cache,
- 'clusterHostInfo': self.topology_cache.get_cluster_host_info(cluster_id),
- 'localComponents': self.topology_cache.get_cluster_local_components(cluster_id),
- 'agentLevelParams': {'hostname': self.topology_cache.get_current_host_info(cluster_id)['hostName']}
- }
-
- if service_name is not None and service_name != 'null':
- command_dict['serviceLevelParams'] = metadata_cache.serviceLevelParams[service_name]
-
- host_repos = host_level_params_cache.hostRepositories
- if component_name in host_repos.componentRepos:
- repo_version_id = host_repos.componentRepos[component_name]
- command_dict['repositoryFile'] = host_repos.commandRepos[str(repo_version_id)]
-
- component_dict = self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name)
- if component_dict is not None:
- command_dict.update({
- 'componentLevelParams': component_dict.componentLevelParams,
- 'commandParams': component_dict.commandParams
- })
-
- command_dict.update(configurations_cache)
else:
- command_dict = {'agentLevelParams': {}}
-
- command_dict['ambariLevelParams'] = self.metadata_cache.get_cluster_indepedent_data().clusterLevelParams
-
- command_dict['agentLevelParams'].update({
- 'public_hostname': self.public_fqdn,
- 'agentCacheDir': self.config.get('agent', 'cache_dir'),
- })
- command_dict['agentLevelParams']["agentConfigParams"] = {
- "agent": {
- "parallel_execution": self.config.get_parallel_exec_option(),
- "use_system_proxy_settings": self.config.use_system_proxy_setting()
- }
- }
+ cluster_id = None
+ service_name = None
+ component_name = None
+ command_dict = self.configuration_builder.get_configuration(cluster_id, service_name, component_name)
command = Utils.update_nested(Utils.get_mutable_copy(command_dict), command_header)
return command
http://git-wip-us.apache.org/repos/asf/ambari/blob/4656f1d4/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 2c80218..dadd508 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -33,6 +33,7 @@ from ambari_agent.CommandStatusDict import CommandStatusDict
from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
from ambari_agent.RecoveryManager import RecoveryManager
from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
+from ambari_agent.ConfigurationBuilder import ConfigurationBuilder
from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
logger = logging.getLogger(__name__)
@@ -58,9 +59,10 @@ class InitializerModule:
self.metadata_cache = ClusterMetadataCache(self.config.cluster_cache_dir)
self.topology_cache = ClusterTopologyCache(self.config.cluster_cache_dir, self.config)
- self.configurations_cache = ClusterConfigurationCache(self.config.cluster_cache_dir)
self.host_level_params_cache = ClusterHostLevelParamsCache(self.config.cluster_cache_dir)
+ self.configurations_cache = ClusterConfigurationCache(self.config.cluster_cache_dir)
self.alert_definitions_cache = ClusterAlertDefinitionsCache(self.config.cluster_cache_dir)
+ self.configuration_builder = ConfigurationBuilder(self)
self.file_cache = FileCache(self.config)
http://git-wip-us.apache.org/repos/asf/ambari/blob/4656f1d4/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
index 79a686f..eb658be 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
@@ -49,7 +49,7 @@ class BaseAlert(object):
self.host_name = ''
self.public_host_name = ''
self.config = config
-
+
def interval(self):
""" gets the defined interval this check should run """
if not self.alert_meta.has_key('interval'):
@@ -64,7 +64,7 @@ class BaseAlert(object):
gets whether the definition is enabled
"""
return self.alert_meta['enabled']
-
+
def get_name(self):
"""
@@ -80,12 +80,13 @@ class BaseAlert(object):
return self.alert_meta['uuid']
- def set_helpers(self, collector, cluster_configuration_cache):
+ def set_helpers(self, collector, cluster_configuration_cache, configuration_builder):
"""
sets helper objects for alerts without having to use them in a constructor
"""
self.collector = collector
self.cluster_configuration_cache = cluster_configuration_cache
+ self.configuration_builder = configuration_builder
def set_cluster(self, cluster_name, cluster_id, host_name, public_host_name = None):
@@ -110,10 +111,10 @@ class BaseAlert(object):
def collect(self):
""" method used for collection. defers to _collect() """
-
+
res = (BaseAlert.RESULT_UNKNOWN, [])
res_base_text = None
-
+
try:
res = self._collect()
result_state = res[0]
@@ -132,17 +133,17 @@ class BaseAlert(object):
except Exception as exception:
message = "[Alert][{0}] Unable to execute alert. {1}".format(
self.get_name(), str(exception))
-
+
# print the exception if in DEBUG, otherwise just log the warning
- if logger.isEnabledFor(logging.DEBUG):
- logger.exception(message)
- else:
- logger.warning(message)
+ #if logger.isEnabledFor(logging.DEBUG):
+ logger.exception(message)
+ #else:
+ # logger.warning(message)
res = (BaseAlert.RESULT_UNKNOWN, [str(exception)])
res_base_text = "{0}"
-
-
+
+
if logger.isEnabledFor(logging.DEBUG):
logger.debug("[Alert][{0}] result = {1}".format(self.get_name(), str(res)))
@@ -181,7 +182,7 @@ class BaseAlert(object):
self.collector.put(self.cluster_name, data)
- def _get_configuration_value(self, key):
+ def _get_configuration_value(self, configurations, key):
"""
Gets the value of the specified configuration key from the cache. The key
should be of the form {{foo-bar/baz}}. If the key given is not a lookup key
@@ -216,8 +217,7 @@ class BaseAlert(object):
# for every match, get its configuration value and replace it in the key
resolved_key = key
for placeholder_key in placeholder_keys:
- value = self.cluster_configuration_cache.get_configuration_value(
- self.cluster_id, placeholder_key)
+ value = self.get_configuration_value(configurations, placeholder_key)
# if any of the placeholder keys is missing from the configuration, then
# return None as per the contract of this function
@@ -235,21 +235,44 @@ class BaseAlert(object):
return resolved_key
-
+  def get_configuration_value(self, configurations, key):
+    """
+    Gets a value from the given configuration dictionary by walking a
+    '/'-separated key, where every / denotes a new mapping. A key that does
+    not start with '/' is looked up under 'configurations', so 'foo-bar/baz'
+    resolves to configurations['configurations']['foo-bar']['baz']; a key that
+    starts with '/' is resolved from the top of the dictionary.
+    :param configurations: the nested dictionary to search
+    :param key: a lookup key, like 'foo-bar/baz' or '/agentLevelParams/hostname'
+    :return: the value, or None if not found
+    """
+    if not key.startswith("/"):
+      key = "/configurations/" + key
+
+    curr_dict = configurations
+    try:
+      # empty segments (the leading '/' or a doubled '/') are skipped
+      for layer_key in [part for part in key.split('/') if part]:
+        curr_dict = curr_dict[layer_key]
+    except (KeyError, TypeError):
+      # KeyError: a segment is absent. TypeError: the path descends through a
+      # value that is not a mapping (a plain string, None, ...). Both mean the
+      # property cannot be resolved, which the contract reports as None.
+      logger.debug("Cache miss for configuration property {0}".format(key))
+      return None
+
+    return curr_dict
+
+
def _lookup_uri_property_keys(self, uri_structure):
"""
Loads the configuration lookup keys that the URI structure needs. This
will return a named tuple that contains the keys needed to lookup
parameterized URI values from the cached configuration.
The URI structure looks something like:
-
- "uri":{
+
+ "uri":{
"http": foo,
"https": bar,
...
}
"""
-
+
if uri_structure is None:
return None
@@ -271,13 +294,13 @@ class BaseAlert(object):
if 'http' in uri_structure:
http_key = uri_structure['http']
-
+
if 'https' in uri_structure:
https_key = uri_structure['https']
-
+
if 'https_property' in uri_structure:
https_property_key = uri_structure['https_property']
-
+
if 'https_property_value' in uri_structure:
https_property_value_key = uri_structure['https_property_value']
@@ -306,11 +329,11 @@ class BaseAlert(object):
ha_https_pattern = ha['https_pattern']
- AlertUriLookupKeys = namedtuple('AlertUriLookupKeys',
+ AlertUriLookupKeys = namedtuple('AlertUriLookupKeys',
'acceptable_codes http https https_property https_property_value default_port '
'kerberos_keytab kerberos_principal '
'ha_nameservice ha_alias_key ha_http_pattern ha_https_pattern')
-
+
alert_uri_lookup_keys = AlertUriLookupKeys(
acceptable_codes=acceptable_codes_key,
http=http_key,
@@ -321,44 +344,46 @@ class BaseAlert(object):
ha_nameservice=ha_nameservice, ha_alias_key=ha_alias_key,
ha_http_pattern=ha_http_pattern, ha_https_pattern=ha_https_pattern
)
-
+
return alert_uri_lookup_keys
-
+
def _get_uri_from_structure(self, alert_uri_lookup_keys):
"""
Gets the URI to use by examining the URI structure from the definition.
This will return a named tuple that has the uri and the SSL flag. The
URI structure looks something like:
-
- "uri":{
+
+ "uri":{
"http": foo,
"https": bar,
...
}
"""
-
+
if alert_uri_lookup_keys is None:
return None
-
+
http_uri = None
https_uri = None
+ configurations = self.configuration_builder.get_configuration(self.cluster_id, None, None)
+
# first thing is first; if there are HA keys then try to dynamically build
# the property which is used to get the actual value of the uri
# (ie dfs.namenode.http-address.c1ha.nn2)
if alert_uri_lookup_keys.ha_nameservice is not None or alert_uri_lookup_keys.ha_alias_key is not None:
- alert_uri = self._get_uri_from_ha_structure(alert_uri_lookup_keys)
+ alert_uri = self._get_uri_from_ha_structure(alert_uri_lookup_keys, configurations)
if alert_uri is not None:
return alert_uri
# attempt to parse and parameterize the various URIs; properties that
# do not exist int he lookup map are returned as None
if alert_uri_lookup_keys.http is not None:
- http_uri = self._get_configuration_value(alert_uri_lookup_keys.http)
-
+ http_uri = self._get_configuration_value(configurations, alert_uri_lookup_keys.http)
+
if alert_uri_lookup_keys.https is not None:
- https_uri = self._get_configuration_value(alert_uri_lookup_keys.https)
+ https_uri = self._get_configuration_value(configurations, alert_uri_lookup_keys.https)
# without a URI, there's no way to create the structure we need - return
# the default port if specified, otherwise throw an exception
@@ -372,21 +397,21 @@ class BaseAlert(object):
# start out assuming plaintext
uri = http_uri
is_ssl_enabled = False
-
+
if https_uri is not None:
# https without http implies SSL, otherwise look it up based on the properties
if http_uri is None:
is_ssl_enabled = True
uri = https_uri
- elif self._check_uri_ssl_property(alert_uri_lookup_keys):
+ elif self._check_uri_ssl_property(alert_uri_lookup_keys, configurations):
is_ssl_enabled = True
uri = https_uri
-
+
alert_uri = AlertUri(uri=uri, is_ssl_enabled=is_ssl_enabled)
return alert_uri
- def _get_uri_from_ha_structure(self, alert_uri_lookup_keys):
+ def _get_uri_from_ha_structure(self, alert_uri_lookup_keys, configurations):
"""
Attempts to parse the HA URI structure in order to build a dynamic key
that represents the correct host URI to check.
@@ -398,7 +423,7 @@ class BaseAlert(object):
logger.debug("[Alert][{0}] HA URI structure detected in definition, attempting to lookup dynamic HA properties".format(self.get_name()))
- ha_nameservice = self._get_configuration_value(alert_uri_lookup_keys.ha_nameservice)
+ ha_nameservice = self._get_configuration_value(configurations, alert_uri_lookup_keys.ha_nameservice)
ha_alias_key = alert_uri_lookup_keys.ha_alias_key
ha_http_pattern = alert_uri_lookup_keys.ha_http_pattern
ha_https_pattern = alert_uri_lookup_keys.ha_https_pattern
@@ -411,25 +436,25 @@ class BaseAlert(object):
# if there is a HA nameservice defined, but it can not be evaluated then it's not HA environment
if ha_nameservice is None:
return None
-
+
# convert dfs.ha.namenodes.{{ha-nameservice}} into dfs.ha.namenodes.c1ha
ha_alias_key = ha_alias_key.replace(self.HA_NAMESERVICE_PARAM, ha_nameservice)
- ha_nameservice_alias = self._get_configuration_value(ha_alias_key)
-
+ ha_nameservice_alias = self._get_configuration_value(configurations, ha_alias_key)
+
if ha_nameservice_alias is None:
logger.warning("[Alert][{0}] HA nameservice value is present but there are no aliases for {1}".format(
self.get_name(), ha_alias_key))
return None
else:
- ha_nameservice_alias = self._get_configuration_value(ha_alias_key)
-
+ ha_nameservice_alias = self._get_configuration_value(configurations, ha_alias_key)
+
# if HA nameservice is not defined then the fact that the HA alias_key could not be evaluated shows that it's not HA environment
if ha_nameservice_alias is None:
return None
# determine which pattern to use (http or https)
ha_pattern = ha_http_pattern
- is_ssl_enabled = self._check_uri_ssl_property(alert_uri_lookup_keys)
+ is_ssl_enabled = self._check_uri_ssl_property(alert_uri_lookup_keys, configurations)
if is_ssl_enabled:
ha_pattern = ha_https_pattern
@@ -459,14 +484,14 @@ class BaseAlert(object):
# get the host for dfs.namenode.http-address.c1ha.nn1 and see if it's
# this host
- value = self._get_configuration_value(key)
+ value = self._get_configuration_value(configurations, key)
if value is not None and (self.host_name in value or self.public_host_name in value):
return AlertUri(uri=value, is_ssl_enabled=is_ssl_enabled)
return None
- def _check_uri_ssl_property(self, alert_uri_lookup_keys):
+ def _check_uri_ssl_property(self, alert_uri_lookup_keys, configurations):
"""
Gets whether the SSL property and value on the URI indicate an SSL
connection.
@@ -478,10 +503,10 @@ class BaseAlert(object):
https_property_value = None
if alert_uri_lookup_keys.https_property is not None:
- https_property = self._get_configuration_value(alert_uri_lookup_keys.https_property)
+ https_property = self._get_configuration_value(configurations, alert_uri_lookup_keys.https_property)
if alert_uri_lookup_keys.https_property_value is not None:
- https_property_value = self._get_configuration_value(alert_uri_lookup_keys.https_property_value)
+ https_property_value = self._get_configuration_value(configurations, alert_uri_lookup_keys.https_property_value)
if https_property is None:
return False
@@ -523,11 +548,11 @@ class BaseAlert(object):
def get_host_from_url(uri):
if uri is None:
return None
-
+
# if not a string, return None
if not isinstance(uri, basestring):
- return None
-
+ return None
+
# RFC3986, Appendix B
parts = re.findall('^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?', uri)
@@ -548,8 +573,8 @@ class BaseAlert(object):
if -1 == host_and_port.find(':'):
if host_and_port.isdigit():
- return None
-
+ return None
+
return host_and_port
else:
return host_and_port.split(':')[0]
http://git-wip-us.apache.org/repos/asf/ambari/blob/4656f1d4/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py
index da49d2a..f86c8d6 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py
@@ -40,13 +40,13 @@ SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}'
DEFAULT_CONNECTION_TIMEOUT = 5.0
class MetricAlert(BaseAlert):
-
+
def __init__(self, alert_meta, alert_source_meta, config):
super(MetricAlert, self).__init__(alert_meta, alert_source_meta, config)
connection_timeout = DEFAULT_CONNECTION_TIMEOUT
- self.metric_info = None
+ self.metric_info = None
if 'jmx' in alert_source_meta:
self.metric_info = JmxMetric(alert_source_meta['jmx'])
@@ -70,13 +70,13 @@ class MetricAlert(BaseAlert):
def _collect(self):
if self.metric_info is None:
raise Exception("Could not determine result. Specific metric collector is not defined.")
-
+
if self.uri_property_keys is None:
raise Exception("Could not determine result. URL(s) were not defined.")
# use the URI lookup keys to get a final URI value to query
- alert_uri = self._get_uri_from_structure(self.uri_property_keys)
-
+ alert_uri = self._get_uri_from_structure(self.uri_property_keys)
+
logger.debug("[Alert][{0}] Calculated metric URI to be {1} (ssl={2})".format(
self.get_name(), alert_uri.uri, str(alert_uri.is_ssl_enabled)))
@@ -85,7 +85,7 @@ class MetricAlert(BaseAlert):
host = self.host_name
port = 80 # probably not very realistic
- try:
+ try:
port = int(get_port_from_url(alert_uri.uri))
except:
pass
@@ -104,22 +104,22 @@ class MetricAlert(BaseAlert):
value_list.extend(jmx_property_values)
check_value = self.metric_info.calculate(value_list)
value_list.append(check_value)
-
+
collect_result = self._get_result(value_list[0] if check_value is None else check_value)
logger.debug("[Alert][{0}] Resolved values = {1}".format(self.get_name(), str(value_list)))
-
+
return (collect_result, value_list)
-
+
def _get_result(self, value):
ok_value = self.__find_threshold('ok')
warn_value = self.__find_threshold('warning')
crit_value = self.__find_threshold('critical')
-
+
# critical values are higher
critical_direction_up = crit_value >= warn_value
-
+
if critical_direction_up:
# critical values are higher
if value >= crit_value:
@@ -149,19 +149,19 @@ class MetricAlert(BaseAlert):
else:
return self.RESULT_OK
-
+
def __find_threshold(self, reporting_type):
""" find the defined thresholds for alert values """
-
+
if not 'reporting' in self.alert_source_meta:
return None
-
+
if not reporting_type in self.alert_source_meta['reporting']:
return None
-
+
if not 'value' in self.alert_source_meta['reporting'][reporting_type]:
return None
-
+
return self.alert_source_meta['reporting'][reporting_type]['value']
@@ -174,10 +174,12 @@ class MetricAlert(BaseAlert):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(str(jmx_metric.property_map))
- security_enabled = str(self._get_configuration_value(SECURITY_ENABLED_KEY)).upper() == 'TRUE'
+ configurations = self.configuration_builder.get_configuration(self.cluster_id, None, None)
+
+ security_enabled = str(self._get_configuration_value(configurations, SECURITY_ENABLED_KEY)).upper() == 'TRUE'
if self.uri_property_keys.kerberos_principal is not None:
- kerberos_principal = self._get_configuration_value(
+ kerberos_principal = self._get_configuration_value(configurations,
self.uri_property_keys.kerberos_principal)
if kerberos_principal is not None:
@@ -185,7 +187,7 @@ class MetricAlert(BaseAlert):
kerberos_principal = kerberos_principal.replace('_HOST', self.host_name)
if self.uri_property_keys.kerberos_keytab is not None:
- kerberos_keytab = self._get_configuration_value(self.uri_property_keys.kerberos_keytab)
+ kerberos_keytab = self._get_configuration_value(configurations, self.uri_property_keys.kerberos_keytab)
if "0.0.0.0" in str(host):
host = self.host_name
@@ -204,8 +206,8 @@ class MetricAlert(BaseAlert):
if tmp_dir is None:
tmp_dir = gettempdir()
- kerberos_executable_search_paths = self._get_configuration_value('{{kerberos-env/executable_search_paths}}')
- smokeuser = self._get_configuration_value('{{cluster-env/smokeuser}}')
+ kerberos_executable_search_paths = self._get_configuration_value(configurations, '{{kerberos-env/executable_search_paths}}')
+ smokeuser = self._get_configuration_value(configurations, '{{cluster-env/smokeuser}}')
response, error_msg, time_millis = curl_krb_request(tmp_dir, kerberos_keytab, kerberos_principal, url,
"metric_alert", kerberos_executable_search_paths, False, self.get_name(), smokeuser,
@@ -270,7 +272,7 @@ class MetricAlert(BaseAlert):
'''
return '{0}'
-
+
class JmxMetric:
DYNAMIC_CODE_TEMPLATE = """
# ensure that division yields a float, use // for integer division
@@ -287,18 +289,18 @@ def f(args):
if 'value' in jmx_info:
realcode = re.sub('(\{(\d+)\})', 'args[\g<2>]', jmx_info['value'])
-
+
self.custom_module = imp.new_module(str(uuid.uuid4()))
code = self.DYNAMIC_CODE_TEMPLATE.format(realcode)
exec code in self.custom_module.__dict__
-
+
for p in self.property_list:
parts = p.split('/')
if not parts[0] in self.property_map:
self.property_map[parts[0]] = []
self.property_map[parts[0]].append(parts[1])
-
+
def calculate(self, args):
if self.custom_module is not None:
return self.custom_module.f(args)
http://git-wip-us.apache.org/repos/asf/ambari/blob/4656f1d4/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
index 3642550..0f4a196 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
@@ -87,9 +87,10 @@ class PortAlert(BaseAlert):
def _collect(self):
+ configurations = self.configuration_builder.get_configuration(self.cluster_id, None, None)
# can be parameterized or static
# if not parameterized, this will return the static value
- uri_value = self._get_configuration_value(self.uri)
+ uri_value = self._get_configuration_value(configurations, self.uri)
host_not_specified = False
if uri_value is None:
http://git-wip-us.apache.org/repos/asf/ambari/blob/4656f1d4/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py
index 301e440..2ad6c33 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py
@@ -24,6 +24,7 @@ import os
import re
from alerts.base_alert import BaseAlert
from resource_management.core.environment import Environment
+from resource_management.libraries.script.script import Script
from resource_management.libraries.functions.curl_krb_request import KERBEROS_KINIT_TIMER_PARAMETER
from ambari_agent import Constants
@@ -89,6 +90,7 @@ class ScriptAlert(BaseAlert):
def _collect(self):
cmd_module = self._load_source()
+ full_configurations = self.configuration_builder.get_configuration(self.cluster_id, None, None)
if cmd_module is not None:
configurations = {}
@@ -98,7 +100,7 @@ class ScriptAlert(BaseAlert):
# for each token, if there is a value, store in; otherwise don't store
# a key with a value of None
for token in tokens:
- value = self._get_configuration_value(token)
+ value = self._get_configuration_value(full_configurations, token)
if value is not None:
configurations[token] = value
except AttributeError:
@@ -106,6 +108,8 @@ class ScriptAlert(BaseAlert):
# be passed in so hopefully the script doesn't need any
logger.debug("The script {0} does not have a get_tokens() function".format(str(cmd_module)))
+ Script.config = full_configurations
+
# try to get basedir for scripts
# it's needed for server side scripts to properly use resource management
matchObj = re.match( r'((.*)services(.*)package)', self.path_to_script)
@@ -162,7 +166,7 @@ class ScriptAlert(BaseAlert):
logger.debug("[Alert][{0}] Executing script check {1}".format(
self.get_name(), self.path_to_script))
-
+
if (not self.path_to_script.endswith('.py')):
logger.error("[Alert][{0}] Unable to execute script {1}".format(
self.get_name(), self.path_to_script))
http://git-wip-us.apache.org/repos/asf/ambari/blob/4656f1d4/ambari-agent/src/main/python/ambari_agent/alerts/web_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/web_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/web_alert.py
index 0e400f7..7b8d464 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/web_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/web_alert.py
@@ -89,7 +89,7 @@ class WebAlert(BaseAlert):
raise Exception("Could not determine result. URL(s) were not defined.")
# use the URI lookup keys to get a final URI value to query
- alert_uri = self._get_uri_from_structure(self.uri_property_keys)
+ alert_uri = self._get_uri_from_structure(self.uri_property_keys)
logger.debug("[Alert][{0}] Calculated web URI to be {1} (ssl={2})".format(
self.get_name(), alert_uri.uri, str(alert_uri.is_ssl_enabled)))
@@ -174,8 +174,10 @@ class WebAlert(BaseAlert):
kerberos_keytab = None
kerberos_principal = None
+ configurations = self.configuration_builder.get_configuration(self.cluster_id, None, None)
+
if self.uri_property_keys.kerberos_principal is not None:
- kerberos_principal = self._get_configuration_value(
+ kerberos_principal = self._get_configuration_value(configurations,
self.uri_property_keys.kerberos_principal)
if kerberos_principal is not None:
@@ -183,10 +185,10 @@ class WebAlert(BaseAlert):
kerberos_principal = kerberos_principal.replace('_HOST', self.host_name)
if self.uri_property_keys.kerberos_keytab is not None:
- kerberos_keytab = self._get_configuration_value(self.uri_property_keys.kerberos_keytab)
+ kerberos_keytab = self._get_configuration_value(configurations, self.uri_property_keys.kerberos_keytab)
+
+ security_enabled = self._get_configuration_value(configurations, '{{cluster-env/security_enabled}}')
- security_enabled = self._get_configuration_value('{{cluster-env/security_enabled}}')
-
if kerberos_principal is not None and kerberos_keytab is not None \
and security_enabled is not None and security_enabled.lower() == "true":
# Create the kerberos credentials cache (ccache) file and set it in the environment to use
@@ -197,8 +199,8 @@ class WebAlert(BaseAlert):
tmp_dir = gettempdir()
# Get the configured Kerberos executables search paths, if any
- kerberos_executable_search_paths = self._get_configuration_value('{{kerberos-env/executable_search_paths}}')
- smokeuser = self._get_configuration_value('{{cluster-env/smokeuser}}')
+ kerberos_executable_search_paths = self._get_configuration_value(configurations, '{{kerberos-env/executable_search_paths}}')
+ smokeuser = self._get_configuration_value(configurations, '{{cluster-env/smokeuser}}')
response_code, error_msg, time_millis = curl_krb_request(tmp_dir, kerberos_keytab, kerberos_principal, url,
"web_alert", kerberos_executable_search_paths, True, self.get_name(), smokeuser,