You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/05/18 11:24:13 UTC

[ambari] branch trunk updated: AMBARI-23894. ZooKeepers Show As Down After EU to HDP 3.0 But They Are Not (aonishuk)

This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2c57b45  AMBARI-23894. ZooKeepers Show As Down After EU to HDP 3.0 But They Are Not (aonishuk)
2c57b45 is described below

commit 2c57b457caee50e67c747dfd4ae6d5a2d83e9dac
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Fri May 18 13:31:02 2018 +0300

    AMBARI-23894. ZooKeepers Show As Down After EU to HDP 3.0 But They Are Not (aonishuk)
---
 .../src/main/python/ambari_agent/ActionQueue.py    |  9 +++
 .../python/ambari_agent/ComponentStatusExecutor.py | 77 +++++++++++++---------
 .../main/python/ambari_agent/HeartbeatThread.py    |  4 +-
 .../main/python/ambari_agent/InitializerModule.py  | 19 ++++++
 ambari-agent/src/main/python/ambari_agent/main.py  | 39 +++--------
 .../resource_management/core/providers/accounts.py | 14 +++-
 6 files changed, 100 insertions(+), 62 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 4ac5d67..65239ed 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -83,6 +83,7 @@ class ActionQueue(threading.Thread):
     self.tmpdir = self.config.get('agent', 'prefix')
     self.customServiceOrchestrator = initializer_module.customServiceOrchestrator
     self.parallel_execution = self.config.get_parallel_exec_option()
+    self.component_status_executor = initializer_module.component_status_executor
     if self.parallel_execution == 1:
       logger.info("Parallel execution is enabled, will execute agent commands in parallel")
     self.lock = threading.Lock()
@@ -421,6 +422,14 @@ class ActionQueue(threading.Thread):
     self.recovery_manager.process_execution_command_result(command, status)
     self.commandStatuses.put_command_status(command, roleResult)
 
+    cluster_id = str(command['clusterId'])
+
+    if cluster_id != '-1' and cluster_id != 'null':
+      service_name = command['serviceName']
+      if service_name != 'null':
+        component_name = command['role']
+        self.component_status_executor.check_component_status(clusterId, service_name, component_name, "STATUS", report=True)
+
   def log_command_output(self, text, taskId):
     """
     Logs a message as multiple enumerated log messages every of which is not larger than MAX_SYMBOLS_PER_LOG_MESSAGE.
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index e2c73bd..c9f86da 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -99,39 +99,10 @@ class ComponentStatusExecutor(threading.Thread):
                 logger.info("Skipping status command for {0}. Since command for it is running".format(component_name))
                 continue
 
-              command_dict = {
-                'serviceName': service_name,
-                'role': component_name,
-                'clusterId': cluster_id,
-                'commandType': 'STATUS_COMMAND',
-              }
+              result = self.check_component_status(cluster_id, service_name, component_name, command_name)
 
-              component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict)
-              status = LiveStatus.LIVE_STATUS if component_status_result['exitcode'] == 0 else LiveStatus.DEAD_STATUS
-
-              # if exec command for component started to run after status command completion
-              if self.customServiceOrchestrator.commandsRunningForComponent(cluster_id, component_name):
-                logger.info("Skipped status command result for {0}. Since command for it is running".format(component_name))
-                continue
-
-              # log if status command failed
-              if status == LiveStatus.DEAD_STATUS:
-                stderr = component_status_result['stderr']
-                if not "ComponentIsNotRunning" in stderr and not "ClientComponentHasNoStatus" in stderr:
-                  logger.info("Status command for {0} failed:\n{1}".format(component_name, stderr))
-
-              result = {
-                'serviceName': service_name,
-                'componentName': component_name,
-                'command': command_name,
-                'status': status,
-                'clusterId': cluster_id,
-              }
-
-              if status != self.reported_component_status[cluster_id][component_name][command_name]:
-                logging.info("Status for {0} has changed to {1}".format(component_name, status))
+              if result:
                 cluster_reports[cluster_id].append(result)
-                self.recovery_manager.handle_status_change(component_name, status)
 
         self.send_updates_to_server(cluster_reports)
       except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
@@ -142,6 +113,50 @@ class ComponentStatusExecutor(threading.Thread):
       self.stop_event.wait(self.status_commands_run_interval)
     logger.info("ComponentStatusExecutor has successfully finished")
 
+  def check_component_status(self, cluster_id, service_name, component_name, command_name, report=False):
+    """
+    Requests the status of one component (STATUS_COMMAND) and returns its status report dict if the status differs from the value stored in self.reported_component_status, otherwise None. Also returns None, without running the command, when the topology cache has no info for the component. With report=True a changed status is additionally passed to send_updates_to_server.
+    """
+
+    # if not a component (the topology cache has no info for this cluster/service/component key)
+    if self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name) is None:
+      return None
+
+    command_dict = {
+      'serviceName': service_name,
+      'role': component_name,
+      'clusterId': cluster_id,
+      'commandType': 'STATUS_COMMAND',
+    }
+
+    component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict)
+    status = LiveStatus.LIVE_STATUS if component_status_result['exitcode'] == 0 else LiveStatus.DEAD_STATUS
+
+    # log if status command failed, unless stderr contains one of the two markers checked below
+    if status == LiveStatus.DEAD_STATUS:
+      stderr = component_status_result['stderr']
+      if not "ComponentIsNotRunning" in stderr and not "ClientComponentHasNoStatus" in stderr:
+        logger.info("Status command for {0} failed:\n{1}".format(component_name, stderr))
+
+    result = {
+      'serviceName': service_name,
+      'componentName': component_name,
+      'command': command_name,
+      'status': status,
+      'clusterId': cluster_id,
+    }
+
+    if status != self.reported_component_status[cluster_id][component_name][command_name]:  # only a changed status is handled and returned
+      logging.info("Status for {0} has changed to {1}".format(component_name, status))
+      self.recovery_manager.handle_status_change(component_name, status)
+
+      if report:
+        self.send_updates_to_server({cluster_id: [result]})  # same {cluster_id: [reports]} shape the run loop collects in cluster_reports
+
+      return result
+
+    return None
+
   def send_updates_to_server(self, cluster_reports):
     if not cluster_reports or not self.initializer_module.is_registered:
       return
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 01bf3c5..179cf6b 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -78,7 +78,9 @@ class HeartbeatThread(threading.Thread):
     self.responseId = 0
     self.file_cache = initializer_module.file_cache
     self.stale_alerts_monitor = initializer_module.stale_alerts_monitor
-    self.post_registration_actions = [self.file_cache.reset]
+    self.post_registration_actions = [self.file_cache.reset, initializer_module.component_status_executor.clean_not_existing_clusters_info,
+                                      initializer_module.alert_status_reporter.clean_not_existing_clusters_info, initializer_module.host_status_reporter.clean_cache]
+
 
 
   def run(self):
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 61c1e65..052e8c1 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -38,6 +38,12 @@ from ambari_agent.StaleAlertsMonitor import StaleAlertsMonitor
 from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
 from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
 
+from ambari_agent import HeartbeatThread
+from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
+from ambari_agent.CommandStatusReporter import CommandStatusReporter
+from ambari_agent.HostStatusReporter import HostStatusReporter
+from ambari_agent.AlertStatusReporter import AlertStatusReporter
+
 logger = logging.getLogger(__name__)
 
 class InitializerModule:
@@ -74,8 +80,21 @@ class InitializerModule:
 
     self.recovery_manager = RecoveryManager(self.config.recovery_cache_dir)
     self.commandStatuses = CommandStatusDict(self)
+
+    self.init_threads()
+
+
+  def init_threads(self):
+    """
+    Initialize thread objects and store them on this module; start() is called on them later, in main.run_threads().
+    """
+    self.component_status_executor = ComponentStatusExecutor(self)  # created before ActionQueue, whose __init__ reads initializer_module.component_status_executor
     self.action_queue = ActionQueue(self)
     self.alert_scheduler_handler = AlertSchedulerHandler(self)
+    self.command_status_reporter = CommandStatusReporter(self)
+    self.host_status_reporter = HostStatusReporter(self)
+    self.alert_status_reporter = AlertStatusReporter(self)
+    self.heartbeat_thread = HeartbeatThread.HeartbeatThread(self)  # created last: its __init__ reads component_status_executor, alert_status_reporter and host_status_reporter from this module
 
   @property
   def connection(self):
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 693b04e..b7b9042 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -106,12 +106,7 @@ from resource_management.core.logger import Logger
 #from resource_management.core.resources.system import File
 #from resource_management.core.environment import Environment
 
-from ambari_agent import HeartbeatThread
 from ambari_agent.InitializerModule import InitializerModule
-from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
-from ambari_agent.CommandStatusReporter import CommandStatusReporter
-from ambari_agent.HostStatusReporter import HostStatusReporter
-from ambari_agent.AlertStatusReporter import AlertStatusReporter
 
 #logging.getLogger('ambari_agent').propagate = False
 
@@ -360,25 +355,11 @@ MAX_RETRIES = 10
 
 def run_threads(initializer_module):
   initializer_module.alert_scheduler_handler.start()
-
-  heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
-  heartbeat_thread.start()
-
-  component_status_executor = ComponentStatusExecutor(initializer_module)
-  component_status_executor.start()
-
-  command_status_reporter = CommandStatusReporter(initializer_module)
-  command_status_reporter.start()
-
-  host_status_reporter = HostStatusReporter(initializer_module)
-  host_status_reporter.start()
-
-  alert_status_reporter = AlertStatusReporter(initializer_module)
-  alert_status_reporter.start()
-
-  # clean caches for non-existing clusters (ambari-server reset case)
-  heartbeat_thread.post_registration_actions += [component_status_executor.clean_not_existing_clusters_info, alert_status_reporter.clean_not_existing_clusters_info, host_status_reporter.clean_cache]
-
+  initializer_module.heartbeat_thread.start()
+  initializer_module.component_status_executor.start()
+  initializer_module.command_status_reporter.start()
+  initializer_module.host_status_reporter.start()
+  initializer_module.alert_status_reporter.start()
   initializer_module.action_queue.start()
 
   while not initializer_module.stop_event.is_set():
@@ -386,11 +367,11 @@ def run_threads(initializer_module):
 
   initializer_module.action_queue.interrupt()
 
-  command_status_reporter.join()
-  component_status_executor.join()
-  host_status_reporter.join()
-  alert_status_reporter.join()
-  heartbeat_thread.join()
+  initializer_module.command_status_reporter.join()
+  initializer_module.component_status_executor.join()
+  initializer_module.host_status_reporter.join()
+  initializer_module.alert_status_reporter.join()
+  initializer_module.heartbeat_thread.join()
   initializer_module.action_queue.join()
 
 # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
diff --git a/ambari-common/src/main/python/resource_management/core/providers/accounts.py b/ambari-common/src/main/python/resource_management/core/providers/accounts.py
index fa70989..990169d 100644
--- a/ambari-common/src/main/python/resource_management/core/providers/accounts.py
+++ b/ambari-common/src/main/python/resource_management/core/providers/accounts.py
@@ -28,8 +28,11 @@ from resource_management.core import shell
 from resource_management.core.providers import Provider
 from resource_management.core.logger import Logger
 from resource_management.core.utils import lazy_property
+from resource_management.core.exceptions import ExecutionFailed
 
 class UserProvider(Provider):
+  USERADD_USER_ALREADY_EXISTS_EXITCODE = 9
+
   options = dict(
     comment=(lambda self: self.user.pw_gecos, "-c"),
     gid=(lambda self: grp.getgrgid(self.user.pw_gid).gr_name, "-g"),
@@ -42,9 +45,11 @@ class UserProvider(Provider):
     
   def action_create(self):
     if not self.user:
+      creating_user = True
       command = ['useradd', "-m"]
       Logger.info("Adding user %s" % self.resource)
     else:
+      creating_user = False
       command = ['usermod']
       
       for option_name, attributes in self.options.iteritems():
@@ -81,7 +86,14 @@ class UserProvider(Provider):
 
     command.append(self.resource.username)
     
-    shell.checked_call(command, sudo=True)
+    try:
+      shell.checked_call(command, sudo=True)
+    except ExecutionFailed as ex:
+      # this "user already exists" can happen due to race condition when multiple processes create user at the same time
+      if creating_user and ex.code == UserProvider.USERADD_USER_ALREADY_EXISTS_EXITCODE and self.user:
+        self.action_create() # run modification of the user
+      else:
+        raise
 
   def action_remove(self):
     if self.user:

-- 
To stop receiving notification emails like this one, please contact
aonishuk@apache.org.