You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this text version.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/04/20 10:05:53 UTC
[ambari] branch trunk updated: AMBARI-23631. After ambari-server reset ambari-agent caches should be reset. (aonishuk)
This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0ad7020 AMBARI-23631. After ambari-server reset ambari-agent caches should be reset. (aonishuk)
0ad7020 is described below
commit 0ad7020642873208d4569b62d9e90a120ba6675a
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Fri Apr 20 12:01:01 2018 +0300
AMBARI-23631. After ambari-server reset ambari-agent caches should be reset. (aonishuk)
---
.../src/main/python/ambari_agent/AlertStatusReporter.py | 12 ++++++++++++
ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py | 7 ++++++-
.../src/main/python/ambari_agent/HostStatusReporter.py | 9 ++++-----
ambari-agent/src/main/python/ambari_agent/main.py | 3 +++
4 files changed, 25 insertions(+), 6 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
index bfb0e4f..cef0996 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
@@ -38,6 +38,7 @@ class AlertStatusReporter(threading.Thread):
self.collector = initializer_module.alert_scheduler_handler.collector()
self.stop_event = initializer_module.stop_event
self.alert_reports_interval = initializer_module.config.alert_reports_interval
+ self.alert_definitions_cache = initializer_module.alert_definitions_cache
self.stale_alerts_monitor = initializer_module.stale_alerts_monitor
self.reported_alerts = defaultdict(lambda:defaultdict(lambda:[]))
threading.Thread.__init__(self)
@@ -53,6 +54,7 @@ class AlertStatusReporter(threading.Thread):
while not self.stop_event.is_set():
try:
if self.initializer_module.is_registered:
+ self.clean_not_existing_clusters_info()
alerts = self.collector.alerts()
self.stale_alerts_monitor.save_executed_alerts(alerts)
@@ -94,6 +96,16 @@ class AlertStatusReporter(threading.Thread):
return changed_alerts
+
+ def clean_not_existing_clusters_info(self):
+ """
+ This needs to be done to remove information about clusters which where deleted (e.g. ambari-server reset)
+ """
+ for cluster_id in self.reported_alerts.keys():
+ if not cluster_id in self.alert_definitions_cache.get_cluster_ids():
+ del self.reported_alerts[cluster_id]
+
+
@staticmethod
def log_sending(message_dict):
"""
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index ae8e003..4885e8b 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -78,6 +78,7 @@ class HeartbeatThread(threading.Thread):
self.responseId = 0
self.file_cache = initializer_module.file_cache
self.stale_alerts_monitor = initializer_module.stale_alerts_monitor
+ self.post_registration_actions = [self.file_cache.reset]
def run(self):
@@ -142,11 +143,15 @@ class HeartbeatThread(threading.Thread):
self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
- self.file_cache.reset()
+ self.run_post_registration_actions()
self.initializer_module.is_registered = True
# now when registration is done we can expose connection to other threads.
self.initializer_module._connection = self.connection
+ def run_post_registration_actions(self):
+ for post_registration_action in self.post_registration_actions:
+ post_registration_action()
+
def unregister(self):
"""
Disconnect and remove connection object from initializer_module so other threads cannot use it
diff --git a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
index 3de79fe..622c4bd 100644
--- a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
@@ -50,10 +50,6 @@ class HostStatusReporter(threading.Thread):
if self.initializer_module.is_registered and not Utils.are_dicts_equal(report, self.last_report, keys_to_skip=["agentTimeStampAtReporting"]):
self.initializer_module.connection.send(message=report, destination=Constants.HOST_STATUS_REPORTS_ENDPOINT)
self.last_report = report
-
- # don't use else to avoid race condition
- if not self.initializer_module.is_registered:
- self.last_report = {}
except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
pass
except:
@@ -72,4 +68,7 @@ class HostStatusReporter(threading.Thread):
'mounts': self.hardware.osdisks(),
}
- return report
\ No newline at end of file
+ return report
+
+ def clean_cache(self):
+ self.last_report = {}
\ No newline at end of file
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 6280a4c..693b04e 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -376,6 +376,9 @@ def run_threads(initializer_module):
alert_status_reporter = AlertStatusReporter(initializer_module)
alert_status_reporter.start()
+ # clean caches for non-existing clusters (ambari-server reset case)
+ heartbeat_thread.post_registration_actions += [component_status_executor.clean_not_existing_clusters_info, alert_status_reporter.clean_not_existing_clusters_info, host_status_reporter.clean_cache]
+
initializer_module.action_queue.start()
while not initializer_module.stop_event.is_set():
--
To stop receiving notification emails like this one, please contact
aonishuk@apache.org.