You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2014/08/22 16:08:07 UTC
git commit: AMBARI-6983. Alerts: persist and allow configuration
substitution variables for alert definitions (ncole)
Repository: ambari
Updated Branches:
refs/heads/branch-alerts-dev a533c1b4f -> 6480115d2
AMBARI-6983. Alerts: persist and allow configuration substitution variables for alert definitions (ncole)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6480115d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6480115d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6480115d
Branch: refs/heads/branch-alerts-dev
Commit: 6480115d23d2e9a37958e6720bc7062afc0641a7
Parents: a533c1b
Author: Nate Cole <nc...@hortonworks.com>
Authored: Thu Aug 21 21:13:29 2014 -0400
Committer: Nate Cole <nc...@hortonworks.com>
Committed: Fri Aug 22 10:07:21 2014 -0400
----------------------------------------------------------------------
.../ambari_agent/AlertSchedulerHandler.py | 112 +++++++++++++++----
.../src/main/python/ambari_agent/Controller.py | 1 +
.../src/main/python/ambari_agent/Heartbeat.py | 2 +-
.../python/ambari_agent/alerts/base_alert.py | 60 ++++++++--
.../python/ambari_agent/alerts/port_alert.py | 32 +++---
.../src/test/python/ambari_agent/TestAlerts.py | 34 +++++-
.../test/python/ambari_agent/TestController.py | 1 +
.../dummy_files/alert_definitions.def | 49 --------
.../ambari_agent/dummy_files/definitions.json | 56 ++++++++++
.../server/agent/AlertDefinitionCommand.java | 56 ++++++++++
.../apache/ambari/server/agent/HeartBeat.java | 6 +
.../ambari/server/agent/HeartBeatHandler.java | 1 +
.../ambari/server/agent/HeartbeatMonitor.java | 1 -
13 files changed, 314 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
index 10fdef7..6308f7c 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
@@ -21,12 +21,12 @@ limitations under the License.
'''
http://apscheduler.readthedocs.org/en/v2.1.2
'''
-import glob
import json
import logging
import os
import sys
import time
+import traceback
from apscheduler.scheduler import Scheduler
from alerts.collector import AlertCollector
from alerts.port_alert import PortAlert
@@ -37,6 +37,9 @@ logger = logging.getLogger()
class AlertSchedulerHandler():
make_cachedir = True
+ FILENAME = 'definitions.json'
+ TYPE_PORT = 'PORT'
+
def __init__(self, cachedir, in_minutes=True):
self.cachedir = cachedir
@@ -44,6 +47,7 @@ class AlertSchedulerHandler():
try:
os.makedirs(cachedir)
except:
+ logger.critical("Could not create the cache directory {0}".format(cachedir))
pass
config = {
@@ -56,11 +60,13 @@ class AlertSchedulerHandler():
self.__in_minutes = in_minutes
self.__loaded = False
self.__collector = AlertCollector()
+ self.__config_maps = {}
def update_definitions(self, alert_commands, refresh_jobs=False):
- for command in alert_commands:
- with open(os.path.join(self.cachedir, command['clusterName'] + '.def'), 'w') as f:
- json.dump(command, f, indent=2)
+ ''' updates the persisted definitions and restarts the scheduler '''
+
+ with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f:
+ json.dump(alert_commands, f, indent=2)
if refresh_jobs:
self.__scheduler.shutdown(wait=False)
@@ -71,6 +77,7 @@ class AlertSchedulerHandler():
return lambda: alert_def.collect()
def start(self):
+ ''' loads definitions from file and starts the scheduler '''
if not self.__loaded:
alert_callables = self.__load_definitions()
@@ -92,48 +99,105 @@ class AlertSchedulerHandler():
self.__scheduler = None
def collector(self):
+ ''' gets the collector for reporting to the server '''
return self.__collector
def __load_definitions(self):
+ ''' loads all alert commands from the file. all clusters are stored in one file '''
definitions = []
+
+ all_commands = None
try:
- for deffile in glob.glob(os.path.join(self.cachedir, '*.def')):
- with open(deffile, 'r') as f:
- command_json = json.load(f)
-
- for definition in command_json['alertDefinitions']:
- obj = self.__json_to_callable(definition)
-
- if obj is not None:
- obj.set_cluster(
- '' if not 'clusterName' in command_json else command_json['clusterName'],
- '' if not 'hostName' in command_json else command_json['hostName'])
-
- definitions.append(obj)
-
+ with open(os.path.join(self.cachedir, self.FILENAME)) as fp:
+ all_commands = json.load(fp)
except:
- import traceback
- traceback.print_exc()
- pass
+ if (logger.isEnabledFor(logging.DEBUG)):
+ traceback.print_exc()
+ return definitions
+
+ for command_json in all_commands:
+ clusterName = '' if not 'clusterName' in command_json else command_json['clusterName']
+ hostName = '' if not 'hostName' in command_json else command_json['hostName']
+
+ configmap = None
+ # each cluster gets a map of key/value pairs of substitution values
+ self.__config_maps[clusterName] = {}
+ if 'configurations' in command_json:
+ configmap = command_json['configurations']
+
+ for definition in command_json['alertDefinitions']:
+ obj = self.__json_to_callable(definition)
+ if obj is None:
+ continue
+
+ obj.set_cluster(clusterName, hostName)
+
+ # get the config values for the alerts 'lookup keys',
+ # eg: hdfs-site/dfs.namenode.http-address : host_and_port
+ vals = self.__find_config_values(configmap, obj.get_lookup_keys())
+ self.__config_maps[clusterName].update(vals)
+
+ obj.set_helpers(self.__collector, self.__config_maps[clusterName])
+
+ definitions.append(obj)
+
return definitions
def __json_to_callable(self, json_definition):
+ '''
+ converts the json that represents all aspects of a definition
+ and makes an object that extends BaseAlert that is used for individual
+ '''
source = json_definition['source']
source_type = source.get('type', '')
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.debug("Creating job type {0} with {1}".format(source_type, str(json_definition)))
+
alert = None
if source_type == 'METRIC':
pass
elif source_type == 'PORT':
- alert = PortAlert(self.__collector, json_definition, source)
+ alert = PortAlert(json_definition, source)
elif type == 'SCRIPT':
pass
return alert
- def __json_to_meta(self, json_definition):
- pass
+ def __find_config_values(self, configmap, obj_keylist):
+ ''' finds templated values in the configuration map provided by the server '''
+ if configmap is None:
+ return {}
+
+ result = {}
+
+ for key in obj_keylist:
+ try:
+ obj = configmap
+ for layer in key.split('/'):
+ obj = obj[layer]
+ result[key] = obj
+ except KeyError: # the nested key is missing somewhere
+ pass
+
+ return result
+
+ def update_configurations(self, commands):
+ '''
+ when an execution command comes in, update any necessary values.
+ status commands do not contain useful configurations
+ '''
+ for command in commands:
+ clusterName = command['clusterName']
+ if not clusterName in self.__config_maps:
+ continue
+
+ if 'configurations' in command:
+ configmap = command['configurations']
+ keylist = self.__config_maps[clusterName].keys()
+ vals = self.__find_config_values(configmap, keylist)
+ self.__config_maps[clusterName].update(vals)
def main():
args = list(sys.argv)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 4537434..0ceeeff 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -246,6 +246,7 @@ class Controller(threading.Thread):
if 'executionCommands' in response.keys():
self.addToQueue(response['executionCommands'])
+ self.alert_scheduler_handler.update_configurations(response['executionCommands'])
pass
if 'statusCommands' in response.keys():
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
index fb41759..a6ecee1 100644
--- a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
+++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
@@ -97,7 +97,7 @@ class Heartbeat:
nodeStatus["alerts"] = hostInfo.createAlerts(nodeStatus["alerts"])
if self.collector is not None:
- nodeStatus['alerts'].extend(self.collector.alerts())
+ heartbeat['alerts'] = self.collector.alerts()
return heartbeat
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
index 6e99692..ea81d78 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
@@ -19,6 +19,8 @@ limitations under the License.
'''
import logging
+import re
+import traceback
logger = logging.getLogger()
@@ -28,31 +30,45 @@ class BaseAlert(object):
RESULT_CRITICAL = 'CRITICAL'
RESULT_UNKNOWN = 'UNKNOWN'
- def __init__(self, collector, alert_meta, alert_source_meta):
- self.collector = collector
+ def __init__(self, alert_meta, alert_source_meta):
self.alert_meta = alert_meta
self.alert_source_meta = alert_source_meta
self.cluster = ''
self.hostname = ''
+ self._lookup_keys = []
+
def interval(self):
+ ''' gets the defined interval this check should run '''
if not self.alert_meta.has_key('interval'):
return 1
else:
- return self.alert_meta['interval']
+ interval = self.alert_meta['interval']
+ return 1 if interval < 1 else interval
+
+ def set_helpers(self, collector, value_dict):
+ ''' sets helper objects for alerts without having to use them in a constructor '''
+ self.collector = collector
+ self.config_value_dict = value_dict
def set_cluster(self, cluster, host):
+ ''' sets cluster information for the alert '''
self.cluster = cluster
self.hostname = host
def collect(self):
+ ''' method used for collection. defers to _collect() '''
+
res = (BaseAlert.RESULT_UNKNOWN, [])
+ res_base_text = "Unknown {0}"
+
try:
res = self._collect()
+ res_base_text = self.alert_source_meta['reporting'][res[0].lower()]['text']
except Exception as e:
- res = (BaseAlert.RESULT_CRITICAL, [str(e)])
-
- res_base_text = self.alert_source_meta['reporting'][res[0].lower()]['text']
+ traceback.print_exc()
+ res = (BaseAlert.RESULT_UNKNOWN, [str(e)])
+ res_base_text = "Unknown {0}"
data = {}
data['name'] = self._find_value('name')
@@ -61,15 +77,45 @@ class BaseAlert(object):
data['text'] = res_base_text.format(*res[1])
data['cluster'] = self.cluster
data['service'] = self._find_value('service')
- data['component'] = self._find_value('component')
+ data['component'] = self._find_value('componentName')
self.collector.put(self.cluster, data)
def _find_value(self, meta_key):
+ ''' safe way to get a value when outputting result json. will not throw an exception '''
if self.alert_meta.has_key(meta_key):
return self.alert_meta[meta_key]
else:
return None
+
+ def get_lookup_keys(self):
+ ''' returns a list of lookup keys found for this alert '''
+ return self._lookup_keys
+
+ def _find_lookup_property(self, key):
+ '''
+ check if the supplied key is parameterized
+ '''
+ keys = re.findall("{{([\S]+)}}", key)
+
+ if len(keys) > 0:
+ logger.debug("Found parameterized key {0} for {1}".format(str(keys), str(self)))
+ self._lookup_keys.append(keys[0])
+ return keys[0]
+
+ return key
+
+ def _lookup_property_value(self, key):
+ '''
+ in the case of specifying a configuration path, lookup that path's value
+ '''
+ if not key in self._lookup_keys:
+ return key
+
+ if key in self.config_value_dict:
+ return self.config_value_dict[key]
+ else:
+ return None
def _collect(self):
'''
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
index 2f051c8..d06d1f4 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
@@ -30,31 +30,37 @@ logger = logging.getLogger()
class PortAlert(BaseAlert):
- def __init__(self, collector, alert_meta, alert_source_meta):
- super(PortAlert, self).__init__(collector, alert_meta, alert_source_meta)
+ def __init__(self, alert_meta, alert_source_meta):
+ super(PortAlert, self).__init__(alert_meta, alert_source_meta)
- default_port = alert_source_meta['default_port']
- uri = alert_source_meta['uri']
+ # can be parameterized
+ self.uri = self._find_lookup_property(alert_source_meta['uri'])
+ self.port = alert_source_meta['default_port']
- self.port = default_port
- self.host = get_host_from_url(uri)
+ def _collect(self):
+ urivalue = self._lookup_property_value(self.uri)
+ host = get_host_from_url(urivalue)
+ port = self.port
+
try:
- self.port = int(get_port_from_url(uri))
+ port = int(get_port_from_url(urivalue))
except:
- # only when port parsing fails
+ # if port not found, default port already set to port
pass
-
- def _collect(self):
- s = None
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.debug("checking {0} listening on port {1}".format(host, str(port)))
+
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1.5)
t = time.time()
- s.connect((self.host, self.port))
+ s.connect((host, port))
millis = time.time() - t
- return (self.RESULT_OK, [millis/1000, self.port])
+ return (self.RESULT_OK, [millis/1000, port])
+ except Exception as e:
+ return (self.RESULT_CRITICAL, [str(e), host, port])
finally:
if s is not None:
try:
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
index 0d0563a..6674fdf 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
@@ -55,7 +55,7 @@ class TestAlerts(TestCase):
"scope": "host",
"source": {
"type": "PORT",
- "uri": "http://c6409.ambari.apache.org:50070",
+ "uri": "{{hdfs-site/my-key}}",
"default_port": 50070,
"reporting": {
"ok": {
@@ -70,10 +70,40 @@ class TestAlerts(TestCase):
collector = AlertCollector()
- pa = PortAlert(collector, json, json['source'])
+ pa = PortAlert(json, json['source'])
+ pa.set_helpers(collector, {'hdfs-site/my-key': 'value1'})
self.assertEquals(6, pa.interval())
res = pa.collect()
pass
+ def test_port_alert_no_sub(self):
+ json = { "name": "namenode_process",
+ "service": "HDFS",
+ "component": "NAMENODE",
+ "label": "NameNode process",
+ "interval": 6,
+ "scope": "host",
+ "source": {
+ "type": "PORT",
+ "uri": "http://c6401.ambari.apache.org",
+ "default_port": 50070,
+ "reporting": {
+ "ok": {
+ "text": "TCP OK - {0:.4f} response time on port {1}"
+ },
+ "critical": {
+ "text": "Could not load process info: {0}"
+ }
+ }
+ }
+ }
+
+ pa = PortAlert(json, json['source'])
+ pa.set_helpers(AlertCollector(), '')
+ self.assertEquals('http://c6401.ambari.apache.org', pa.uri)
+
+ res = pa.collect()
+
+ pass
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index ad8303f..4674475 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -406,6 +406,7 @@ class TestController(unittest.TestCase):
# one successful request, after stop
self.controller.actionQueue = actionQueue
+ self.controller.alert_scheduler_handler = MagicMock()
self.controller.heartbeatWithServer()
self.assertTrue(sendRequest.called)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def
deleted file mode 100644
index 45fb8d0..0000000
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def
+++ /dev/null
@@ -1,49 +0,0 @@
-{
- "clusterName": "c1",
- "hostName": "c6401.ambari.apache.org",
- "hash": "12341234134412341243124",
- "alertDefinitions": [
- {
- "name": "namenode_cpu",
- "label": "NameNode host CPU Utilization",
- "scope": "host",
- "source": {
- "type": "METRIC",
- "jmx": "java.lang:type=OperatingSystem/SystemCpuLoad",
- "host": "{{hdfs-site/dfs.namenode.secondary.http-address}}"
- }
- },
- {
- "name": "namenode_process",
- "service": "HDFS",
- "component": "NAMENODE",
- "label": "NameNode process",
- "interval": 6,
- "scope": "host",
- "source": {
- "type": "PORT",
- "uri": "http://c6401.ambari.apache.org:50070",
- "default_port": 50070,
- "reporting": {
- "ok": {
- "text": "TCP OK - {0:.4f} response time on port {1}"
- },
- "critical": {
- "text": "Could not load process info: {0}"
- }
- }
- }
- },
- {
- "name": "hdfs_last_checkpoint",
- "label": "Last Checkpoint Time",
- "interval": 1,
- "scope": "service",
- "enabled": false,
- "source": {
- "type": "SCRIPT",
- "path": "scripts/alerts/last_checkpoint.py"
- }
- }
- ]
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json
new file mode 100644
index 0000000..4894b62
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json
@@ -0,0 +1,56 @@
+[
+ {
+ "clusterName": "c1",
+ "hostName": "c6401.ambari.apache.org",
+ "hash": "12341234134412341243124",
+ "configurations": {
+ "hdfs-site": {
+ "dfs.namenode.http-address": "c6401.ambari.apache.org:50070"
+ }
+ },
+ "alertDefinitions": [
+ {
+ "name": "namenode_cpu",
+ "label": "NameNode host CPU Utilization",
+ "scope": "host",
+ "source": {
+ "type": "METRIC",
+ "jmx": "java.lang:type=OperatingSystem/SystemCpuLoad",
+ "host": "{{hdfs-site/dfs.namenode.secondary.http-address}}"
+ }
+ },
+ {
+ "name": "namenode_process",
+ "service": "HDFS",
+ "component": "NAMENODE",
+ "label": "NameNode process",
+ "interval": 6,
+ "scope": "host",
+ "source": {
+ "type": "PORT",
+ "uri": "{{hdfs-site/dfs.namenode.http-address}}",
+ "default_port": 50070,
+ "reporting": {
+ "ok": {
+ "text": "TCP OK - {0:.4f} response time on port {1}"
+ },
+ "critical": {
+ "text": "Could not load process info: {0} on host {1}:{2}"
+ }
+ }
+ }
+ },
+ {
+ "name": "hdfs_last_checkpoint",
+ "label": "Last Checkpoint Time",
+ "interval": 1,
+ "scope": "service",
+ "enabled": false,
+ "source": {
+ "type": "SCRIPT",
+ "path": "scripts/alerts/last_checkpoint.py"
+ }
+ }
+ ]
+ }
+]
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java
index 3c9615f..5ae1741 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java
@@ -17,8 +17,14 @@
*/
package org.apache.ambari.server.agent;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigHelper;
import org.apache.ambari.server.state.alert.AlertDefinition;
import org.apache.ambari.server.state.alert.AlertDefinitionHash;
@@ -41,6 +47,9 @@ public class AlertDefinitionCommand extends AgentCommand {
@SerializedName("alertDefinitions")
private final List<AlertDefinition> m_definitions;
+
+ @SerializedName("configurations")
+ private Map<String, Map<String, String>> m_configurations;
/**
* Constructor.
@@ -106,4 +115,51 @@ public class AlertDefinitionCommand extends AgentCommand {
public String getHostName() {
return m_hostName;
}
+
+ /**
+ * Adds cluster configuration properties as required by commands sent to agent.
+ *
+ * @param configHelper the helper
+ * @param cluster the cluster, matching the cluster name specified by the command
+ */
+ public void addConfigs(ConfigHelper configHelper, Cluster cluster)
+ throws AmbariException {
+
+ m_configurations = new HashMap<String, Map<String, String>>();
+
+ Map<String, Map<String, String>> allConfigTags =
+ configHelper.getEffectiveDesiredTags(cluster, m_hostName);
+
+ for(Config clusterConfig: cluster.getAllConfigs()) {
+ if (null == clusterConfig) {
+ // !!! hard to believe
+ continue;
+ }
+
+ Map<String, String> props = new HashMap<String, String>(clusterConfig.getProperties());
+
+ Map<String, Map<String, String>> configTags = new HashMap<String,
+ Map<String, String>>();
+
+ for (Map.Entry<String, Map<String, String>> entry : allConfigTags.entrySet()) {
+ if (entry.getKey().equals(clusterConfig.getType())) {
+ configTags.put(clusterConfig.getType(), entry.getValue());
+ }
+ }
+
+ Map<String, Map<String, String>> properties = configHelper
+ .getEffectiveConfigProperties(cluster, configTags);
+
+ if (!properties.isEmpty()) {
+ for (Map<String, String> propertyMap : properties.values()) {
+ props.putAll(propertyMap);
+ }
+ }
+
+ m_configurations.put(clusterConfig.getType(), props);
+ }
+
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
index 80cad4e..601bd79 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
@@ -21,6 +21,7 @@ package org.apache.ambari.server.agent;
import java.util.ArrayList;
import java.util.List;
+import org.apache.ambari.server.state.Alert;
import org.codehaus.jackson.annotate.JsonProperty;
/**
@@ -39,6 +40,7 @@ public class HeartBeat {
private List<DiskInfo> mounts = new ArrayList<DiskInfo>();
HostStatus nodeStatus;
private AgentEnv agentEnv = null;
+ private List<Alert> alerts = null;
public long getResponseId() {
return responseId;
@@ -109,6 +111,10 @@ public class HeartBeat {
public void setMounts(List<DiskInfo> mounts) {
this.mounts = mounts;
}
+
+ public List<Alert> getAlerts() {
+ return alerts;
+ }
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 65b7b6f..dca3bd9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -808,6 +808,7 @@ public class HeartBeatHandler {
String hash = alertDefinitionHash.getHash(clusterName, hostname);
AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
hostname, hash, definitions);
+ command.addConfigs(configHelper, cluster);
commands.add(command);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6480115d/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index 5336694..c39ba29 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -218,7 +218,6 @@ public class HeartbeatMonitor implements Runnable {
return cmds;
}
-
/**
* Generates status command and fills all apropriate fields.
* @throws AmbariException