You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/05/18 11:24:13 UTC
[ambari] branch trunk updated: AMBARI-23894. ZooKeepers Show As Down After EU to HDP 3.0 But They Are Not (aonishuk)
This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2c57b45 AMBARI-23894. ZooKeepers Show As Down After EU to HDP 3.0 But They Are Not (aonishuk)
2c57b45 is described below
commit 2c57b457caee50e67c747dfd4ae6d5a2d83e9dac
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Fri May 18 13:31:02 2018 +0300
AMBARI-23894. ZooKeepers Show As Down After EU to HDP 3.0 But They Are Not (aonishuk)
---
.../src/main/python/ambari_agent/ActionQueue.py | 9 +++
.../python/ambari_agent/ComponentStatusExecutor.py | 77 +++++++++++++---------
.../main/python/ambari_agent/HeartbeatThread.py | 4 +-
.../main/python/ambari_agent/InitializerModule.py | 19 ++++++
ambari-agent/src/main/python/ambari_agent/main.py | 39 +++--------
.../resource_management/core/providers/accounts.py | 14 +++-
6 files changed, 100 insertions(+), 62 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 4ac5d67..65239ed 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -83,6 +83,7 @@ class ActionQueue(threading.Thread):
self.tmpdir = self.config.get('agent', 'prefix')
self.customServiceOrchestrator = initializer_module.customServiceOrchestrator
self.parallel_execution = self.config.get_parallel_exec_option()
+ self.component_status_executor = initializer_module.component_status_executor
if self.parallel_execution == 1:
logger.info("Parallel execution is enabled, will execute agent commands in parallel")
self.lock = threading.Lock()
@@ -421,6 +422,14 @@ class ActionQueue(threading.Thread):
self.recovery_manager.process_execution_command_result(command, status)
self.commandStatuses.put_command_status(command, roleResult)
+ cluster_id = str(command['clusterId'])
+
+ if cluster_id != '-1' and cluster_id != 'null':
+ service_name = command['serviceName']
+ if service_name != 'null':
+ component_name = command['role']
+ self.component_status_executor.check_component_status(cluster_id, service_name, component_name, "STATUS", report=True)
+
def log_command_output(self, text, taskId):
"""
Logs a message as multiple enumerated log messages every of which is not larger than MAX_SYMBOLS_PER_LOG_MESSAGE.
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index e2c73bd..c9f86da 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -99,39 +99,10 @@ class ComponentStatusExecutor(threading.Thread):
logger.info("Skipping status command for {0}. Since command for it is running".format(component_name))
continue
- command_dict = {
- 'serviceName': service_name,
- 'role': component_name,
- 'clusterId': cluster_id,
- 'commandType': 'STATUS_COMMAND',
- }
+ result = self.check_component_status(cluster_id, service_name, component_name, command_name)
- component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict)
- status = LiveStatus.LIVE_STATUS if component_status_result['exitcode'] == 0 else LiveStatus.DEAD_STATUS
-
- # if exec command for component started to run after status command completion
- if self.customServiceOrchestrator.commandsRunningForComponent(cluster_id, component_name):
- logger.info("Skipped status command result for {0}. Since command for it is running".format(component_name))
- continue
-
- # log if status command failed
- if status == LiveStatus.DEAD_STATUS:
- stderr = component_status_result['stderr']
- if not "ComponentIsNotRunning" in stderr and not "ClientComponentHasNoStatus" in stderr:
- logger.info("Status command for {0} failed:\n{1}".format(component_name, stderr))
-
- result = {
- 'serviceName': service_name,
- 'componentName': component_name,
- 'command': command_name,
- 'status': status,
- 'clusterId': cluster_id,
- }
-
- if status != self.reported_component_status[cluster_id][component_name][command_name]:
- logging.info("Status for {0} has changed to {1}".format(component_name, status))
+ if result:
cluster_reports[cluster_id].append(result)
- self.recovery_manager.handle_status_change(component_name, status)
self.send_updates_to_server(cluster_reports)
except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
@@ -142,6 +113,50 @@ class ComponentStatusExecutor(threading.Thread):
self.stop_event.wait(self.status_commands_run_interval)
logger.info("ComponentStatusExecutor has successfully finished")
+ def check_component_status(self, cluster_id, service_name, component_name, command_name, report=False):
+ """
+ Run a STATUS_COMMAND for the component and return its status report dict if the status differs from the one stored in reported_component_status (also sending it to the server when report=True); return None if it is unchanged or the component is unknown to the topology cache.
+ """
+
+ # skip: the topology cache has no such component for this cluster/service
+ if self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name) is None:
+ return None
+
+ command_dict = {
+ 'serviceName': service_name,
+ 'role': component_name,
+ 'clusterId': cluster_id,
+ 'commandType': 'STATUS_COMMAND',
+ }
+
+ component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict)
+ status = LiveStatus.LIVE_STATUS if component_status_result['exitcode'] == 0 else LiveStatus.DEAD_STATUS
+
+ # log if status command failed
+ if status == LiveStatus.DEAD_STATUS:
+ stderr = component_status_result['stderr']
+ if not "ComponentIsNotRunning" in stderr and not "ClientComponentHasNoStatus" in stderr:
+ logger.info("Status command for {0} failed:\n{1}".format(component_name, stderr))
+
+ result = {
+ 'serviceName': service_name,
+ 'componentName': component_name,
+ 'command': command_name,
+ 'status': status,
+ 'clusterId': cluster_id,
+ }
+
+ if status != self.reported_component_status[cluster_id][component_name][command_name]:
+ logger.info("Status for {0} has changed to {1}".format(component_name, status))
+ self.recovery_manager.handle_status_change(component_name, status)
+
+ if report:
+ self.send_updates_to_server({cluster_id: [result]})
+
+ return result
+
+ return None
+
def send_updates_to_server(self, cluster_reports):
if not cluster_reports or not self.initializer_module.is_registered:
return
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 01bf3c5..179cf6b 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -78,7 +78,9 @@ class HeartbeatThread(threading.Thread):
self.responseId = 0
self.file_cache = initializer_module.file_cache
self.stale_alerts_monitor = initializer_module.stale_alerts_monitor
- self.post_registration_actions = [self.file_cache.reset]
+ self.post_registration_actions = [self.file_cache.reset, initializer_module.component_status_executor.clean_not_existing_clusters_info,
+ initializer_module.alert_status_reporter.clean_not_existing_clusters_info, initializer_module.host_status_reporter.clean_cache]
+
def run(self):
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 61c1e65..052e8c1 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -38,6 +38,12 @@ from ambari_agent.StaleAlertsMonitor import StaleAlertsMonitor
from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
+from ambari_agent import HeartbeatThread
+from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
+from ambari_agent.CommandStatusReporter import CommandStatusReporter
+from ambari_agent.HostStatusReporter import HostStatusReporter
+from ambari_agent.AlertStatusReporter import AlertStatusReporter
+
logger = logging.getLogger(__name__)
class InitializerModule:
@@ -74,8 +80,21 @@ class InitializerModule:
self.recovery_manager = RecoveryManager(self.config.recovery_cache_dir)
self.commandStatuses = CommandStatusDict(self)
+
+ self.init_threads()
+
+
+ def init_threads(self):
+ """
+ Initialize thread objects
+ """
+ self.component_status_executor = ComponentStatusExecutor(self)
self.action_queue = ActionQueue(self)
self.alert_scheduler_handler = AlertSchedulerHandler(self)
+ self.command_status_reporter = CommandStatusReporter(self)
+ self.host_status_reporter = HostStatusReporter(self)
+ self.alert_status_reporter = AlertStatusReporter(self)
+ self.heartbeat_thread = HeartbeatThread.HeartbeatThread(self)
@property
def connection(self):
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 693b04e..b7b9042 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -106,12 +106,7 @@ from resource_management.core.logger import Logger
#from resource_management.core.resources.system import File
#from resource_management.core.environment import Environment
-from ambari_agent import HeartbeatThread
from ambari_agent.InitializerModule import InitializerModule
-from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
-from ambari_agent.CommandStatusReporter import CommandStatusReporter
-from ambari_agent.HostStatusReporter import HostStatusReporter
-from ambari_agent.AlertStatusReporter import AlertStatusReporter
#logging.getLogger('ambari_agent').propagate = False
@@ -360,25 +355,11 @@ MAX_RETRIES = 10
def run_threads(initializer_module):
initializer_module.alert_scheduler_handler.start()
-
- heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
- heartbeat_thread.start()
-
- component_status_executor = ComponentStatusExecutor(initializer_module)
- component_status_executor.start()
-
- command_status_reporter = CommandStatusReporter(initializer_module)
- command_status_reporter.start()
-
- host_status_reporter = HostStatusReporter(initializer_module)
- host_status_reporter.start()
-
- alert_status_reporter = AlertStatusReporter(initializer_module)
- alert_status_reporter.start()
-
- # clean caches for non-existing clusters (ambari-server reset case)
- heartbeat_thread.post_registration_actions += [component_status_executor.clean_not_existing_clusters_info, alert_status_reporter.clean_not_existing_clusters_info, host_status_reporter.clean_cache]
-
+ initializer_module.heartbeat_thread.start()
+ initializer_module.component_status_executor.start()
+ initializer_module.command_status_reporter.start()
+ initializer_module.host_status_reporter.start()
+ initializer_module.alert_status_reporter.start()
initializer_module.action_queue.start()
while not initializer_module.stop_event.is_set():
@@ -386,11 +367,11 @@ def run_threads(initializer_module):
initializer_module.action_queue.interrupt()
- command_status_reporter.join()
- component_status_executor.join()
- host_status_reporter.join()
- alert_status_reporter.join()
- heartbeat_thread.join()
+ initializer_module.command_status_reporter.join()
+ initializer_module.component_status_executor.join()
+ initializer_module.host_status_reporter.join()
+ initializer_module.alert_status_reporter.join()
+ initializer_module.heartbeat_thread.join()
initializer_module.action_queue.join()
# event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
diff --git a/ambari-common/src/main/python/resource_management/core/providers/accounts.py b/ambari-common/src/main/python/resource_management/core/providers/accounts.py
index fa70989..990169d 100644
--- a/ambari-common/src/main/python/resource_management/core/providers/accounts.py
+++ b/ambari-common/src/main/python/resource_management/core/providers/accounts.py
@@ -28,8 +28,11 @@ from resource_management.core import shell
from resource_management.core.providers import Provider
from resource_management.core.logger import Logger
from resource_management.core.utils import lazy_property
+from resource_management.core.exceptions import ExecutionFailed
class UserProvider(Provider):
+ USERADD_USER_ALREADY_EXISTS_EXITCODE = 9
+
options = dict(
comment=(lambda self: self.user.pw_gecos, "-c"),
gid=(lambda self: grp.getgrgid(self.user.pw_gid).gr_name, "-g"),
@@ -42,9 +45,11 @@ class UserProvider(Provider):
def action_create(self):
if not self.user:
+ creating_user = True
command = ['useradd', "-m"]
Logger.info("Adding user %s" % self.resource)
else:
+ creating_user = False
command = ['usermod']
for option_name, attributes in self.options.iteritems():
@@ -81,7 +86,14 @@ class UserProvider(Provider):
command.append(self.resource.username)
- shell.checked_call(command, sudo=True)
+ try:
+ shell.checked_call(command, sudo=True)
+ except ExecutionFailed as ex:
+ # this "user already exists" can happen due to race condition when multiple processes create user at the same time
+ if creating_user and ex.code == UserProvider.USERADD_USER_ALREADY_EXISTS_EXITCODE and self.user:
+ self.action_create() # run modification of the user
+ else:
+ raise
def action_remove(self):
if self.user:
--
To stop receiving notification emails like this one, please contact
aonishuk@apache.org.