You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/07/12 15:49:35 UTC

ambari git commit: AMBARI-21452. Support metadata update, fix inability to re-register after server restart (aonishuk)

Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 063877e47 -> 650e23df8


AMBARI-21452. Support metadata update, fix inability to re-register after server restart (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/650e23df
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/650e23df
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/650e23df

Branch: refs/heads/branch-3.0-perf
Commit: 650e23df8aaa464204b6e72b849b4da699cebd66
Parents: 063877e
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Wed Jul 12 18:48:16 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Wed Jul 12 18:48:16 2017 +0300

----------------------------------------------------------------------
 .../main/python/ambari_agent/ClusterCache.py    |  9 +++++++
 .../ambari_agent/ComponentStatusExecutor.py     |  5 +++-
 .../ambari_agent/CustomServiceOrchestrator.py   |  4 +--
 .../main/python/ambari_agent/HeartbeatThread.py | 28 ++++++++++++++++----
 .../python/ambari_agent/InitializerModule.py    | 13 +++++++--
 .../listeners/MetadataEventListener.py          |  2 +-
 .../listeners/TopologyEventListener.py          |  4 +--
 .../src/main/python/ambari_agent/security.py    |  3 +++
 8 files changed, 54 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
index 5acdb18..2316866 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -86,6 +86,15 @@ class ClusterCache(dict):
     self.on_cache_update()
     self.persist_cache()
 
+  def cache_update(self, update_dict):
+    """
+    Merge update_dict into a mutable copy of the current cache (via Utils.update_nested) and rewrite the cache with the merged result
+    """
+    merged_dict = Utils.update_nested(self._get_mutable_copy(), update_dict)
+    self.rewrite_cache(merged_dict)
+
+  def cache_delete(self, delete_dict):
+    raise NotImplementedError()  # not implemented yet; NotImplemented is a non-callable constant, so calling it raised TypeError
 
   def rewrite_cluster_cache(self, cluster_id, cache):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 5e53ed8..be3eb5b 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -60,6 +60,9 @@ class ComponentStatusExecutor(threading.Thread):
 
           status_commands_to_run = metadata_cache.status_commands_to_run
 
+          if not 'components' in topology_cache:
+            continue
+
           cluster_components = topology_cache.components
           for component_dict in cluster_components:
             for command_name in status_commands_to_run:
@@ -78,7 +81,7 @@ class ComponentStatusExecutor(threading.Thread):
               }
 
               component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict)
-              logger.info(component_status_result)
+              # TODO STOMP: if status command failed with exception show exception
               status = LiveStatus.LIVE_STATUS if component_status_result['exitcode'] == 0 else LiveStatus.DEAD_STATUS
 
               result = {

http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 8bc050a..aa03bc9 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -86,14 +86,12 @@ class CustomServiceOrchestrator():
     self.tmp_dir = self.config.get('agent', 'prefix')
     self.force_https_protocol = self.config.get_force_https_protocol()
     self.exec_tmp_dir = Constants.AGENT_TMP_DIR
-    self.file_cache = FileCache(self.config)
+    self.file_cache = initializer_module.file_cache
     self.status_commands_stdout = os.path.join(self.tmp_dir,
                                                'status_command_stdout.txt')
     self.status_commands_stderr = os.path.join(self.tmp_dir,
                                                'status_command_stderr.txt')
     self.public_fqdn = hostname.public_hostname(self.config)
-    # TODO STOMP: cache reset should be called on every agent registration
-    #controller.registration_listeners.append(self.file_cache.reset)
 
     # Construct the hadoop credential lib JARs path
     self.credential_shell_lib_path = os.path.join(self.config.get('security', 'credential_lib_dir',

http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 6ba84e6..ab24bb4 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -21,7 +21,9 @@ limitations under the License.
 import logging
 import ambari_stomp
 import threading
+from socket import error as socket_error
 
+from ambari_agent.security import ConnectionFailed
 from ambari_agent import Constants
 from ambari_agent.Register import Register
 from ambari_agent.Utils import BlockingDictionary
@@ -67,6 +69,7 @@ class HeartbeatThread(threading.Thread):
     (Constants.HOST_LEVEL_PARAMS_TOPIC_ENPOINT, initializer_module.host_level_params_cache, self.host_level_params_events_listener)
     ]
     self.responseId = 0
+    self.file_cache = initializer_module.file_cache
 
 
   def run(self):
@@ -83,11 +86,18 @@ class HeartbeatThread(threading.Thread):
         response = self.blocking_request(heartbeat_body, Constants.HEARTBEAT_ENDPOINT)
         logger.debug("Heartbeat response is {0}".format(response))
         self.handle_heartbeat_reponse(response)
-      except:
-        logger.exception("Exception in HeartbeatThread. Re-running the registration")
+      except Exception as ex:
+        if not isinstance(ex, (socket_error, ConnectionFailed)):
+          logger.exception("Exception in HeartbeatThread. Re-running the registration")
+
         self.initializer_module.is_registered = False
-        self.initializer_module.connection.disconnect()
-        delattr(self.initializer_module, '_connection')
+        try:
+          self.initializer_module.connection.disconnect()
+        except:
+          # if exception happened due to connection problem, disconnect might not work
+          pass
+        if hasattr(self.initializer_module, '_connection'):
+          delattr(self.initializer_module, '_connection')
 
       self.stop_event.wait(self.heartbeat_interval)
 
@@ -124,6 +134,7 @@ class HeartbeatThread(threading.Thread):
         raise
 
     self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
+    self.file_cache.reset()
     self.initializer_module.is_registered = True
 
   def handle_registration_response(self, response):
@@ -179,7 +190,14 @@ class HeartbeatThread(threading.Thread):
     """
     Send a request to server and waits for the response from it. The response it detected by the correspondence of correlation_id.
     """
-    correlation_id = self.initializer_module.connection.send(message=message, destination=destination)
+    try:
+      correlation_id = self.initializer_module.connection.send(message=message, destination=destination)
+    except AttributeError:
+      # this happens when trying to send over a broken connection, e.g. after ambari-server is restarted.
+      err_msg = "Connection failed while trying to connect to {0}".format(destination)
+      logger.warn(err_msg)
+      raise ConnectionFailed(err_msg)
+
     try:
       return self.server_responses_listener.responses.blocking_pop(str(correlation_id), timeout=timeout)
     except BlockingDictionary.DictionaryPopTimeout:

http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index dbc26e8..fb73d6d 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -21,6 +21,8 @@ limitations under the License.
 import threading
 import logging
 import os
+from socket import error as socket_error
+
 from ambari_agent.FileCache import FileCache
 from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache
@@ -75,6 +77,9 @@ class InitializerModule:
     self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, self.config)
     self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
     self.host_level_params_cache = ClusterHostLevelParamsCache(self.cluster_cache_dir)
+
+    self.file_cache = FileCache(self.config)
+
     self.customServiceOrchestrator = CustomServiceOrchestrator(self)
 
     self.recovery_manager = RecoveryManager(self.recovery_cache_dir)
@@ -92,7 +97,11 @@ class InitializerModule:
     logging.info("Connecting to {0}".format(connection_url))
 
     conn = AmbariStompConnection(connection_url)
-    conn.start()
-    conn.connect(wait=True)
+    try:
+      conn.start()
+      conn.connect(wait=True)
+    except socket_error:
+      logger.warn("Could not connect to {0}".format(connection_url))
+      raise
 
     return conn

http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
index 5802ffe..1e9b6e7 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
@@ -46,7 +46,7 @@ class MetadataEventListener(EventListener):
     if message == {}:
       return
 
-    self.metadata_cache.rewrite_cache(message['clusters'])
+    self.metadata_cache.cache_update(message['clusters'])
     self.metadata_cache.hash = message['hash']
 
   def get_handled_path(self):

http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
index d1c644c..19a1d32 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
@@ -51,10 +51,10 @@ class TopologyEventListener(EventListener):
       self.topology_cache.hash = message['hash']
     elif event_type == 'UPDATE':
       self.topology_cache.cache_update(message['clusters'])
-      self.topology_cache.hash = "abc" #message['hash']
+      self.topology_cache.hash = message['hash']
     elif event_type == 'DELETE':
       self.topology_cache.cache_delete(message['clusters'])
-      self.topology_cache.hash = "abcd" #message['hash']
+      self.topology_cache.hash = message['hash']
     else:
       logger.error("Unknown event type '{0}' for topology event")
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/650e23df/ambari-agent/src/main/python/ambari_agent/security.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py
index ca90f0e..230af62 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -102,6 +102,9 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
 
     return sock
 
+class ConnectionFailed(Exception):
+  pass
+
 class AmbariStompConnection(WsConnection):
   def __init__(self, url):
     self.lock = threading.RLock()