You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/09/04 14:39:07 UTC
[ambari] branch branch-2.7 updated: AMBARI-24583. Ambari agent
status could be reported stale just after execution command thread has
finished execution (aonishuk)
This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 4d44151 AMBARI-24583. Ambari agent status could be reported stale just after execution command thread has finished execution (aonishuk)
4d44151 is described below
commit 4d44151d8cfad305f118123066d4dd708be0c556
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Sun Sep 2 12:37:54 2018 +0300
AMBARI-24583. Ambari agent status could be reported stale just after execution command thread has finished execution (aonishuk)
---
.../python/ambari_agent/ComponentStatusExecutor.py | 35 ++++++++++++++++++++++
1 file changed, 35 insertions(+)
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 64e6ae7..df72c88 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -21,10 +21,12 @@ import threading
from ambari_agent import Constants
from ambari_agent.LiveStatus import LiveStatus
+from ambari_agent.Utils import Utils
from collections import defaultdict
from ambari_agent.models.commands import AgentCommand
from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
+from resource_management.libraries.functions.default import default
class ComponentStatusExecutor(threading.Thread):
@@ -39,6 +41,8 @@ class ComponentStatusExecutor(threading.Thread):
self.reported_component_status = defaultdict(lambda:defaultdict(lambda:defaultdict(lambda:None))) # component statuses which were received by server
self.server_responses_listener = initializer_module.server_responses_listener
self.logger = logging.getLogger(__name__)
+ self.reports_to_discard = []
+ self.reports_to_discard_lock = threading.RLock()
threading.Thread.__init__(self)
def run(self):
@@ -54,6 +58,9 @@ class ComponentStatusExecutor(threading.Thread):
self.clean_not_existing_clusters_info()
cluster_reports = defaultdict(lambda:[])
+ with self.reports_to_discard_lock:
+ self.reports_to_discard = []
+
for cluster_id in self.topology_cache.get_cluster_ids():
# TODO: check if we can make clusters immutable too
try:
@@ -104,6 +111,8 @@ class ComponentStatusExecutor(threading.Thread):
if result:
cluster_reports[cluster_id].append(result)
+
+ cluster_reports = self.discard_stale_reports(cluster_reports)
self.send_updates_to_server(cluster_reports)
except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
pass
@@ -113,6 +122,29 @@ class ComponentStatusExecutor(threading.Thread):
self.stop_event.wait(self.status_commands_run_interval)
self.logger.info("ComponentStatusExecutor has successfully finished")
def discard_stale_reports(self, cluster_reports):
  """
  Remove reports which are already stale, i.e. reports for a component whose
  status has already been sent via check_component_status(..., report=True)
  since run() last reset self.reports_to_discard (done at the start of each
  status-check cycle). Presumably that caller is the execution command thread.

  A report is dropped when Utils.are_dicts_equal() considers it equal to one of
  the recorded reports while ignoring the 'status' key.

  :param cluster_reports: mapping of cluster_id -> list of status report dicts
  :return: cluster_reports itself when there is nothing to discard, otherwise a
           new defaultdict(list) containing only the reports that were kept
  """
  with self.reports_to_discard_lock:
    # Snapshot under the lock: check_component_status() appends to this list
    # while holding the same lock.
    reports_to_discard = self.reports_to_discard[:]

  # nothing to discard
  if not reports_to_discard:
    return cluster_reports

  new_cluster_reports = defaultdict(list)
  # items() (not iteritems()) keeps this working on Python 3 as well; the loop
  # variable gets its own name so the cluster_reports parameter is not shadowed.
  for cluster_id, reports in cluster_reports.items():
    for report in reports:
      is_stale = any(
        Utils.are_dicts_equal(report, discarded_report, keys_to_skip=['status'])
        for discarded_report in reports_to_discard
      )
      if is_stale:
        self.logger.info("Discarding outdated status %s before sending", report)
      else:
        new_cluster_reports[cluster_id].append(report)

  return new_cluster_reports
+
def check_component_status(self, cluster_id, service_name, component_name, command_name, report=False):
"""
Returns components status if it has changed, otherwise None.
@@ -151,6 +183,9 @@ class ComponentStatusExecutor(threading.Thread):
self.recovery_manager.handle_status_change(component_name, status)
if report:
+ with self.reports_to_discard_lock:
+ self.reports_to_discard.append(result)
+
self.send_updates_to_server({cluster_id: [result]})
return result