You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/04/20 10:05:53 UTC

[ambari] branch trunk updated: AMBARI-23631. After ambari-server reset ambari-agent caches should be reset. (aonishuk)

This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0ad7020  AMBARI-23631. After ambari-server reset ambari-agent caches should be reset. (aonishuk)
0ad7020 is described below

commit 0ad7020642873208d4569b62d9e90a120ba6675a
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Fri Apr 20 12:01:01 2018 +0300

    AMBARI-23631. After ambari-server reset ambari-agent caches should be reset. (aonishuk)
---
 .../src/main/python/ambari_agent/AlertStatusReporter.py      | 12 ++++++++++++
 ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py |  7 ++++++-
 .../src/main/python/ambari_agent/HostStatusReporter.py       |  9 ++++-----
 ambari-agent/src/main/python/ambari_agent/main.py            |  3 +++
 4 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
index bfb0e4f..cef0996 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
@@ -38,6 +38,7 @@ class AlertStatusReporter(threading.Thread):
     self.collector = initializer_module.alert_scheduler_handler.collector()
     self.stop_event = initializer_module.stop_event
     self.alert_reports_interval = initializer_module.config.alert_reports_interval
+    self.alert_definitions_cache = initializer_module.alert_definitions_cache
     self.stale_alerts_monitor = initializer_module.stale_alerts_monitor
     self.reported_alerts = defaultdict(lambda:defaultdict(lambda:[]))
     threading.Thread.__init__(self)
@@ -53,6 +54,7 @@ class AlertStatusReporter(threading.Thread):
     while not self.stop_event.is_set():
       try:
         if self.initializer_module.is_registered:
+          self.clean_not_existing_clusters_info()
           alerts = self.collector.alerts()
           self.stale_alerts_monitor.save_executed_alerts(alerts)
 
@@ -94,6 +96,16 @@ class AlertStatusReporter(threading.Thread):
 
     return changed_alerts
     
+
+  def clean_not_existing_clusters_info(self):
+    """
+    This needs to be done to remove information about clusters which were deleted (e.g. ambari-server reset)
+    """
+    for cluster_id in self.reported_alerts.keys():
+      if not cluster_id in self.alert_definitions_cache.get_cluster_ids():
+        del self.reported_alerts[cluster_id]
+
+
   @staticmethod
   def log_sending(message_dict):
     """
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index ae8e003..4885e8b 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -78,6 +78,7 @@ class HeartbeatThread(threading.Thread):
     self.responseId = 0
     self.file_cache = initializer_module.file_cache
     self.stale_alerts_monitor = initializer_module.stale_alerts_monitor
+    self.post_registration_actions = [self.file_cache.reset]
 
 
   def run(self):
@@ -142,11 +143,15 @@ class HeartbeatThread(threading.Thread):
 
     self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
 
-    self.file_cache.reset()
+    self.run_post_registration_actions()
     self.initializer_module.is_registered = True
     # now when registration is done we can expose connection to other threads.
     self.initializer_module._connection = self.connection
 
+  def run_post_registration_actions(self):
+    for post_registration_action in self.post_registration_actions:
+      post_registration_action()
+
   def unregister(self):
     """
     Disconnect and remove connection object from initializer_module so other threads cannot use it
diff --git a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
index 3de79fe..622c4bd 100644
--- a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
@@ -50,10 +50,6 @@ class HostStatusReporter(threading.Thread):
           if self.initializer_module.is_registered and not Utils.are_dicts_equal(report, self.last_report, keys_to_skip=["agentTimeStampAtReporting"]):
             self.initializer_module.connection.send(message=report, destination=Constants.HOST_STATUS_REPORTS_ENDPOINT)
             self.last_report = report
-
-        # don't use else to avoid race condition
-        if not self.initializer_module.is_registered:
-          self.last_report = {}
       except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
         pass
       except:
@@ -72,4 +68,7 @@ class HostStatusReporter(threading.Thread):
       'mounts': self.hardware.osdisks(),
     }
 
-    return report
\ No newline at end of file
+    return report
+
+  def clean_cache(self):
+    self.last_report = {}
\ No newline at end of file
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 6280a4c..693b04e 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -376,6 +376,9 @@ def run_threads(initializer_module):
   alert_status_reporter = AlertStatusReporter(initializer_module)
   alert_status_reporter.start()
 
+  # clean caches for non-existing clusters (ambari-server reset case)
+  heartbeat_thread.post_registration_actions += [component_status_executor.clean_not_existing_clusters_info, alert_status_reporter.clean_not_existing_clusters_info, host_status_reporter.clean_cache]
+
   initializer_module.action_queue.start()
 
   while not initializer_module.stop_event.is_set():

-- 
To stop receiving notification emails like this one, please contact
aonishuk@apache.org.