You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this text extract.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/07/12 15:49:35 UTC
ambari git commit: AMBARI-21452. Support metadata update, fix inability to re-register after server restart (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-perf 063877e47 -> 650e23df8
AMBARI-21452. Support metadata update, fix inability to re-register after server restart (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/650e23df
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/650e23df
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/650e23df
Branch: refs/heads/branch-3.0-perf
Commit: 650e23df8aaa464204b6e72b849b4da699cebd66
Parents: 063877e
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Wed Jul 12 18:48:16 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Wed Jul 12 18:48:16 2017 +0300
----------------------------------------------------------------------
.../main/python/ambari_agent/ClusterCache.py | 9 +++++++
.../ambari_agent/ComponentStatusExecutor.py | 5 +++-
.../ambari_agent/CustomServiceOrchestrator.py | 4 +--
.../main/python/ambari_agent/HeartbeatThread.py | 28 ++++++++++++++++----
.../python/ambari_agent/InitializerModule.py | 13 +++++++--
.../listeners/MetadataEventListener.py | 2 +-
.../listeners/TopologyEventListener.py | 4 +--
.../src/main/python/ambari_agent/security.py | 3 +++
8 files changed, 54 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
index 5acdb18..2316866 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -86,6 +86,15 @@ class ClusterCache(dict):
self.on_cache_update()
self.persist_cache()
+ def cache_update(self, update_dict):
+ """
+ Update the current dictionary by other one
+ """
+ merged_dict = Utils.update_nested(self._get_mutable_copy(), update_dict)
+ self.rewrite_cache(merged_dict)
+
+ def cache_delete(self, delete_dict):
+ raise NotImplemented()
def rewrite_cluster_cache(self, cluster_id, cache):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 5e53ed8..be3eb5b 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -60,6 +60,9 @@ class ComponentStatusExecutor(threading.Thread):
status_commands_to_run = metadata_cache.status_commands_to_run
+ if not 'components' in topology_cache:
+ continue
+
cluster_components = topology_cache.components
for component_dict in cluster_components:
for command_name in status_commands_to_run:
@@ -78,7 +81,7 @@ class ComponentStatusExecutor(threading.Thread):
}
component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict)
- logger.info(component_status_result)
+ # TODO STOMP: if status command failed with exception show exception
status = LiveStatus.LIVE_STATUS if component_status_result['exitcode'] == 0 else LiveStatus.DEAD_STATUS
result = {
http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 8bc050a..aa03bc9 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -86,14 +86,12 @@ class CustomServiceOrchestrator():
self.tmp_dir = self.config.get('agent', 'prefix')
self.force_https_protocol = self.config.get_force_https_protocol()
self.exec_tmp_dir = Constants.AGENT_TMP_DIR
- self.file_cache = FileCache(self.config)
+ self.file_cache = initializer_module.file_cache
self.status_commands_stdout = os.path.join(self.tmp_dir,
'status_command_stdout.txt')
self.status_commands_stderr = os.path.join(self.tmp_dir,
'status_command_stderr.txt')
self.public_fqdn = hostname.public_hostname(self.config)
- # TODO STOMP: cache reset should be called on every agent registration
- #controller.registration_listeners.append(self.file_cache.reset)
# Construct the hadoop credential lib JARs path
self.credential_shell_lib_path = os.path.join(self.config.get('security', 'credential_lib_dir',
http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 6ba84e6..ab24bb4 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -21,7 +21,9 @@ limitations under the License.
import logging
import ambari_stomp
import threading
+from socket import error as socket_error
+from ambari_agent.security import ConnectionFailed
from ambari_agent import Constants
from ambari_agent.Register import Register
from ambari_agent.Utils import BlockingDictionary
@@ -67,6 +69,7 @@ class HeartbeatThread(threading.Thread):
(Constants.HOST_LEVEL_PARAMS_TOPIC_ENPOINT, initializer_module.host_level_params_cache, self.host_level_params_events_listener)
]
self.responseId = 0
+ self.file_cache = initializer_module.file_cache
def run(self):
@@ -83,11 +86,18 @@ class HeartbeatThread(threading.Thread):
response = self.blocking_request(heartbeat_body, Constants.HEARTBEAT_ENDPOINT)
logger.debug("Heartbeat response is {0}".format(response))
self.handle_heartbeat_reponse(response)
- except:
- logger.exception("Exception in HeartbeatThread. Re-running the registration")
+ except Exception as ex:
+ if not isinstance(ex, (socket_error, ConnectionFailed)):
+ logger.exception("Exception in HeartbeatThread. Re-running the registration")
+
self.initializer_module.is_registered = False
- self.initializer_module.connection.disconnect()
- delattr(self.initializer_module, '_connection')
+ try:
+ self.initializer_module.connection.disconnect()
+ except:
+ # if exception happened due to connection problem, disconnect might not work
+ pass
+ if hasattr(self.initializer_module, '_connection'):
+ delattr(self.initializer_module, '_connection')
self.stop_event.wait(self.heartbeat_interval)
@@ -124,6 +134,7 @@ class HeartbeatThread(threading.Thread):
raise
self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
+ self.file_cache.reset()
self.initializer_module.is_registered = True
def handle_registration_response(self, response):
@@ -179,7 +190,14 @@ class HeartbeatThread(threading.Thread):
"""
Send a request to server and waits for the response from it. The response it detected by the correspondence of correlation_id.
"""
- correlation_id = self.initializer_module.connection.send(message=message, destination=destination)
+ try:
+ correlation_id = self.initializer_module.connection.send(message=message, destination=destination)
+ except AttributeError:
+ # this happens when trying to connect to broken connection. Happens if ambari-server is restarted.
+ err_msg = "Connection failed while trying to connect to {0}".format(destination)
+ logger.warn(err_msg)
+ raise ConnectionFailed(err_msg)
+
try:
return self.server_responses_listener.responses.blocking_pop(str(correlation_id), timeout=timeout)
except BlockingDictionary.DictionaryPopTimeout:
http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index dbc26e8..fb73d6d 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -21,6 +21,8 @@ limitations under the License.
import threading
import logging
import os
+from socket import error as socket_error
+
from ambari_agent.FileCache import FileCache
from ambari_agent.AmbariConfig import AmbariConfig
from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache
@@ -75,6 +77,9 @@ class InitializerModule:
self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, self.config)
self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
self.host_level_params_cache = ClusterHostLevelParamsCache(self.cluster_cache_dir)
+
+ self.file_cache = FileCache(self.config)
+
self.customServiceOrchestrator = CustomServiceOrchestrator(self)
self.recovery_manager = RecoveryManager(self.recovery_cache_dir)
@@ -92,7 +97,11 @@ class InitializerModule:
logging.info("Connecting to {0}".format(connection_url))
conn = AmbariStompConnection(connection_url)
- conn.start()
- conn.connect(wait=True)
+ try:
+ conn.start()
+ conn.connect(wait=True)
+ except socket_error:
+ logger.warn("Could not connect to {0}".format(connection_url))
+ raise
return conn
http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
index 5802ffe..1e9b6e7 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
@@ -46,7 +46,7 @@ class MetadataEventListener(EventListener):
if message == {}:
return
- self.metadata_cache.rewrite_cache(message['clusters'])
+ self.metadata_cache.cache_update(message['clusters'])
self.metadata_cache.hash = message['hash']
def get_handled_path(self):
http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
index d1c644c..19a1d32 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
@@ -51,10 +51,10 @@ class TopologyEventListener(EventListener):
self.topology_cache.hash = message['hash']
elif event_type == 'UPDATE':
self.topology_cache.cache_update(message['clusters'])
- self.topology_cache.hash = "abc" #message['hash']
+ self.topology_cache.hash = message['hash']
elif event_type == 'DELETE':
self.topology_cache.cache_delete(message['clusters'])
- self.topology_cache.hash = "abcd" #message['hash']
+ self.topology_cache.hash = message['hash']
else:
logger.error("Unknown event type '{0}' for topology event")
http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/security.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py
index ca90f0e..230af62 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -102,6 +102,9 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
return sock
+class ConnectionFailed(Exception):
+ pass
+
class AmbariStompConnection(WsConnection):
def __init__(self, url):
self.lock = threading.RLock()