You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/05/10 14:34:22 UTC
ambari git commit: AMBARI-20916. Run new heartbeatThread and
statusCommand executor instead of old (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-perf 2e579f11c -> cef3f245e
AMBARI-20916. Run new heartbeatThread and statusCommand executor instead of old (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cef3f245
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cef3f245
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cef3f245
Branch: refs/heads/branch-3.0-perf
Commit: cef3f245eed01c36a7e3a9c628040a7b4e7ee1d0
Parents: 2e579f1
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Wed May 10 17:34:14 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Wed May 10 17:34:14 2017 +0300
----------------------------------------------------------------------
.../ambari_agent/ComponentStatusExecutor.py | 2 +
.../main/python/ambari_agent/HeartbeatThread.py | 2 +-
.../src/main/python/ambari_agent/main.py | 62 ++++++++++++--------
.../ambari_agent/TestAgentStompResponses.py | 2 -
4 files changed, 39 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/cef3f245/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index ebe9156..a3798c6 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -69,6 +69,7 @@ class ComponentStatusExecutor(threading.Thread):
if self.stop_event.is_set():
break
+ service_name = component_dict.serviceName
component_name = component_dict.componentName
# TODO STOMP: run real command
@@ -76,6 +77,7 @@ class ComponentStatusExecutor(threading.Thread):
#self.customServiceOrchestrator.requestComponentStatus(command)
status = random.choice(["INSTALLED","STARTED"])
result = {
+ 'serviceName': service_name,
'componentName': component_name,
'command': command,
'status': status,
http://git-wip-us.apache.org/repos/asf/ambari/blob/cef3f245/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 1f7d576..57748a0 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -113,7 +113,7 @@ class HeartbeatThread(threading.Thread):
"""
Heartbeat body to be sent to server
"""
- return {'heartbeat-request-test':'true'}
+ return {'hostname':'true'}
def subscribe_and_listen(self):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/cef3f245/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 19c92b0..29eb926 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -109,6 +109,10 @@ from HeartbeatHandlers import bind_signal_handlers
from ambari_commons.constants import AMBARI_SUDO_BINARY
from resource_management.core.logger import Logger
+from ambari_agent import HeartbeatThread
+from ambari_agent.InitializerModule import InitializerModule
+from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
+
logger = logging.getLogger()
alerts_logger = logging.getLogger('ambari_alerts')
@@ -140,7 +144,7 @@ def setup_logging(logger, filename, logging_level):
rotateLog.setFormatter(formatter)
_file_logging_handlers[filename] = rotateLog
logger.addHandler(rotateLog)
-
+
logging.basicConfig(format=formatstr, level=logging_level, filename=filename)
logger.setLevel(logging_level)
logger.info("loglevel=logging.{0}".format(logging._levelNames[logging_level]))
@@ -150,18 +154,18 @@ GRACEFUL_STOP_TRIES_SLEEP = 3
def add_syslog_handler(logger):
-
+
syslog_enabled = config.has_option("logging","syslog_enabled") and (int(config.get("logging","syslog_enabled")) == 1)
-
+
#add syslog handler if we are on linux and syslog is enabled in ambari config
if syslog_enabled and IS_LINUX:
logger.info("Adding syslog handler to ambari agent logger")
syslog_handler = SysLogHandler(address="/dev/log",
facility=SysLogHandler.LOG_LOCAL1)
-
+
syslog_handler.setFormatter(SYSLOG_FORMATTER)
logger.addHandler(syslog_handler)
-
+
def update_log_level(config):
# Setting loglevel based on config file
global logger
@@ -172,7 +176,7 @@ def update_log_level(config):
# create logger
logger = logging.getLogger(__name__)
logger.info("Logging configured by " + log_cfg_file)
- else:
+ else:
try:
loglevel = config.get('agent', 'loglevel')
if loglevel is not None:
@@ -210,19 +214,19 @@ def check_sudo():
# don't need to check sudo for root.
if os.geteuid() == 0:
return
-
+
runner = shellRunner()
test_command = [AMBARI_SUDO_BINARY, '/usr/bin/test', '/']
test_command_str = ' '.join(test_command)
-
+
start_time = time.time()
res = runner.run(test_command)
end_time = time.time()
run_time = end_time - start_time
-
+
if res['exitCode'] != 0:
raise Exception("Please check your sudo configurations.\n" + test_command_str + " failed with " + res['error'] + res['output']) # bad sudo configurations
-
+
if run_time > 2:
logger.warn(("Sudo commands on this host are running slowly ('{0}' took {1} seconds).\n" +
"This will create a significant slow down for ambari-agent service tasks.").format(test_command_str, run_time))
@@ -275,7 +279,7 @@ def perform_prestart_checks(expected_hostname):
logger.error(msg)
print(msg)
sys.exit(1)
-
+
check_sudo()
@@ -291,7 +295,7 @@ def stop_agent():
with open(ProcessHelper.pidfile, 'r') as f:
pid = f.read()
pid = int(pid)
-
+
runner.run([AMBARI_SUDO_BINARY, 'kill', '-15', str(pid)])
for i in range(GRACEFUL_STOP_TRIES):
result = runner.run([AMBARI_SUDO_BINARY, 'kill', '-0', str(pid)])
@@ -344,19 +348,25 @@ def reset_agent(options):
MAX_RETRIES = 10
-def run_threads(server_hostname, heartbeat_stop_callback):
- # Launch Controller communication
- controller = Controller(config, server_hostname, heartbeat_stop_callback)
- controller.start()
- time.sleep(2) # in order to get controller.statusCommandsExecutor initialized
- while controller.is_alive():
- time.sleep(0.1)
+# TODO STOMP: remove from globals
+initializer_module = None
+
+def run_threads():
+ global initializer_module
+ initializer_module = InitializerModule()
- need_relaunch, reason = controller.get_status_commands_executor().need_relaunch
- if need_relaunch:
- controller.get_status_commands_executor().relaunch(reason)
+ heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
+ heartbeat_thread.start()
- controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False)
+ component_status_executor = ComponentStatusExecutor(initializer_module)
+ component_status_executor.start()
+
+ while not initializer_module.stop_event.is_set():
+ time.sleep(0.1)
+
+ # TODO STOMP: if thread cannot stop by itself kill it hard after some timeout.
+ heartbeat_thread.join()
+ component_status_executor.join()
# event - event that will be passed to Controller and NetUtil so that loops can be interrupted from outside the process
# we need this for Windows OS, where no SIGTERM is available
@@ -403,7 +413,7 @@ def main(heartbeat_stop_callback=None):
# Check for ambari configuration file.
resolve_ambari_config()
-
+
# Add syslog handler based on ambari config file
add_syslog_handler(logger)
@@ -468,7 +478,7 @@ def main(heartbeat_stop_callback=None):
# Set the active server
active_server = server_hostname
# Launch Controller communication
- run_threads(server_hostname, heartbeat_stop_callback)
+ run_threads()
#
# If Ambari Agent connected to the server or
@@ -488,7 +498,7 @@ if __name__ == "__main__":
is_logger_setup = False
try:
heartbeat_stop_callback = bind_signal_handlers(agentPid)
-
+
main(heartbeat_stop_callback)
except SystemExit:
raise
http://git-wip-us.apache.org/repos/asf/ambari/blob/cef3f245/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index c05f350..cab8fe1 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -40,7 +40,6 @@ class TestAgentStompResponses(BaseStompServerTestCase):
initializer_module = InitializerModule()
heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
- heartbeat_thread.heartbeat_interval = 0
heartbeat_thread.start()
connect_frame = self.server.frames_queue.get()
@@ -94,7 +93,6 @@ class TestAgentStompResponses(BaseStompServerTestCase):
self.server.frames_queue.queue.clear()
heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
- heartbeat_thread.heartbeat_interval = 0
heartbeat_thread.start()
connect_frame = self.server.frames_queue.get()