You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2015/05/13 04:55:43 UTC

ambari git commit: AMBARI-11089. Auto-recovery commands should not be scheduled until the execution commands are complete

Repository: ambari
Updated Branches:
  refs/heads/trunk 54e22bc94 -> e7fdb7146


AMBARI-11089. Auto-recovery commands should not be scheduled until the execution commands are complete


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e7fdb714
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e7fdb714
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e7fdb714

Branch: refs/heads/trunk
Commit: e7fdb7146bc278aafd3d9530b7c9f824e4716cb2
Parents: 54e22bc
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Tue May 12 19:55:20 2015 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Tue May 12 19:55:20 2015 -0700

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 30 +++++-
 .../python/ambari_agent/CommandStatusDict.py    |  5 +
 .../src/main/python/ambari_agent/Controller.py  |  2 +-
 .../main/python/ambari_agent/RecoveryManager.py | 14 +++
 .../test/python/ambari_agent/TestActionQueue.py | 99 +++++++++++++++++++-
 .../python/ambari_agent/TestRecoveryManager.py  | 14 ++-
 6 files changed, 160 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index bdaefd0..59a5720 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -193,7 +193,13 @@ class ActionQueue(threading.Thread):
     logger.debug("Took an element of Queue (command type = %s)." % commandType)
     try:
       if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]:
-        self.execute_command(command)
+        try:
+          if self.controller.recovery_manager.enabled():
+            self.controller.recovery_manager.start_execution_command()
+          self.execute_command(command)
+        finally:
+          if self.controller.recovery_manager.enabled():
+            self.controller.recovery_manager.stop_execution_command()
       elif commandType == self.STATUS_COMMAND:
         self.execute_status_command(command)
       else:
@@ -203,6 +209,15 @@ class ActionQueue(threading.Thread):
       traceback.print_exc()
       logger.warn(err)
 
+  def tasks_in_progress_or_pending(self):
+    return_val = False
+    if not self.commandQueue.empty():
+      return_val = True
+    if self.controller.recovery_manager.has_active_command():
+      return_val = True
+    return return_val
+    pass
+
   def execute_command(self, command):
     '''
     Executes commands of type EXECUTION_COMMAND
@@ -248,6 +263,8 @@ class ActionQueue(threading.Thread):
         maxAttempts = int(command['commandParams']['command_retry_max_attempt_count'])
       if 'command_retry_enabled' in command['commandParams']:
         retryAble = command['commandParams']['command_retry_enabled'] == "true"
+    if isAutoExecuteCommand:
+      retryAble = False
 
     logger.debug("Command execution metadata - retry enabled = {retryAble}, max attempt count = {maxAttemptCount}".
                  format(retryAble = retryAble, maxAttemptCount = maxAttempts))
@@ -296,6 +313,17 @@ class ActionQueue(threading.Thread):
 
     # let ambari know that configuration tags were applied
     if status == self.COMPLETED_STATUS:
+      if self.controller.recovery_manager.enabled() and command.has_key('roleCommand'):
+        if command['roleCommand'] == self.ROLE_COMMAND_START:
+          self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
+        elif command['roleCommand'] == self.ROLE_COMMAND_STOP or command['roleCommand'] == self.ROLE_COMMAND_INSTALL:
+          self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.DEAD_STATUS)
+        elif command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND:
+          if command['hostLevelParams'].has_key('custom_command') and \
+                  command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART:
+            self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
+      pass
+
       configHandler = ActualConfigHandler(self.config, self.configTags)
       #update
       if command.has_key('forceRefreshConfigTags') and len(command['forceRefreshConfigTags']) > 0  :

http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index 0ebc45e..861b568 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -78,6 +78,7 @@ class CommandStatusDict():
     with self.lock:
       c = copy.copy(self.current_state[taskId][1])
     return c
+
   def generate_report(self):
     """
     Generates status reports about commands that are IN_PROGRESS, COMPLETE or
@@ -103,6 +104,10 @@ class CommandStatusDict():
           resultComponentStatus.append(report)
           # Component status is useful once, removing it
           del self.current_state[key]
+        elif command ['commandType'] in [ActionQueue.AUTO_EXECUTION_COMMAND]:
+          logger.debug("AUTO_EXECUTION_COMMAND task deleted " + str(command['commandId']))
+          del self.current_state[key]
+          pass
       result = {
         'reports': resultReports,
         'componentStatus': resultComponentStatus

http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 9ebb83a..ccd1233 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -283,7 +283,7 @@ class Controller(threading.Thread):
           self.recovery_manager.process_status_commands(response['statusCommands'])
           self.addToStatusQueue(response['statusCommands'])
 
-        if self.actionQueue.commandQueue.empty():
+        if not self.actionQueue.tasks_in_progress_or_pending():
           recovery_commands = self.recovery_manager.get_recovery_commands()
           for recovery_command in recovery_commands:
             logger.info("Adding recovery command %s for component %s",

http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index e2c5e98..12ba75d 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -88,12 +88,26 @@ class RecoveryManager:
     self.statuses = {}
     self.__status_lock = threading.RLock()
     self.__command_lock = threading.RLock()
+    self.__active_command_lock = threading.RLock()
+    self.active_command_count = 0
     self.paused = False
 
     self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only)
 
     pass
 
+  def start_execution_command(self):
+    with self.__active_command_lock:
+      self.active_command_count += 1
+    pass
+
+  def stop_execution_command(self):
+    with self.__active_command_lock:
+      self.active_command_count -= 1
+    pass
+
+  def has_active_command(self):
+    return self.active_command_count > 0
 
   def set_paused(self, paused):
     if self.paused != paused:

http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index ee18c38..7a834d8 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -67,6 +67,19 @@ class TestActionQueue(TestCase):
     'configurationTags':{'global' : { 'tag': 'v1' }}
   }
 
+  datanode_auto_start_command = {
+    'commandType': 'AUTO_EXECUTION_COMMAND',
+    'role': u'DATANODE',
+    'roleCommand': u'START',
+    'commandId': '1-1',
+    'taskId': 3,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    'hostLevelParams': {},
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : { 'tag': 'v1' }}
+  }
+
   datanode_upgrade_command = {
       'commandId': 17,
       'role' : "role",
@@ -311,6 +324,85 @@ class TestActionQueue(TestCase):
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch("__builtin__.open")
   @patch.object(ActionQueue, "status_update_callback")
+  def test_auto_execute_command(self, status_update_callback_mock, open_mock):
+    # Make file read calls visible
+    def open_side_effect(file, mode):
+      if mode == 'r':
+        file_mock = MagicMock()
+        file_mock.read.return_value = "Read from " + str(file)
+        return file_mock
+      else:
+        return self.original_open(file, mode)
+    open_mock.side_effect = open_side_effect
+
+    config = AmbariConfig()
+    tempdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tempdir)
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    dummy_controller.recovery_manager = RecoveryManager()
+    dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False)
+
+    actionQueue = ActionQueue(config, dummy_controller)
+    unfreeze_flag = threading.Event()
+    python_execution_result_dict = {
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut' : ''
+    }
+
+    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+      unfreeze_flag.wait()
+      return python_execution_result_dict
+    def patched_aq_execute_command(command):
+      # We have to perform patching for separate thread in the same thread
+      with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+        runCommand_mock.side_effect = side_effect
+        actionQueue.process_command(command)
+
+    python_execution_result_dict['status'] = 'COMPLETE'
+    python_execution_result_dict['exitcode'] = 0
+    self.assertFalse(actionQueue.tasks_in_progress_or_pending())
+    # We call method in a separate thread
+    execution_thread = Thread(target = patched_aq_execute_command ,
+                              args = (self.datanode_auto_start_command, ))
+    execution_thread.start()
+    #  check in progress report
+    # wait until ready
+    while True:
+      time.sleep(0.1)
+      if actionQueue.tasks_in_progress_or_pending():
+        break
+    # Continue command execution
+    unfreeze_flag.set()
+    # wait until ready
+    while actionQueue.tasks_in_progress_or_pending():
+      time.sleep(0.1)
+      report = actionQueue.result()
+
+    self.assertEqual(len(report['reports']), 0)
+
+    ## Test failed execution
+    python_execution_result_dict['status'] = 'FAILED'
+    python_execution_result_dict['exitcode'] = 13
+    # We call method in a separate thread
+    execution_thread = Thread(target = patched_aq_execute_command ,
+                              args = (self.datanode_auto_start_command, ))
+    execution_thread.start()
+    unfreeze_flag.set()
+    #  check in progress report
+    # wait until ready
+    report = actionQueue.result()
+    while actionQueue.tasks_in_progress_or_pending():
+      time.sleep(0.1)
+      report = actionQueue.result()
+
+    self.assertEqual(len(report['reports']), 0)
+
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch("__builtin__.open")
+  @patch.object(ActionQueue, "status_update_callback")
   def test_execute_command(self, status_update_callback_mock, open_mock):
     # Make file read calls visible
     def open_side_effect(file, mode):
@@ -371,7 +463,9 @@ class TestActionQueue(TestCase):
                 'taskId': 3,
                 'exitCode': 777}
     self.assertEqual(report['reports'][0], expected)
-    # Continue command execution
+    self.assertTrue(actionQueue.tasks_in_progress_or_pending())
+
+  # Continue command execution
     unfreeze_flag.set()
     # wait until ready
     while report['reports'][0]['status'] == 'IN_PROGRESS':
@@ -631,6 +725,7 @@ class TestActionQueue(TestCase):
                                 get_mock, process_command_mock, gpeo_mock):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
+    dummy_controller.recovery_manager = RecoveryManager()
     config = MagicMock()
     gpeo_mock.return_value = 0
     config.get_parallel_exec_option = gpeo_mock
@@ -638,8 +733,10 @@ class TestActionQueue(TestCase):
     actionQueue.start()
     actionQueue.put([self.datanode_install_command, self.hbase_install_command])
     self.assertEqual(2, actionQueue.commandQueue.qsize())
+    self.assertTrue(actionQueue.tasks_in_progress_or_pending())
     actionQueue.reset()
     self.assertTrue(actionQueue.commandQueue.empty())
+    self.assertFalse(actionQueue.tasks_in_progress_or_pending())
     time.sleep(0.1)
     actionQueue.stop()
     actionQueue.join()

http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
index bd7c96b..aaf6e53 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
@@ -507,4 +507,16 @@ class TestRecoveryManager(TestCase):
     commands = rm.get_recovery_commands()
     self.assertEqual(1, len(commands))
     self.assertEqual("START", commands[0]["roleCommand"])
-    pass
\ No newline at end of file
+    pass
+
+  def test_command_count(self):
+    rm = RecoveryManager(True)
+    self.assertFalse(rm.has_active_command())
+    rm.start_execution_command()
+    self.assertTrue(rm.has_active_command())
+    rm.start_execution_command()
+    self.assertTrue(rm.has_active_command())
+    rm.stop_execution_command()
+    self.assertTrue(rm.has_active_command())
+    rm.stop_execution_command()
+    self.assertFalse(rm.has_active_command())
\ No newline at end of file