You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/05/10 14:34:22 UTC

ambari git commit: AMBARI-20916. Run new heartbeatThread and statusCommand executor instead of old (aonishuk)

Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 2e579f11c -> cef3f245e


AMBARI-20916. Run new heartbeatThread and statusCommand executor instead of old (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cef3f245
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cef3f245
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cef3f245

Branch: refs/heads/branch-3.0-perf
Commit: cef3f245eed01c36a7e3a9c628040a7b4e7ee1d0
Parents: 2e579f1
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Wed May 10 17:34:14 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Wed May 10 17:34:14 2017 +0300

----------------------------------------------------------------------
 .../ambari_agent/ComponentStatusExecutor.py     |  2 +
 .../main/python/ambari_agent/HeartbeatThread.py |  2 +-
 .../src/main/python/ambari_agent/main.py        | 62 ++++++++++++--------
 .../ambari_agent/TestAgentStompResponses.py     |  2 -
 4 files changed, 39 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/cef3f245/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index ebe9156..a3798c6 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -69,6 +69,7 @@ class ComponentStatusExecutor(threading.Thread):
               if self.stop_event.is_set():
                 break
 
+              service_name = component_dict.serviceName
               component_name = component_dict.componentName
 
               # TODO STOMP: run real command
@@ -76,6 +77,7 @@ class ComponentStatusExecutor(threading.Thread):
               #self.customServiceOrchestrator.requestComponentStatus(command)
               status = random.choice(["INSTALLED","STARTED"])
               result = {
+                'serviceName': service_name,
                 'componentName': component_name,
                 'command': command,
                 'status': status,

http://git-wip-us.apache.org/repos/asf/ambari/blob/cef3f245/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 1f7d576..57748a0 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -113,7 +113,7 @@ class HeartbeatThread(threading.Thread):
     """
     Heartbeat body to be send to server
     """
-    return {'heartbeat-request-test':'true'}
+    return {'hostname':'true'}
 
   def subscribe_and_listen(self):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/cef3f245/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 19c92b0..29eb926 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -109,6 +109,10 @@ from HeartbeatHandlers import bind_signal_handlers
 from ambari_commons.constants import AMBARI_SUDO_BINARY
 from resource_management.core.logger import Logger
 
+from ambari_agent import HeartbeatThread
+from ambari_agent.InitializerModule import InitializerModule
+from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
+
 logger = logging.getLogger()
 alerts_logger = logging.getLogger('ambari_alerts')
 
@@ -140,7 +144,7 @@ def setup_logging(logger, filename, logging_level):
     rotateLog.setFormatter(formatter)
     _file_logging_handlers[filename] = rotateLog
   logger.addHandler(rotateLog)
-      
+
   logging.basicConfig(format=formatstr, level=logging_level, filename=filename)
   logger.setLevel(logging_level)
   logger.info("loglevel=logging.{0}".format(logging._levelNames[logging_level]))
@@ -150,18 +154,18 @@ GRACEFUL_STOP_TRIES_SLEEP = 3
 
 
 def add_syslog_handler(logger):
-    
+
   syslog_enabled = config.has_option("logging","syslog_enabled") and (int(config.get("logging","syslog_enabled")) == 1)
-      
+
   #add syslog handler if we are on linux and syslog is enabled in ambari config
   if syslog_enabled and IS_LINUX:
     logger.info("Adding syslog handler to ambari agent logger")
     syslog_handler = SysLogHandler(address="/dev/log",
                                    facility=SysLogHandler.LOG_LOCAL1)
-        
+
     syslog_handler.setFormatter(SYSLOG_FORMATTER)
     logger.addHandler(syslog_handler)
-    
+
 def update_log_level(config):
   # Setting loglevel based on config file
   global logger
@@ -172,7 +176,7 @@ def update_log_level(config):
     # create logger
     logger = logging.getLogger(__name__)
     logger.info("Logging configured by " + log_cfg_file)
-  else:  
+  else:
     try:
       loglevel = config.get('agent', 'loglevel')
       if loglevel is not None:
@@ -210,19 +214,19 @@ def check_sudo():
   # don't need to check sudo for root.
   if os.geteuid() == 0:
     return
-  
+
   runner = shellRunner()
   test_command = [AMBARI_SUDO_BINARY, '/usr/bin/test', '/']
   test_command_str = ' '.join(test_command)
-  
+
   start_time = time.time()
   res = runner.run(test_command)
   end_time = time.time()
   run_time = end_time - start_time
-  
+
   if res['exitCode'] != 0:
     raise Exception("Please check your sudo configurations.\n" + test_command_str + " failed with " + res['error'] + res['output']) # bad sudo configurations
-  
+
   if run_time > 2:
     logger.warn(("Sudo commands on this host are running slowly ('{0}' took {1} seconds).\n" +
                 "This will create a significant slow down for ambari-agent service tasks.").format(test_command_str, run_time))
@@ -275,7 +279,7 @@ def perform_prestart_checks(expected_hostname):
     logger.error(msg)
     print(msg)
     sys.exit(1)
-    
+
   check_sudo()
 
 
@@ -291,7 +295,7 @@ def stop_agent():
     with open(ProcessHelper.pidfile, 'r') as f:
       pid = f.read()
     pid = int(pid)
-    
+
     runner.run([AMBARI_SUDO_BINARY, 'kill', '-15', str(pid)])
     for i in range(GRACEFUL_STOP_TRIES):
       result = runner.run([AMBARI_SUDO_BINARY, 'kill', '-0', str(pid)])
@@ -344,19 +348,25 @@ def reset_agent(options):
 
 MAX_RETRIES = 10
 
-def run_threads(server_hostname, heartbeat_stop_callback):
-  # Launch Controller communication
-  controller = Controller(config, server_hostname, heartbeat_stop_callback)
-  controller.start()
-  time.sleep(2) # in order to get controller.statusCommandsExecutor initialized
-  while controller.is_alive():
-    time.sleep(0.1)
+# TODO STOMP: remove from globals
+initializer_module = None
+
+def run_threads():
+  global initializer_module
+  initializer_module = InitializerModule()
 
-    need_relaunch, reason = controller.get_status_commands_executor().need_relaunch
-    if need_relaunch:
-      controller.get_status_commands_executor().relaunch(reason)
+  heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
+  heartbeat_thread.start()
 
-  controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False)
+  component_status_executor = ComponentStatusExecutor(initializer_module)
+  component_status_executor.start()
+
+  while not initializer_module.stop_event.is_set():
+    time.sleep(0.1)
+
+  # TODO STOMP: if thread cannot stop by itself kill it hard after some timeout.
+  heartbeat_thread.join()
+  component_status_executor.join()
 
 # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
 # we need this for windows os, where no sigterm available
@@ -403,7 +413,7 @@ def main(heartbeat_stop_callback=None):
 
   # Check for ambari configuration file.
   resolve_ambari_config()
-  
+
   # Add syslog hanlder based on ambari config file
   add_syslog_handler(logger)
 
@@ -468,7 +478,7 @@ def main(heartbeat_stop_callback=None):
         # Set the active server
         active_server = server_hostname
         # Launch Controller communication
-        run_threads(server_hostname, heartbeat_stop_callback)
+        run_threads()
 
       #
       # If Ambari Agent connected to the server or
@@ -488,7 +498,7 @@ if __name__ == "__main__":
   is_logger_setup = False
   try:
     heartbeat_stop_callback = bind_signal_handlers(agentPid)
-  
+
     main(heartbeat_stop_callback)
   except SystemExit:
     raise

http://git-wip-us.apache.org/repos/asf/ambari/blob/cef3f245/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index c05f350..cab8fe1 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -40,7 +40,6 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
     initializer_module = InitializerModule()
     heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
-    heartbeat_thread.heartbeat_interval = 0
     heartbeat_thread.start()
 
     connect_frame = self.server.frames_queue.get()
@@ -94,7 +93,6 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     self.server.frames_queue.queue.clear()
 
     heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
-    heartbeat_thread.heartbeat_interval = 0
     heartbeat_thread.start()
 
     connect_frame = self.server.frames_queue.get()