You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/09/18 15:54:49 UTC
[ambari] branch branch-2.7 updated: AMBARI-24654. Tasks fail on ambari-agent intermittently under cpu load due to race condition in ambari-agent (aonishuk)
This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 02b237c AMBARI-24654. Tasks fail on ambari-agent intermittently under cpu load due to race condition in ambari-agent (aonishuk)
02b237c is described below
commit 02b237c41dd8f7ee04d884a8b658e3a33567a659
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Tue Sep 18 11:07:20 2018 +0300
AMBARI-24654. Tasks fail on ambari-agent intermittently under cpu load due to race condition in ambari-agent (aonishuk)
---
.../src/main/python/ambari_agent/ClusterTopologyCache.py | 12 +++++++++++-
ambari-agent/src/main/python/ambari_agent/Utils.py | 14 +++++++++++++-
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
index 316885d..b7863c6 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
@@ -21,12 +21,14 @@ limitations under the License.
from ambari_agent import hostname
from ambari_agent.ClusterCache import ClusterCache
-from ambari_agent.Utils import ImmutableDictionary
+from ambari_agent.Utils import ImmutableDictionary, synchronized
from collections import defaultdict
+import threading
import logging
logger = logging.getLogger(__name__)
+topology_update_lock = threading.RLock()
class ClusterTopologyCache(ClusterCache):
"""
@@ -53,6 +55,7 @@ class ClusterTopologyCache(ClusterCache):
def get_cache_name(self):
return 'topology'
+ @synchronized(topology_update_lock)
def on_cache_update(self):
self.cluster_host_info = None
@@ -95,6 +98,7 @@ class ClusterTopologyCache(ClusterCache):
self.hosts_to_id = ImmutableDictionary(hosts_to_id)
self.components_by_key = ImmutableDictionary(components_by_key)
+ @synchronized(topology_update_lock)
def get_cluster_host_info(self, cluster_id):
"""
Get dictionary used in commands as clusterHostInfo
@@ -124,6 +128,7 @@ class ClusterTopologyCache(ClusterCache):
self.cluster_host_info = cluster_host_info
return cluster_host_info
+ @synchronized(topology_update_lock)
def get_component_info_by_key(self, cluster_id, service_name, component_name):
"""
Find component by service_name and component_name in list of component dictionaries.
@@ -135,12 +140,15 @@ class ClusterTopologyCache(ClusterCache):
except KeyError:
return None
+ @synchronized(topology_update_lock)
def get_cluster_local_components(self, cluster_id):
return self.cluster_local_components[cluster_id]
+ @synchronized(topology_update_lock)
def get_cluster_component_version_map(self, cluster_id):
return self.component_version_map[cluster_id]
+ @synchronized(topology_update_lock)
def get_host_info_by_id(self, cluster_id, host_id):
"""
Find host by id in list of host dictionaries.
@@ -150,10 +158,12 @@ class ClusterTopologyCache(ClusterCache):
except KeyError:
return None
+ @synchronized(topology_update_lock)
def get_current_host_info(self, cluster_id):
current_host_id = self.current_host_ids_to_cluster[cluster_id]
return self.get_host_info_by_id(cluster_id, current_host_id)
+ @synchronized(topology_update_lock)
def get_current_host_id(self, cluster_id):
try:
return self.current_host_ids_to_cluster[cluster_id]
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py
index 33851f3..4bf6ddf 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -171,7 +171,8 @@ class ImmutableDictionary(dict):
"""
Recursively turn dict to ImmutableDictionary
"""
- for k, v in dictionary.iteritems():
+ if not isinstance(dictionary, ImmutableDictionary):
+ for k, v in dictionary.iteritems():
dictionary[k] = Utils.make_immutable(v)
super(ImmutableDictionary, self).__init__(dictionary)
@@ -221,6 +222,17 @@ def lazy_property(undecorated):
return decorated
+def synchronized(lock):
+ def wrap(f):
+ def newFunction(*args, **kw):
+ lock.acquire()
+ try:
+ return f(*args, **kw)
+ finally:
+ lock.release()
+ return newFunction
+ return wrap
+
def execute_with_retries(tries, try_sleep, retry_exception_class, func, *args, **kwargs):
for i in range(tries):
try: