You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/09/18 15:54:49 UTC

[ambari] branch branch-2.7 updated: AMBARI-24654. Tasks fail on ambari-agent intermittently under cpu load due to race condition in ambari-agent (aonishuk)

This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 02b237c  AMBARI-24654. Tasks fail on ambari-agent intermittently under cpu load due to race condition in ambari-agent (aonishuk)
02b237c is described below

commit 02b237c41dd8f7ee04d884a8b658e3a33567a659
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Tue Sep 18 11:07:20 2018 +0300

    AMBARI-24654. Tasks fail on ambari-agent intermittently under cpu load due to race condition in ambari-agent (aonishuk)
---
 .../src/main/python/ambari_agent/ClusterTopologyCache.py   | 12 +++++++++++-
 ambari-agent/src/main/python/ambari_agent/Utils.py         | 14 +++++++++++++-
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
index 316885d..b7863c6 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
@@ -21,12 +21,14 @@ limitations under the License.
 
 from ambari_agent import hostname
 from ambari_agent.ClusterCache import ClusterCache
-from ambari_agent.Utils import ImmutableDictionary
+from ambari_agent.Utils import ImmutableDictionary, synchronized
 
 from collections import defaultdict
+import threading
 import logging
 
 logger = logging.getLogger(__name__)
+topology_update_lock = threading.RLock()
 
 class ClusterTopologyCache(ClusterCache):
   """
@@ -53,6 +55,7 @@ class ClusterTopologyCache(ClusterCache):
   def get_cache_name(self):
     return 'topology'
 
+  @synchronized(topology_update_lock)
   def on_cache_update(self):
     self.cluster_host_info = None
 
@@ -95,6 +98,7 @@ class ClusterTopologyCache(ClusterCache):
     self.hosts_to_id = ImmutableDictionary(hosts_to_id)
     self.components_by_key = ImmutableDictionary(components_by_key)
 
+  @synchronized(topology_update_lock)
   def get_cluster_host_info(self, cluster_id):
     """
     Get dictionary used in commands as clusterHostInfo
@@ -124,6 +128,7 @@ class ClusterTopologyCache(ClusterCache):
     self.cluster_host_info = cluster_host_info
     return cluster_host_info
 
+  @synchronized(topology_update_lock)
   def get_component_info_by_key(self, cluster_id, service_name, component_name):
     """
     Find component by service_name and component_name in list of component dictionaries.
@@ -135,12 +140,15 @@ class ClusterTopologyCache(ClusterCache):
     except KeyError:
       return None
 
+  @synchronized(topology_update_lock)
   def get_cluster_local_components(self, cluster_id):
     return self.cluster_local_components[cluster_id]
 
+  @synchronized(topology_update_lock)
   def get_cluster_component_version_map(self, cluster_id):
     return self.component_version_map[cluster_id]
 
+  @synchronized(topology_update_lock)
   def get_host_info_by_id(self, cluster_id, host_id):
     """
     Find host by id in list of host dictionaries.
@@ -150,10 +158,12 @@ class ClusterTopologyCache(ClusterCache):
     except KeyError:
       return None
 
+  @synchronized(topology_update_lock)
   def get_current_host_info(self, cluster_id):
     current_host_id = self.current_host_ids_to_cluster[cluster_id]
     return self.get_host_info_by_id(cluster_id, current_host_id)
 
+  @synchronized(topology_update_lock)
   def get_current_host_id(self, cluster_id):
     try:
       return self.current_host_ids_to_cluster[cluster_id]
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py
index 33851f3..4bf6ddf 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -171,7 +171,8 @@ class ImmutableDictionary(dict):
     """
     Recursively turn dict to ImmutableDictionary
     """
-    for k, v in dictionary.iteritems():
+    if not isinstance(dictionary, ImmutableDictionary):
+      for k, v in dictionary.iteritems():
         dictionary[k] = Utils.make_immutable(v)
 
     super(ImmutableDictionary, self).__init__(dictionary)
@@ -221,6 +222,17 @@ def lazy_property(undecorated):
 
   return decorated
 
+def synchronized(lock):
+    def wrap(f):
+        def newFunction(*args, **kw):
+            lock.acquire()
+            try:
+                return f(*args, **kw)
+            finally:
+                lock.release()
+        return newFunction
+    return wrap
+
 def execute_with_retries(tries, try_sleep, retry_exception_class, func, *args, **kwargs):
   for i in range(tries):
     try: