You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/09/24 22:15:45 UTC
[ambari] branch trunk updated: [AMBARI-24679] Fix race condition in agent during registration and topology updates. (#2368)
This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5a9b6e0 [AMBARI-24679] Fix race condition in agent during registration and topology updates. (#2368)
5a9b6e0 is described below
commit 5a9b6e03bc27493f427043b0ae735b20815cfd1e
Author: avijayanhwx <av...@hortonworks.com>
AuthorDate: Mon Sep 24 15:15:39 2018 -0700
[AMBARI-24679] Fix race condition in agent during registration and topology updates. (#2368)
---
.../main/python/ambari_agent/HeartbeatThread.py | 6 +++-
.../main/python/ambari_agent/listeners/__init__.py | 38 ++++++++++++++++++++--
2 files changed, 40 insertions(+), 4 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index b5510d1..ded5edd 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -145,7 +145,11 @@ class HeartbeatThread(threading.Thread):
logger.exception("Exception while handing response to request at {0}. {1}".format(endpoint, response))
raise
finally:
- listener.enabled = True
+ with listener.event_queue_lock:
+ logger.info("Enabling events for listener {0}".format(listener))
+ listener.enabled = True
+ # Process queued messages if any
+ listener.dequeue_unprocessed_events()
self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index 1f541aa..b50bdaa 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -25,16 +25,40 @@ import copy
from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
from ambari_agent import Constants
from ambari_agent.Utils import Utils
+from Queue import Queue
+import threading
logger = logging.getLogger(__name__)
class EventListener(ambari_stomp.ConnectionListener):
+
+ unprocessed_messages_queue = Queue(100)
+
"""
Base abstract class for event listeners on specific topics.
"""
def __init__(self, initializer_module):
self.initializer_module = initializer_module
self.enabled = True
+ self.event_queue_lock = threading.RLock()
+
+ def dequeue_unprocessed_events(self):
+ while not self.unprocessed_messages_queue.empty():
+ payload = self.unprocessed_messages_queue.get_nowait()
+ if payload:
+ logger.info("Processing event from unprocessed queue {0} {1}".format(payload[0], payload[1]))
+ destination = payload[0]
+ headers = payload[1]
+ message_json = payload[2]
+ message = payload[3]
+ try:
+ self.on_event(headers, message_json)
+ except Exception as ex:
+ logger.exception("Exception while handing event from {0} {1} {2}".format(destination, headers, message))
+ self.report_status_to_sender(headers, message, ex)
+ else:
+ self.report_status_to_sender(headers, message)
+
def on_message(self, headers, message):
"""
@@ -58,13 +82,21 @@ class EventListener(ambari_stomp.ConnectionListener):
logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, copy.deepcopy(message_json))))
if not self.enabled:
- logger.info("Ignoring event to {0} since event listener is disabled".format(destination))
- return
+ with self.event_queue_lock:
+ if not self.enabled:
+ logger.info("Queuing event as unprocessed {0} since event "
+ "listener is disabled".format(destination))
+ try:
+ self.unprocessed_messages_queue.put_nowait((destination, headers, message_json, message))
+ except Exception as ex:
+ logger.warning("Cannot queue any more unprocessed events since "
+ "queue is full! {0} {1}".format(destination, message))
+ return
try:
self.on_event(headers, message_json)
except Exception as ex:
- logger.exception("Exception while handing event from {0} {1}".format(destination, headers, message))
+ logger.exception("Exception while handing event from {0} {1} {2}".format(destination, headers, message))
self.report_status_to_sender(headers, message, ex)
else:
self.report_status_to_sender(headers, message)