Posted to commits@ambari.apache.org by nc...@apache.org on 2017/04/17 20:18:07 UTC

[13/34] ambari git commit: AMBARI-20682. Wait For DataNodes To Shutdown During a Rolling Upgrade (dlysnichenko)

AMBARI-20682. Wait For DataNodes To Shutdown During a Rolling Upgrade (dlysnichenko)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/273dfcac
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/273dfcac
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/273dfcac

Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 273dfcac0296ebfaebd21484ea887acfe2a02067
Parents: ac75f1d
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Thu Apr 13 17:36:19 2017 +0300
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Thu Apr 13 17:36:19 2017 +0300

----------------------------------------------------------------------
 .../libraries/script/script.py                  | 61 +++++++++++++++---
 .../HIVE/package/scripts/mysql_service.py       |  5 ++
 .../HDFS/2.1.0.2.0/package/scripts/datanode.py  | 45 ++++++++++++-
 .../package/scripts/datanode_upgrade.py         | 38 +----------
 .../HIVE/package/scripts/mysql_service.py       |  5 +-
 .../HIVE/package/scripts/postgresql_service.py  |  5 +-
 .../python/stacks/2.0.6/HDFS/test_datanode.py   | 66 ++++++++++++++++----
 7 files changed, 164 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/273dfcac/ambari-common/src/main/python/resource_management/libraries/script/script.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py
index 5fa9ec4..bad09d2 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py
@@ -28,6 +28,7 @@ import logging
 import platform
 import inspect
 import tarfile
+import time
 from optparse import OptionParser
 import resource_management
 from ambari_commons import OSCheck, OSConst
@@ -308,21 +309,36 @@ class Script(object):
       method = self.choose_method_to_execute(self.command_name)
       with Environment(self.basedir, tmp_dir=Script.tmp_dir) as env:
         env.config.download_path = Script.tmp_dir
-        
-        if self.command_name == "start" and not self.is_hook():
-          self.pre_start()
+
+        if not self.is_hook():
+          self.execute_prefix_function(self.command_name, 'pre', env)
         
         method(env)
 
-        if self.command_name == "start" and not self.is_hook():
-          self.post_start()
+        if not self.is_hook():
+          self.execute_prefix_function(self.command_name, 'post', env)
+
     except Fail as ex:
       ex.pre_raise()
       raise
     finally:
       if self.should_expose_component_version(self.command_name):
         self.save_component_version_to_structured_out()
-        
+
+  def execute_prefix_function(self, command_name, afix, env):
+    """
+    Execute the 'afix' action (prefix or suffix) for the given command_name.
+    Example: command_name=start, afix=pre results in self.pre_start(env) being called, if it exists.
+    """
+    self_methods = dir(self)
+    method_name = "{0}_{1}".format(afix, command_name)
+    if method_name not in self_methods:
+      Logger.logger.debug("Action afix '{0}' not present".format(method_name))
+      return
+    Logger.logger.debug("Execute action afix: {0}".format(method_name))
+    method = getattr(self, method_name)
+    method(env)
+
   def is_hook(self):
     from resource_management.libraries.script.hook import Hook
     return (Hook in self.__class__.__bases__)
@@ -335,8 +351,11 @@ class Script(object):
 
   def get_pid_files(self):
     return []
-        
-  def pre_start(self):
+
+  def pre_start(self, env=None):
+    """
+    Executed before any start method. Posts the contents of relevant *.out files to the command execution log.
+    """
     if self.log_out_files:
       log_folder = self.get_log_folder()
       user = self.get_user()
@@ -366,6 +385,32 @@ class Script(object):
 
     Logger.info("Component has started with pid(s): {0}".format(', '.join(pids)))
 
+  def post_stop(self, env):
+    """
+    Executed after completion of every stop method. Waits until the component is actually stopped (the check is
+     performed using the component's status() method).
+    """
+    self_methods = dir(self)
+
+    if 'status' not in self_methods:
+      return
+    status_method = getattr(self, 'status')
+    component_is_stopped = False
+    counter = 0
+    while not component_is_stopped:
+      try:
+        if counter % 100 == 0:
+          Logger.logger.info("Waiting for actual component stop")
+        status_method(env)
+        time.sleep(0.1)
+        counter += 1
+      except ComponentIsNotRunning, e:
+        Logger.logger.debug("'status' reports ComponentIsNotRunning")
+        component_is_stopped = True
+      except ClientComponentHasNoStatus, e:
+        Logger.logger.debug("Client component has no status")
+        component_is_stopped = True
+
   def choose_method_to_execute(self, command_name):
     """
     Returns a callable object that should be executed for a given command.

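A note for readers tracing the control flow: the script.py hunks above replace the hard-coded pre_start()/post_start() calls with name-based dispatch, so any method named pre_<command> or post_<command> on a Script subclass now runs automatically around that command. A minimal standalone sketch of the mechanism (plain Python, no Ambari dependencies; class and method names here are illustrative only):

    class MiniScript(object):
        def execute_prefix_function(self, command_name, afix, env):
            # Resolve e.g. "post_stop" by name; skip silently if absent.
            method = getattr(self, "{0}_{1}".format(afix, command_name), None)
            if method is not None:
                method(env)

        def execute(self, command_name, env):
            self.execute_prefix_function(command_name, 'pre', env)
            getattr(self, command_name)(env)
            self.execute_prefix_function(command_name, 'post', env)

    class Demo(MiniScript):
        def stop(self, env):
            print("stopping")
        def post_stop(self, env):
            print("waiting for actual stop")

    Demo().execute('stop', env=None)  # prints: stopping / waiting for actual stop

In the commit, the generic Script.post_stop defined above is what this dispatch picks up after every component's stop command; no per-component wiring is needed.
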
http://git-wip-us.apache.org/repos/asf/ambari/blob/273dfcac/ambari-funtest/src/test/resources/stacks/HDP/2.0.7/services/HIVE/package/scripts/mysql_service.py
----------------------------------------------------------------------
diff --git a/ambari-funtest/src/test/resources/stacks/HDP/2.0.7/services/HIVE/package/scripts/mysql_service.py b/ambari-funtest/src/test/resources/stacks/HDP/2.0.7/services/HIVE/package/scripts/mysql_service.py
index 4716343..cf1d30e 100644
--- a/ambari-funtest/src/test/resources/stacks/HDP/2.0.7/services/HIVE/package/scripts/mysql_service.py
+++ b/ambari-funtest/src/test/resources/stacks/HDP/2.0.7/services/HIVE/package/scripts/mysql_service.py
@@ -31,6 +31,11 @@ def mysql_service(daemon_name=None, action='start'):
   elif action == 'status':
     cmd = format('service {daemon_name} status')
     logoutput = False
+    try:
+      Execute(cmd)
+      return
+    except:
+      raise ComponentIsNotRunning()
   else:
     cmd = None
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/273dfcac/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
index 924eea4..cd52885 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
@@ -21,10 +21,13 @@ import datanode_upgrade
 from ambari_commons.constants import UPGRADE_TYPE_ROLLING
 
 from hdfs_datanode import datanode
+from resource_management import Script, Fail, shell, Logger
 from resource_management.libraries.script.script import Script
 from resource_management.libraries.functions import conf_select, stack_select
-from resource_management.libraries.functions.constants import StackFeature
 from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions.decorator import retry
 from resource_management.libraries.functions.security_commons import build_expectations, \
   cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, FILE_TYPE_XML
 from resource_management.core.logger import Logger
@@ -32,6 +35,7 @@ from hdfs import hdfs
 from ambari_commons.os_family_impl import OsFamilyImpl
 from ambari_commons import OSConst
 from utils import get_hdfs_binary
+from utils import get_dfsadmin_base_command
 
 class DataNode(Script):
 
@@ -75,12 +79,51 @@ class DataNode(Script):
         datanode(action="stop")
     else:
       datanode(action="stop")
+    # verify that the datanode is down
+    self.check_datanode_shutdown(hdfs_binary)
 
   def status(self, env):
     import status_params
     env.set_params(status_params)
     datanode(action = "status")
 
+  @retry(times=24, sleep_time=5, err_class=Fail)
+  def check_datanode_shutdown(self, hdfs_binary):
+    """
+    Checks that a DataNode is down by running "hdfs dfsadmin getDatanodeInfo"
+    several times, pausing in between runs. Once the DataNode stops responding
+    this method will return, otherwise it will raise a Fail(...) and retry
+    automatically.
+    The stack defaults for retrying for HDFS are also way too slow for this
+    command; they are set to wait about 45 seconds between client retries. As
+    a result, a single execution of dfsadmin will take 45 seconds to retry and
+    the DataNode may be marked as dead, causing problems with HBase.
+    https://issues.apache.org/jira/browse/HDFS-8510 tracks reducing the
+    times for ipc.client.connect.retry.interval. In the meantime, override them
+    here, but only for RU.
+    :param hdfs_binary: name/path of the HDFS binary to use
+    :return:
+    """
+    import params
+
+    # override stock retry timeouts since after 30 seconds, the datanode is
+    # marked as dead and can affect HBase during RU
+    dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
+    command = format('{dfsadmin_base_command} -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo {dfs_dn_ipc_address}')
+
+    is_datanode_deregistered = False
+    try:
+      shell.checked_call(command, user=params.hdfs_user, tries=1)
+    except:
+      is_datanode_deregistered = True
+
+    if not is_datanode_deregistered:
+      Logger.info("DataNode has not yet deregistered from the NameNode...")
+      raise Fail('DataNode has not yet deregistered from the NameNode...')
+
+    Logger.info("DataNode has successfully shutdown.")
+    return True
+
 
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class DataNodeDefault(DataNode):

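The new check_datanode_shutdown leans on the stock resource_management @retry decorator: with times=24 and sleep_time=5 it probes the DataNode for roughly two minutes before letting the Fail propagate. A simplified stand-in showing the semantics such a decorator needs to provide (a sketch under those assumptions, not the Ambari implementation):

    import time
    from functools import wraps

    def retry(times, sleep_time, err_class):
        def decorator(fn):
            @wraps(fn)
            def wrapper(*args, **kwargs):
                for attempt in range(times):
                    try:
                        return fn(*args, **kwargs)
                    except err_class:
                        if attempt == times - 1:
                            raise  # attempts exhausted: let the Fail surface
                        time.sleep(sleep_time)
            return wrapper
        return decorator

Each individual probe is kept short by overriding ipc.client.connect.max.retries and ipc.client.connect.retry.interval on the dfsadmin call, for the reasons spelled out in the docstring above.
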
http://git-wip-us.apache.org/repos/asf/ambari/blob/273dfcac/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
index b55237d..c1b0296 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
@@ -48,10 +48,7 @@ def pre_rolling_upgrade_shutdown(hdfs_binary):
   command = format('{dfsadmin_base_command} -shutdownDatanode {dfs_dn_ipc_address} upgrade')
 
   code, output = shell.call(command, user=params.hdfs_user)
-  if code == 0:
-    # verify that the datanode is down
-    _check_datanode_shutdown(hdfs_binary)
-  else:
+  if code != 0:
     # Due to bug HDFS-7533, DataNode may not always shutdown during stack upgrade, and it is necessary to kill it.
     if output is not None and re.search("Shutdown already in progress", output):
       Logger.error("Due to a known issue in DataNode, the command {0} did not work, so will need to shutdown the datanode forcefully.".format(command))
@@ -84,39 +81,6 @@ def is_datanode_process_running():
   except ComponentIsNotRunning:
     return False
 
-@retry(times=24, sleep_time=5, err_class=Fail)
-def _check_datanode_shutdown(hdfs_binary):
-  """
-  Checks that a DataNode is down by running "hdfs dfsamin getDatanodeInfo"
-  several times, pausing in between runs. Once the DataNode stops responding
-  this method will return, otherwise it will raise a Fail(...) and retry
-  automatically.
-  The stack defaults for retrying for HDFS are also way too slow for this
-  command; they are set to wait about 45 seconds between client retries. As
-  a result, a single execution of dfsadmin will take 45 seconds to retry and
-  the DataNode may be marked as dead, causing problems with HBase.
-  https://issues.apache.org/jira/browse/HDFS-8510 tracks reducing the
-  times for ipc.client.connect.retry.interval. In the meantime, override them
-  here, but only for RU.
-  :param hdfs_binary: name/path of the HDFS binary to use
-  :return:
-  """
-  import params
-
-  # override stock retry timeouts since after 30 seconds, the datanode is
-  # marked as dead and can affect HBase during RU
-  dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
-  command = format('{dfsadmin_base_command} -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo {dfs_dn_ipc_address}')
-
-  try:
-    Execute(command, user=params.hdfs_user, tries=1)
-  except:
-    Logger.info("DataNode has successfully shutdown for upgrade.")
-    return
-
-  Logger.info("DataNode has not shutdown.")
-  raise Fail('DataNode has not shutdown.')
-
 
 @retry(times=30, sleep_time=30, err_class=Fail) # keep trying for 15 mins
 def _check_datanode_startup(hdfs_binary):

http://git-wip-us.apache.org/repos/asf/ambari/blob/273dfcac/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/HIVE/package/scripts/mysql_service.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/HIVE/package/scripts/mysql_service.py b/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/HIVE/package/scripts/mysql_service.py
index 11bbdd8..a4f3bbb 100644
--- a/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/HIVE/package/scripts/mysql_service.py
+++ b/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/HIVE/package/scripts/mysql_service.py
@@ -26,7 +26,10 @@ def mysql_service(daemon_name=None, action='start'):
   cmd = format('service {daemon_name} {action}')
 
   if action == 'status':
-    Execute(status_cmd)
+    try:
+      Execute(status_cmd)
+    except:
+      raise ComponentIsNotRunning()
   elif action == 'stop':
     Execute(cmd,
             logoutput = True,

http://git-wip-us.apache.org/repos/asf/ambari/blob/273dfcac/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/HIVE/package/scripts/postgresql_service.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/HIVE/package/scripts/postgresql_service.py b/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/HIVE/package/scripts/postgresql_service.py
index cc7b4cc..41fe107 100644
--- a/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/HIVE/package/scripts/postgresql_service.py
+++ b/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/HIVE/package/scripts/postgresql_service.py
@@ -26,7 +26,10 @@ def postgresql_service(postgresql_daemon_name=None, action='start'):
   cmd = format('service {postgresql_daemon_name} {action}')
 
   if action == 'status':
-    Execute(status_cmd)
+    try:
+      Execute(status_cmd)
+    except:
+      raise ComponentIsNotRunning()
   elif action == 'stop':
     Execute(cmd,
             logoutput = True,

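The three service-script hunks above (the funtest mysql_service.py plus the BIGTOP mysql_service.py and postgresql_service.py) all apply the same fix, and it matters because the new Script.post_stop loop terminates only when status() raises ComponentIsNotRunning or ClientComponentHasNoStatus; a failed Execute on its own would surface as a generic error instead. Condensed, the pattern is (ComponentIsNotRunning lives in resource_management.core.exceptions):

    from resource_management.core.exceptions import ComponentIsNotRunning
    from resource_management.core.resources.system import Execute

    def service_status(status_cmd):
        try:
            Execute(status_cmd)  # a non-zero exit code raises
        except Exception:
            # translate any failure into the signal post_stop waits for
            raise ComponentIsNotRunning()

Without this translation, a stopped service's failing status command would escape post_stop as an unhandled exception rather than cleanly ending the wait.
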
http://git-wip-us.apache.org/repos/asf/ambari/blob/273dfcac/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
index 1c3c5b7..2cd35ab 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
@@ -22,6 +22,7 @@ import json
 from mock.mock import MagicMock, patch
 from resource_management.libraries.script.script import Script
 from resource_management.core import shell
+import itertools
 from resource_management.core.exceptions import Fail
 import resource_management.libraries.functions.mounted_dirs_helper
 
@@ -76,13 +77,21 @@ class TestDatanode(RMFTestCase):
     )
     self.assertNoMoreResources()
 
+  @patch('time.sleep')
   @patch("os.path.exists", new = MagicMock(return_value=False))
-  def test_stop_default(self):
+  @patch("resource_management.core.shell.checked_call")
+  def test_stop_default(self, checked_call_mock, time_mock):
+    def side_effect(arg):
+      if '-D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo' in arg :
+        raise Fail()
+      return
+    checked_call_mock.side_effect = side_effect
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
                        classname = "DataNode",
                        command = "stop",
                        config_file = "default.json",
                        stack_version = self.STACK_VERSION,
+                       checked_call_mocks = side_effect,
                        target = RMFTestCase.TARGET_COMMON_SERVICES
     )
     self.assertResourceCalled('Execute', "ambari-sudo.sh su hdfs -l -s /bin/bash -c '[RMF_EXPORT_PLACEHOLDER]ulimit -c unlimited ;  /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf stop datanode'",
@@ -221,13 +230,21 @@ class TestDatanode(RMFTestCase):
     )
     self.assertNoMoreResources()
 
+  @patch('time.sleep')
   @patch("os.path.exists", new = MagicMock(return_value=False))
-  def test_stop_secured(self):
+  @patch("resource_management.core.shell.checked_call")
+  def test_stop_secured(self, checked_call_mock, time_mock):
+    def side_effect(arg):
+      if '-D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo' in arg :
+        raise Fail()
+      return
+    checked_call_mock.side_effect = side_effect
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
                        classname = "DataNode",
                        command = "stop",
                        config_file = "secured.json",
                        stack_version = self.STACK_VERSION,
+                       checked_call_mocks = side_effect,
                        target = RMFTestCase.TARGET_COMMON_SERVICES
     )
     self.assertResourceCalled('Execute', 'ambari-sudo.sh [RMF_ENV_PLACEHOLDER] -H -E /usr/lib/hadoop/sbin/hadoop-daemon.sh --config /etc/hadoop/conf stop datanode',
@@ -237,9 +254,15 @@ class TestDatanode(RMFTestCase):
     self.assertResourceCalled('File', '/var/run/hadoop/hdfs/hadoop-hdfs-datanode.pid', action = ['delete'])
     self.assertNoMoreResources()
 
-
+  @patch('time.sleep')
   @patch("os.path.exists", new = MagicMock(return_value=False))
-  def test_stop_secured_HDP22_root(self):
+  @patch("resource_management.core.shell.checked_call")
+  def test_stop_secured_HDP22_root(self, checked_call_mock, time_mock):
+    def side_effect(arg):
+      if '-D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo' in arg :
+        raise Fail()
+      return
+    checked_call_mock.side_effect = side_effect
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/secured.json"
     with open(config_file, "r") as f:
       secured_json = json.load(f)
@@ -251,6 +274,7 @@ class TestDatanode(RMFTestCase):
                        command = "stop",
                        config_dict = secured_json,
                        stack_version = self.STACK_VERSION,
+                       checked_call_mocks = side_effect,
                        target = RMFTestCase.TARGET_COMMON_SERVICES
     )
     self.assertResourceCalled('Execute', 'ambari-sudo.sh [RMF_ENV_PLACEHOLDER] -H -E /usr/hdp/current/hadoop-client/sbin/hadoop-daemon.sh --config /usr/hdp/current/hadoop-client/conf stop datanode',
@@ -260,8 +284,15 @@ class TestDatanode(RMFTestCase):
     self.assertResourceCalled('File', '/var/run/hadoop/hdfs/hadoop-hdfs-datanode.pid', action = ['delete'])
     self.assertNoMoreResources()
 
+  @patch('time.sleep')
   @patch("os.path.exists", new = MagicMock(return_value=False))
-  def test_stop_secured_HDP22_non_root_https_only(self):
+  @patch("resource_management.core.shell.checked_call")
+  def test_stop_secured_HDP22_non_root_https_only(self, checked_call_mock, time_mock):
+    def side_effect(arg):
+      if '-D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo' in arg :
+        raise Fail()
+      return
+    checked_call_mock.side_effect = side_effect
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/secured.json"
     with open(config_file, "r") as f:
       secured_json = json.load(f)
@@ -276,6 +307,7 @@ class TestDatanode(RMFTestCase):
                        command = "stop",
                        config_dict = secured_json,
                        stack_version = self.STACK_VERSION,
+                       checked_call_mocks = side_effect,
                        target = RMFTestCase.TARGET_COMMON_SERVICES
     )
     self.assertResourceCalled('Execute', "ambari-sudo.sh su hdfs -l -s /bin/bash -c '[RMF_EXPORT_PLACEHOLDER]ulimit -c unlimited ;  /usr/hdp/current/hadoop-client/sbin/hadoop-daemon.sh --config /usr/hdp/current/hadoop-client/conf stop datanode'",
@@ -564,7 +596,7 @@ class TestDatanode(RMFTestCase):
 
   @patch("resource_management.core.shell.call")
   @patch('time.sleep')
-  def test_stop_during_upgrade(self, time_mock, call_mock):
+  def test_stop_during_upgrade_not_shutdown(self, time_mock, call_mock):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     call_mock_side_effects = [(0, ""), ]
     call_mock.side_effects = call_mock_side_effects
@@ -573,7 +605,7 @@ class TestDatanode(RMFTestCase):
 
     version = '2.2.1.0-3242'
     json_content['commandParams']['version'] = version
-
+    mocks_dict={}
     try:
       self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
         classname = "DataNode",
@@ -582,19 +614,23 @@ class TestDatanode(RMFTestCase):
         stack_version = self.STACK_VERSION,
         target = RMFTestCase.TARGET_COMMON_SERVICES,
         call_mocks = call_mock_side_effects,
+        checked_call_mocks=itertools.cycle([(0, "OK.")]),
+        mocks_dict = mocks_dict,
         command_args=["rolling"])
 
       raise Fail("Expected a fail since datanode didn't report a shutdown")
     except Exception, err:
-      expected_message = "DataNode has not shutdown."
+      expected_message = "DataNode has not yet deregistered from the NameNode..."
       if str(err.message) != expected_message:
         self.fail("Expected this exception to be thrown. " + expected_message + ". Got this instead, " + str(err.message))
 
-    self.assertResourceCalled("Execute", "hdfs dfsadmin -fs hdfs://c6401.ambari.apache.org:8020 -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo 0.0.0.0:8010", tries=1, user="hdfs")
+    self.assertEquals(
+      ('hdfs dfsadmin -fs hdfs://c6401.ambari.apache.org:8020 -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo 0.0.0.0:8010'),
+      mocks_dict['checked_call'].call_args_list[0][0][0])
 
   @patch("resource_management.core.shell.call")
   @patch('time.sleep')
-  def test_stop_during_upgrade(self, time_mock, call_mock):
+  def test_stop_during_upgrade_not_shutdown_ha(self, time_mock, call_mock):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/ha_default.json"
     call_mock_side_effects = [(0, ""), ]
     call_mock.side_effects = call_mock_side_effects
@@ -603,7 +639,7 @@ class TestDatanode(RMFTestCase):
 
     version = '2.2.1.0-3242'
     json_content['commandParams']['version'] = version
-
+    mocks_dict={}
     try:
       self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
                          classname = "DataNode",
@@ -612,15 +648,19 @@ class TestDatanode(RMFTestCase):
                          stack_version = self.STACK_VERSION,
                          target = RMFTestCase.TARGET_COMMON_SERVICES,
                          call_mocks = call_mock_side_effects,
+                         checked_call_mocks=itertools.cycle([(0, "OK.")]),
+                         mocks_dict = mocks_dict,
                          command_args=["rolling"])
 
       raise Fail("Expected a fail since datanode didn't report a shutdown")
     except Exception, err:
-      expected_message = "DataNode has not shutdown."
+      expected_message = "DataNode has not yet deregistered from the NameNode..."
       if str(err.message) != expected_message:
         self.fail("Expected this exception to be thrown. " + expected_message + ". Got this instead, " + str(err.message))
 
-    self.assertResourceCalled("Execute", "hdfs dfsadmin -fs hdfs://ns1 -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo 0.0.0.0:8010", tries=1, user="hdfs")
+    self.assertEquals(
+      ('hdfs dfsadmin -fs hdfs://ns1 -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo 0.0.0.0:8010'),
+      mocks_dict['checked_call'].call_args_list[0][0][0])
 
   @patch("resource_management.libraries.functions.security_commons.build_expectations")
   @patch("resource_management.libraries.functions.security_commons.get_params_from_filesystem")
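
A note on the mock plumbing these tests rely on: checked_call_mocks=itertools.cycle([(0, "OK.")]) hands the patched shell.checked_call an endless stream of successful (returncode, stdout) tuples, so check_datanode_shutdown keeps seeing a live DataNode until @retry exhausts its 24 attempts (time.sleep is patched, so this runs fast). The same idiom in isolation (standard mock library; command strings are illustrative):

    import itertools
    from mock import MagicMock

    checked_call = MagicMock()
    checked_call.side_effect = itertools.cycle([(0, "OK.")])

    # Every invocation now returns (0, "OK."), indefinitely.
    assert checked_call("first command") == (0, "OK.")
    assert checked_call("second command") == (0, "OK.")

mocks_dict then exposes the recorded mock, which is how the updated tests assert on the exact dfsadmin command line instead of on an Execute resource.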