Posted to commits@ambari.apache.org by ao...@apache.org on 2014/12/25 15:18:39 UTC

ambari git commit: AMBARI-8916. Show output of Execute commands concurrently (for timeouts, long ops) (aonishuk)

Repository: ambari
Updated Branches:
  refs/heads/trunk ea1c9f1ac -> 79cffa16d


AMBARI-8916. Show output of Execute commands concurrently (for timeouts, long ops) (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/79cffa16
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/79cffa16
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/79cffa16

Branch: refs/heads/trunk
Commit: 79cffa16dcf08a5baddbc861cf0212449deb5956
Parents: ea1c9f1
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Thu Dec 25 16:18:28 2014 +0200
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Thu Dec 25 16:18:28 2014 +0200

----------------------------------------------------------------------
 .../TestExecuteHadoopResource.py                |   6 +-
 .../resource_management/TestExecuteResource.py  |  10 +-
 .../resource_management/TestGroupResource.py    |   5 +
 .../resource_management/TestUserResource.py     |  11 ++
 .../python/resource_management/core/logger.py   |   4 +
 .../core/providers/system.py                    |   4 +-
 .../core/resources/system.py                    |  17 ++-
 .../python/resource_management/core/shell.py    | 107 ++++++++++++-------
 .../libraries/resources/execute_hadoop.py       |   2 +-
 .../package/scripts/datanode_upgrade.py         |   4 +-
 .../HDFS/2.1.0.2.0/package/scripts/namenode.py  |  25 ++---
 .../HDFS/2.1.0.2.0/package/scripts/utils.py     |   2 +-
 .../package/scripts/hive_server_upgrade.py      |   4 +-
 .../0.8.1.2.2/package/scripts/service_check.py  |  81 +++++++-------
 .../YARN/package/scripts/nodemanager_upgrade.py |   4 +-
 .../KERBEROS/package/scripts/kerberos_common.py |   4 +-
 .../python/stacks/2.0.6/HDFS/test_datanode.py   |  22 ++--
 .../python/stacks/2.0.6/HDFS/test_namenode.py   |  27 ++---
 .../stacks/2.0.6/HIVE/test_hive_server.py       |  28 +----
 .../stacks/2.0.6/YARN/test_nodemanager.py       |  22 ++--
 .../src/test/python/stacks/utils/RMFTestCase.py |  17 ++-
 21 files changed, 222 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py b/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py
index e4fe8df..ae137a5 100644
--- a/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py
@@ -65,7 +65,7 @@ class TestExecuteHadoopResource(TestCase):
       self.assertEqual(execute_mock.call_count, 1)
       self.assertEqual(execute_mock.call_args[0][0].command,'hadoop --config conf_dir command')
       self.assertEqual(execute_mock.call_args[0][0].arguments,
-                       {'logoutput': False,
+                       {'logoutput': None,
                         'tries': 1,
                         'user': 'user',
                         'try_sleep': 0,
@@ -122,14 +122,14 @@ class TestExecuteHadoopResource(TestCase):
       self.assertEqual(execute_mock.call_args_list[1][0][0].command,
                        'hadoop --config conf_dir command2')
       self.assertEqual(execute_mock.call_args_list[0][0][0].arguments,
-                       {'logoutput': False,
+                       {'logoutput': None,
                         'tries': 1,
                         'user': 'user',
                         'environment': {},
                         'try_sleep': 0,
                         'path': []})
       self.assertEqual(execute_mock.call_args_list[1][0][0].arguments,
-                       {'logoutput': False,
+                       {'logoutput': None,
                         'tries': 1,
                         'user': 'user',
                         'try_sleep': 0,

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestExecuteResource.py b/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
index 87a637c..a0b375b 100644
--- a/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
@@ -38,6 +38,7 @@ class TestExecuteResource(TestCase):
   def test_attribute_logoutput(self, popen_mock, info_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     subproc_mock.communicate.side_effect = [["1"], ["2"]]
     popen_mock.return_value = subproc_mock
 
@@ -69,6 +70,7 @@ class TestExecuteResource(TestCase):
 
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     subproc_mock.communicate.side_effect = [["1"]]
     popen_mock.return_value = subproc_mock
 
@@ -83,6 +85,7 @@ class TestExecuteResource(TestCase):
   def test_attribute_path(self, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     subproc_mock.communicate.side_effect = [["1"]]
     popen_mock.return_value = subproc_mock
 
@@ -100,7 +103,7 @@ class TestExecuteResource(TestCase):
 
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.communicate.side_effect = [Fail("Fail"), ["1"]]
+    subproc_mock.stdout.readline = MagicMock(side_effect = [Fail("Fail"), "OK"])
     popen_mock.return_value = subproc_mock
 
     with Environment("/") as env:
@@ -150,6 +153,7 @@ class TestExecuteResource(TestCase):
 
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     subproc_mock.communicate.side_effect = [["1"]]
     popen_mock.return_value = subproc_mock
 
@@ -167,6 +171,7 @@ class TestExecuteResource(TestCase):
 
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     subproc_mock.communicate.side_effect = [["1"]]
     popen_mock.return_value = subproc_mock
 
@@ -188,6 +193,7 @@ class TestExecuteResource(TestCase):
 
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     subproc_mock.communicate.side_effect = [["1"]]
     popen_mock.return_value = subproc_mock
 
@@ -209,6 +215,7 @@ class TestExecuteResource(TestCase):
 
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
 
     with Environment("/") as env:
@@ -238,6 +245,7 @@ class TestExecuteResource(TestCase):
 
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
 
     with Environment("/") as env:

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-agent/src/test/python/resource_management/TestGroupResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestGroupResource.py b/ambari-agent/src/test/python/resource_management/TestGroupResource.py
index 29decbb..7ef487a 100644
--- a/ambari-agent/src/test/python/resource_management/TestGroupResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestGroupResource.py
@@ -36,6 +36,7 @@ class TestGroupResource(TestCase):
   def test_action_create_nonexistent(self, popen_mock, getgrnam_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getgrnam_mock.side_effect = KeyError()
 
@@ -56,6 +57,7 @@ class TestGroupResource(TestCase):
   def test_action_create_existent(self, popen_mock, getgrnam_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getgrnam_mock.return_value = "mapred"
 
@@ -77,6 +79,7 @@ class TestGroupResource(TestCase):
   def test_action_create_fail(self, popen_mock, getgrnam_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 1
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getgrnam_mock.return_value = "mapred"
 
@@ -102,6 +105,7 @@ class TestGroupResource(TestCase):
 
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getgrnam_mock.return_value = "mapred"
 
@@ -122,6 +126,7 @@ class TestGroupResource(TestCase):
 
     subproc_mock = MagicMock()
     subproc_mock.returncode = 1
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getgrnam_mock.return_value = "mapred"
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-agent/src/test/python/resource_management/TestUserResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestUserResource.py b/ambari-agent/src/test/python/resource_management/TestUserResource.py
index 458053b..f66b738 100644
--- a/ambari-agent/src/test/python/resource_management/TestUserResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestUserResource.py
@@ -35,6 +35,7 @@ class TestUserResource(TestCase):
   def test_action_create_nonexistent(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = None
     with Environment('/') as env:
@@ -48,6 +49,7 @@ class TestUserResource(TestCase):
   def test_action_create_existent(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = 1
 
@@ -62,6 +64,7 @@ class TestUserResource(TestCase):
   def test_action_delete(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = 1
 
@@ -76,6 +79,7 @@ class TestUserResource(TestCase):
   def test_attribute_comment(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = 1
 
@@ -91,6 +95,7 @@ class TestUserResource(TestCase):
   def test_attribute_home(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = 1
 
@@ -106,6 +111,7 @@ class TestUserResource(TestCase):
   def test_attribute_password(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = 1
 
@@ -121,6 +127,7 @@ class TestUserResource(TestCase):
   def test_attribute_shell(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = 1
 
@@ -135,6 +142,7 @@ class TestUserResource(TestCase):
   def test_attribute_uid(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = 1
 
@@ -149,6 +157,7 @@ class TestUserResource(TestCase):
   def test_attribute_gid(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = 1
 
@@ -163,6 +172,7 @@ class TestUserResource(TestCase):
   def test_attribute_groups(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = 1
 
@@ -178,6 +188,7 @@ class TestUserResource(TestCase):
   def test_missing_shell_argument(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
+    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = None
     with Environment('/') as env:

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/core/logger.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/logger.py b/ambari-common/src/main/python/resource_management/core/logger.py
index 6ff40d6..69a55fb 100644
--- a/ambari-common/src/main/python/resource_management/core/logger.py
+++ b/ambari-common/src/main/python/resource_management/core/logger.py
@@ -105,8 +105,12 @@ class Logger:
           val = oct(y)
         except:
           val = repr(y)
+      # for functions show only function name
+      elif hasattr(y, '__call__') and hasattr(y, '__name__'):
+        val = y.__name__
       else:
         val = repr(y)
+        
 
 
       arguments_str += "'{0}': {1}, ".format(x, val)

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/core/providers/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/system.py b/ambari-common/src/main/python/resource_management/core/providers/system.py
index b3949df..fc06a78 100644
--- a/ambari-common/src/main/python/resource_management/core/providers/system.py
+++ b/ambari-common/src/main/python/resource_management/core/providers/system.py
@@ -258,8 +258,8 @@ class ExecuteProvider(Provider):
                             wait_for_finish=self.resource.wait_for_finish,
                             timeout=self.resource.timeout,
                             path=self.resource.path,
-                            output_file=self.resource.output_file,
-                            sudo=self.resource.sudo)
+                            sudo=self.resource.sudo,
+                            on_new_line=self.resource.on_new_line)
         break
       except Fail as ex:
         if i == self.resource.tries-1: # last try

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/core/resources/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/resources/system.py b/ambari-common/src/main/python/resource_management/core/resources/system.py
index b0729d1..b7958d9 100644
--- a/ambari-common/src/main/python/resource_management/core/resources/system.py
+++ b/ambari-common/src/main/python/resource_management/core/resources/system.py
@@ -92,7 +92,21 @@ class Execute(Resource):
   try_sleep = ResourceArgument(default=0) # seconds
   path = ForcedListArgument(default=[])
   actions = Resource.actions + ["run"]
-  logoutput = BooleanArgument(default=False)
+  # TODO: handle how this is logged / tested?
+  """
+  A one-argument function that is executed
+  whenever a new line appears in the command output.
+  
+  The only parameter of this function is the new line read from the output.
+  """
+  on_new_line = ResourceArgument()
+  """
+  True           -  log it in INFO mode
+  False          -  never log it
+  None (default) -  log it in DEBUG mode
+  """
+  logoutput = ResourceArgument(default=None)
+
   """
   if on_timeout is not set leads to failing after x seconds,
   otherwise calls on_timeout
@@ -110,7 +124,6 @@ class Execute(Resource):
   - try_sleep
   """
   wait_for_finish = BooleanArgument(default=True)
-  output_file = ResourceArgument()
   """
   For calling more advanced commands use as_sudo(command) option.
   Example:

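For reference, a minimal usage sketch of the new on_new_line / logoutput arguments of Execute (mirroring the namenode rebalance change further below; the shell command, callback name and environment root here are illustrative only, not part of this commit):

from resource_management.core.environment import Environment
from resource_management.core.resources.system import Execute

def handle_new_line(line):
  # invoked once for every line of output while the command is still running
  print "[progress] %s" % line.rstrip("\n")

with Environment("/") as env:
  # logoutput: True logs each line at INFO, False never logs, None (default) logs at DEBUG
  Execute("for i in 1 2 3; do echo step $i; sleep 1; done",
          on_new_line = handle_new_line,
          logoutput = False,
  )
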
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/core/shell.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/shell.py b/ambari-common/src/main/python/resource_management/core/shell.py
index df92e45..886f0f1 100644
--- a/ambari-common/src/main/python/resource_management/core/shell.py
+++ b/ambari-common/src/main/python/resource_management/core/shell.py
@@ -21,11 +21,14 @@ Ambari Agent
 """
 import os
 
-__all__ = ["checked_call", "call", "quote_bash_args", "as_user", "as_sudo"]
+__all__ = ["non_blocking_call", "checked_call", "call", "quote_bash_args", "as_user", "as_sudo"]
 
+import sys
+import logging
 import string
 import subprocess
 import threading
+import traceback
 from multiprocessing import Queue
 from exceptions import Fail
 from exceptions import ExecuteTimeoutException
@@ -40,23 +43,35 @@ PLACEHOLDERS_TO_STR = {
 }
 
 def checked_call(command, verbose=False, logoutput=False,
-         cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, output_file=None, sudo=False):
+         cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None):
   """
-  Execute the process and throw an exception on failure.
-  @return: return_code, stdout
+  Execute the shell command and throw an exception on failure.
+  @throws Fail
+  @return: return_code, output
   """
-  return _call(command, verbose, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, output_file, sudo)
+  return _call(command, verbose, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, sudo, on_new_line)
 
 def call(command, verbose=False, logoutput=False,
-         cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, output_file=None, sudo=False):
+         cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None):
   """
-  Execute the process despite failures.
-  @return: return_code, stdout
+  Execute the shell command despite failures.
+  @return: return_code, output
   """
-  return _call(command, verbose, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, output_file, sudo)
+  return _call(command, verbose, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, sudo, on_new_line)
+
+def non_blocking_call(command, verbose=False,
+         cwd=None, env=None, preexec_fn=None, user=None, timeout=None, path=None, sudo=False):
+  """
+  Execute the shell command without waiting for its completion.
+  
+  @return: process object -- a Popen instance
+  (use proc.stdout.readline to read output in a loop; don't forget to call proc.stdout.close(),
+  and to get the return code use proc.wait() and then proc.returncode)
+  """
+  return _call(command, verbose, False, True, cwd, env, preexec_fn, user, False, timeout, path, sudo, None)
 
 def _call(command, verbose=False, logoutput=False, throw_on_failure=True,
-         cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, output_file=None, sudo=False):
+         cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None):
   """
   Execute shell command
   
@@ -64,14 +79,12 @@ def _call(command, verbose=False, logoutput=False, throw_on_failure=True,
   or string of the command to execute
  @param logoutput: boolean, whether command output should be logged or not
   @param throw_on_failure: if true, when return code is not zero exception is thrown
-  
-  @return: return_code, stdout
   """
 
   command_alias = string_cmd_from_args_list(command) if isinstance(command, (list, tuple)) else command
   
   # Append current PATH to env['PATH']
-  env = add_current_path_to_env(env)
+  env = _add_current_path_to_env(env)
   # Append path to env['PATH']
   if path:
     path = os.pathsep.join(path) if isinstance(path, (list, tuple)) else path
@@ -88,7 +101,7 @@ def _call(command, verbose=False, logoutput=False, throw_on_failure=True,
     command = string_cmd_from_args_list(command)
     
   # replace placeholder from as_sudo / as_user if present
-  env_str = get_environment_str(env)
+  env_str = _get_environment_str(env)
   for placeholder, replacement in PLACEHOLDERS_TO_STR.iteritems():
     command = command.replace(placeholder, replacement.format(env_str=env_str))
   
@@ -100,16 +113,37 @@ def _call(command, verbose=False, logoutput=False, throw_on_failure=True,
   proc = subprocess.Popen(subprocess_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                           cwd=cwd, env=env, shell=False,
                           preexec_fn=preexec_fn)
-
-  if not wait_for_finish:
-    return None, None
   
   if timeout:
     q = Queue()
-    t = threading.Timer( timeout, on_timeout, [proc, q] )
+    t = threading.Timer( timeout, _on_timeout, [proc, q] )
     t.start()
     
-  out = proc.communicate()[0].strip('\n')
+  if not wait_for_finish:
+    return proc
+    
+  # in case logoutput==False, never log.    
+  logoutput = logoutput==True and Logger.logger.isEnabledFor(logging.INFO) or logoutput==None and Logger.logger.isEnabledFor(logging.DEBUG)
+  out = ""
+  
+  try:
+    for line in iter(proc.stdout.readline, b''):
+      out += line
+      
+      try:
+        if on_new_line:
+          on_new_line(line)
+      except Exception, err:
+        err_msg = "Caused by on_new_line function failed with exception for input argument '{0}':\n{1}".format(line, traceback.format_exc())
+        raise Fail(err_msg)
+        
+      if logoutput:
+        _print(line)
+  finally:
+    proc.stdout.close()
+    
+  proc.wait()  
+  out = out.strip('\n')
   
   if timeout:
     if q.empty():
@@ -120,9 +154,6 @@ def _call(command, verbose=False, logoutput=False, throw_on_failure=True,
    
   code = proc.returncode
   
-  if logoutput and out:
-    Logger.info(out)
-  
   if throw_on_failure and code:
     err_msg = Logger.filter_text(("Execution of '%s' returned %d. %s") % (command_alias, code, out))
     raise Fail(err_msg)
@@ -147,17 +178,26 @@ def as_sudo(command, env=None):
     err_msg = Logger.filter_text(("String command '%s' cannot be run as sudo. Please supply the command as a tuple of arguments") % (command))
     raise Fail(err_msg)
 
-  env = get_environment_str(add_current_path_to_env(env)) if env else ENV_PLACEHOLDER
+  env = _get_environment_str(_add_current_path_to_env(env)) if env else ENV_PLACEHOLDER
   return "/usr/bin/sudo {0} -H -E {1}".format(env, command)
 
 def as_user(command, user, env=None):
   if isinstance(command, (list, tuple)):
     command = string_cmd_from_args_list(command)
 
-  export_env = "export {0} ; ".format(get_environment_str(add_current_path_to_env(env))) if env else EXPORT_PLACEHOLDER
+  export_env = "export {0} ; ".format(_get_environment_str(_add_current_path_to_env(env))) if env else EXPORT_PLACEHOLDER
   return "/usr/bin/sudo su {0} -l -s /bin/bash -c {1}".format(user, quote_bash_args(export_env + command))
 
-def add_current_path_to_env(env):
+def quote_bash_args(command):
+  if not command:
+    return "''"
+  valid = set(string.ascii_letters + string.digits + '@%_-+=:,./')
+  for char in command:
+    if char not in valid:
+      return "'" + command.replace("'", "'\"'\"'") + "'"
+  return command
+
+def _add_current_path_to_env(env):
   result = {} if not env else env
   
   if not 'PATH' in result:
@@ -169,25 +209,20 @@ def add_current_path_to_env(env):
   
   return result
   
-def get_environment_str(env):
+def _get_environment_str(env):
   return reduce(lambda str,x: '{0} {1}={2}'.format(str,x,quote_bash_args(env[x])), env, '')
 
 def string_cmd_from_args_list(command):
   return ' '.join(quote_bash_args(x) for x in command)
 
-def on_timeout(proc, q):
+def _on_timeout(proc, q):
   q.put(True)
   if proc.poll() == None:
     try:
       proc.terminate()
     except:
       pass
-
-def quote_bash_args(command):
-  if not command:
-    return "''"
-  valid = set(string.ascii_letters + string.digits + '@%_-+=:,./')
-  for char in command:
-    if char not in valid:
-      return "'" + command.replace("'", "'\"'\"'") + "'"
-  return command
\ No newline at end of file
+    
+def _print(line):
+  sys.stdout.write(line)
+  sys.stdout.flush()
\ No newline at end of file

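A minimal sketch of consuming output from the new non_blocking_call, following its docstring above (the command is illustrative):

from resource_management.core import shell

# start a long-running command without waiting for it to finish
proc = shell.non_blocking_call("for i in 1 2 3; do echo line $i; sleep 1; done")

try:
  # read output as it is produced; readline returns '' once the stream is closed
  for line in iter(proc.stdout.readline, ''):
    print line.rstrip("\n")
finally:
  proc.stdout.close()

proc.wait()
print "return code: %d" % proc.returncode
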
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py b/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py
index 8b61331..b4b0b52 100644
--- a/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py
+++ b/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py
@@ -30,7 +30,7 @@ class ExecuteHadoop(Resource):
   tries = ResourceArgument(default=1)
   try_sleep = ResourceArgument(default=0) # seconds
   user = ResourceArgument()
-  logoutput = BooleanArgument(default=False)
+  logoutput = ResourceArgument()
   principal = ResourceArgument(default=lambda obj: obj.user)
   bin_dir = ResourceArgument(default=[]) # appended to $PATH
   environment = ResourceArgument(default={})

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
index 88af1f9..529ca44 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
@@ -20,7 +20,7 @@ limitations under the License.
 from resource_management.core.logger import Logger
 from resource_management.core.exceptions import Fail
 from resource_management.core.resources.system import Execute
-from resource_management.core.shell import call
+from resource_management.core import shell
 from resource_management.libraries.functions import format
 from resource_management.libraries.functions.decorator import retry
 
@@ -99,7 +99,7 @@ def _check_datanode_startup():
   try:
     # 'su - hdfs -c "hdfs dfsadmin -report -live"'
     command = 'hdfs dfsadmin -report -live'
-    return_code, hdfs_output = call(command, user=params.hdfs_user)
+    return_code, hdfs_output = shell.call(command, user=params.hdfs_user)
   except:
     raise Fail('Unable to determine if the DataNode has started after upgrade.')
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
index 3eb9cc2..e8dfe16 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
@@ -187,16 +187,9 @@ class NameNode(Script):
     _print("Executing command %s\n" % command)
 
     parser = hdfs_rebalance.HdfsParser()
-    proc = subprocess.Popen(
-                            command,
-                            stdout=subprocess.PIPE,
-                            stderr=subprocess.PIPE,
-                            shell=True,
-                            close_fds=True,
-                            cwd=basedir
-                           )
-    for line in iter(proc.stdout.readline, ''):
-      _print('[balancer] %s %s' % (str(datetime.now()), line ))
+
+    def handle_new_line(line):
+      _print('[balancer] %s' % (line))
       pl = parser.parseLine(line)
       if pl:
         res = pl.toJson()
@@ -204,14 +197,14 @@ class NameNode(Script):
 
         self.put_structured_out(res)
       elif parser.state == 'PROCESS_FINISED' :
-        _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' ))
+        _print('[balancer] %s' % ('Process is finished' ))
         self.put_structured_out({'completePercent' : 1})
-        break
+        return
 
-    proc.stdout.close()
-    proc.wait()
-    if proc.returncode != None and proc.returncode != 0:
-      raise Fail('Hdfs rebalance process exited with error. See the log output')
+    Execute(command,
+            on_new_line = handle_new_line,
+            logoutput = False,
+    )
 
 def _print(line):
   sys.stdout.write(line)

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
index 6f421b6..b9bd273 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
@@ -98,7 +98,7 @@ def kill_zkfc(zkfc_user):
       if code == 0:
         Logger.debug("ZKFC is running and will be killed to initiate namenode failover.")
         kill_command = format("{check_process} && kill -9 `cat {zkfc_pid_file}` > /dev/null 2>&1")
-        checked_call(kill_command, verbose=True)
+        Execute(kill_command)
 
 
 def get_service_pid_file(name, user):

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py
index 653d4bd..37fe7f1 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py
@@ -21,7 +21,7 @@ import re
 from resource_management.core.logger import Logger
 from resource_management.core.exceptions import Fail
 from resource_management.core.resources.system import Execute
-from resource_management.core.shell import call
+from resource_management.core import shell
 from resource_management.libraries.functions import format
 
 
@@ -65,7 +65,7 @@ def _get_current_hiveserver_version():
 
   try:
     command = 'hdp-select status hive-server2'
-    return_code, hdp_output = call(command, user=params.hive_user)
+    return_code, hdp_output = shell.call(command, user=params.hive_user)
   except Exception, e:
     Logger.error(str(e))
     raise Fail('Unable to execute hdp-select command to retrieve the hiveserver2 version.')

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
index c2b4bc1..ac7014d 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
@@ -17,49 +17,54 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
-from __future__ import print_function
 from resource_management import *
-import  sys,subprocess,os
 
 class ServiceCheck(Script):
-    def service_check(self, env):
-        import params
-        env.set_params(params)
-        kafka_config=self.read_kafka_config(params.conf_dir)
-        self.set_env(params.conf_dir)
-        create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"."
-        create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists."
-	print("Running kafka create topic command", file=sys.stdout)
-        create_topic_cmd = [params.kafka_home+'/bin/kafka-topics.sh', '--zookeeper '+kafka_config['zookeeper.connect'],
-                            '--create --topic ambari_kafka_service_check', '--partitions 1 --replication-factor 1']
-	print(" ".join(create_topic_cmd), file=sys.stdout)
-        create_topic_process = subprocess.Popen(create_topic_cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
-        out, err = create_topic_process.communicate()
-        if out.find(create_topic_cmd_created_output) != -1:
-	    print(out, file=sys.stdout)
-            sys.exit(0)
-        elif out.find(create_topic_cmd_exists_output) != -1:
-            print("Topic ambari_kafka_service_check exists", file=sys.stdout)
-            sys.exit(0)
-        else:
-	    print(out, file=sys.stderr)
-            sys.exit(1)
+  def service_check(self, env):
+      import params
+      env.set_params(params)
+      
+      kafka_config = self.read_kafka_config()
+      environment = self.get_env()
+      
+      create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"."
+      create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists."
+      
+      print "Running kafka create topic command"
+      create_topic_cmd = (params.kafka_home+'/bin/kafka-topics.sh', '--zookeeper '+kafka_config['zookeeper.connect'],
+                          '--create --topic ambari_kafka_service_check', '--partitions 1 --replication-factor 1')
+      
+      code, out = shell.checked_call(create_topic_cmd, 
+                                     verbose=True, env=environment)
 
-    def read_kafka_config(self,kafka_conf_dir):
-        conf_file = open(kafka_conf_dir+"/server.properties","r")
-        kafka_config = {}
-        for line in conf_file:
-            key,value = line.split("=")
-            kafka_config[key] = value.replace("\n","")
-        return kafka_config
+      if out.find(create_topic_cmd_created_output) != -1:
+          print out
+      elif out.find(create_topic_cmd_exists_output) != -1:
+          print "Topic ambari_kafka_service_check exists"
+      else:
+          raise Fail(out)
 
-    def set_env(self, kafka_conf_dir):
-        command = ['bash', '-c', 'source '+kafka_conf_dir+'/kafka-env.sh && env']
-        proc = subprocess.Popen(command, stdout = subprocess.PIPE)
-        for line in proc.stdout:
-            (key, _, value) = line.partition("=")
-            os.environ[key] = value.replace("\n","")
-        proc.communicate()
+  def read_kafka_config(self):
+    import params
+    
+    kafka_config = {}
+    with open(params.conf_dir+"/server.properties","r") as conf_file:
+      for line in conf_file:
+          key,value = line.split("=")
+          kafka_config[key] = value.replace("\n","")
+    
+    return kafka_config
+
+  def get_env(self):
+    import params
+    code, out = shell.checked_call(format('source {conf_dir}/kafka-env.sh && env'))
+    
+    environment = {}
+    for line in out.split("\n"):
+      (key, _, value) = line.partition("=")
+      environment[key] = value.replace("\n","")
+      
+    return environment
 
 if __name__ == "__main__":
     ServiceCheck().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py
index e82c320..54e0fae 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py
@@ -22,7 +22,7 @@ import subprocess
 from resource_management.core.logger import Logger
 from resource_management.core.exceptions import Fail
 from resource_management.core.resources.system import Execute
-from resource_management.core.shell import call
+from resource_management.core import shell
 from resource_management.libraries.functions.decorator import retry
 
 
@@ -56,7 +56,7 @@ def _check_nodemanager_startup():
 
   try:
     # 'su - yarn -c "yarn node -status c6401.ambari.apache.org:45454"'
-    return_code, yarn_output = call(command, user=params.hdfs_user)
+    return_code, yarn_output = shell.call(command, user=params.hdfs_user)
   except:
     raise Fail('Unable to determine if the NodeManager has started after upgrade.')
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py b/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py
index d16a749..42e195c 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py
@@ -312,7 +312,7 @@ class KerberosScript(Script):
       # If a test keytab file is available, simply use it
       if (keytab_file is not None) and (os.path.isfile(keytab_file)):
         command = 'kinit -k -t %s %s' % (keytab_file, principal)
-        shell.checked_call(command)
+        Execute(command)
         return shell.checked_call('kdestroy')
 
       # If base64-encoded test keytab data is available; then decode it, write it to a temporary file
@@ -324,7 +324,7 @@ class KerberosScript(Script):
 
         try:
           command = 'kinit -k -t %s %s' % (test_keytab_file, principal)
-          shell.checked_call(command)
+          Execute(command)
           return shell.checked_call('kdestroy')
         except:
           raise

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
index 1c032ff..2fc8549 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
@@ -20,6 +20,7 @@ limitations under the License.
 from stacks.utils.RMFTestCase import *
 import json
 from mock.mock import MagicMock, patch
+from resource_management.core import shell
 from resource_management.core.exceptions import Fail
 
 class TestDatanode(RMFTestCase):
@@ -441,7 +442,7 @@ class TestDatanode(RMFTestCase):
 
 
   @patch('time.sleep')
-  @patch("subprocess.Popen")
+  @patch.object(shell, "call")
   def test_post_rolling_restart(self, process_mock, time_mock):
     process_output = """
       Live datanodes (2):
@@ -464,10 +465,7 @@ class TestDatanode(RMFTestCase):
       Last contact: Fri Dec 12 20:47:21 UTC 2014
     """
 
-    process = MagicMock()
-    process.communicate.return_value = [process_output]
-    process.returncode = 0
-    process_mock.return_value = process
+    process_mock.return_value = (0, process_output)
 
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
                        classname = "DataNode",
@@ -482,12 +480,9 @@ class TestDatanode(RMFTestCase):
 
 
   @patch('time.sleep')
-  @patch("subprocess.Popen")
+  @patch.object(shell, "call")
   def test_post_rolling_restart_datanode_not_ready(self, process_mock, time_mock):
-    process = MagicMock()
-    process.communicate.return_value = ['There are no DataNodes here!']
-    process.returncode = 0
-    process_mock.return_value = process
+    process_mock.return_value = (0, 'There are no DataNodes here!')
 
     try:
       self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
@@ -504,12 +499,9 @@ class TestDatanode(RMFTestCase):
 
 
   @patch('time.sleep')
-  @patch("subprocess.Popen")
+  @patch.object(shell, "call")
   def test_post_rolling_restart_bad_returncode(self, process_mock, time_mock):
-    process = MagicMock()
-    process.communicate.return_value = ['some']
-    process.returncode = 999
-    process_mock.return_value = process
+    process_mock.return_value = (0, 'some')
 
     try:
       self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
index 78a2f72..3981e33 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
@@ -88,6 +88,7 @@ class TestNamenode(RMFTestCase):
         environment = {'HADOOP_LIBEXEC_DIR': '/usr/lib/hadoop/libexec'},
         not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid` >/dev/null 2>&1',
     )
+    self.printResources()
     self.assertResourceCalled('Execute', 'hdfs --config /etc/hadoop/conf dfsadmin -safemode leave',
         path = ['/usr/bin'],
         user = 'hdfs',
@@ -776,26 +777,6 @@ class TestNamenode(RMFTestCase):
 
   @patch("resource_management.libraries.script.Script.put_structured_out")
   def test_rebalance_hdfs(self, pso):
-    Popen_Mock.return_value = 1
-    with patch("subprocess.Popen", new_callable=Popen_Mock):
-      ll = subprocess.Popen()
-      self.assertTrue(isinstance(ll.stdout.readline(),str))
-      try:
-        self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/namenode.py",
-                           classname = "NameNode",
-                           command = "rebalancehdfs",
-                           config_file = "rebalancehdfs_default.json",
-                           hdp_stack_version = self.STACK_VERSION,
-                           target = RMFTestCase.TARGET_COMMON_SERVICES
-        )
-        self.fail("Exception was not thrown")
-      except  resource_management.core.exceptions.Fail:
-        pass ##expected
-
-      pso.reset_mock()
-      Popen_Mock.return_value = 0
-      ll = subprocess.Popen()
-      self.assertTrue(isinstance(ll.stdout.readline(),str))
       self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/namenode.py",
                          classname = "NameNode",
                          command = "rebalancehdfs",
@@ -803,7 +784,11 @@ class TestNamenode(RMFTestCase):
                          hdp_stack_version = self.STACK_VERSION,
                          target = RMFTestCase.TARGET_COMMON_SERVICES
       )
-      self.assertEqual(pso.call_count, 2, "Output was not parsed properly")
+      self.assertResourceCalled('Execute', "/usr/bin/sudo su hdfs -l -s /bin/bash -c 'export  PATH=/bin:/usr/bin ; hdfs --config /etc/hadoop/conf balancer -threshold -1'",
+          logoutput = False,
+          on_new_line = FunctionMock('handle_new_line'),
+      )
+      self.assertNoMoreResources()
 
 class Popen_Mock:
   return_value = 1

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py b/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py
index 8046313..0adb266 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py
@@ -536,25 +536,15 @@ class TestHiveServer(RMFTestCase):
 
   @patch("hive_server.HiveServer.pre_rolling_restart")
   @patch("hive_server.HiveServer.start")
-  @patch("subprocess.Popen")
-  def test_stop_during_upgrade(self, process_mock, hive_server_start_mock,
+  @patch.object(shell, "call", new=MagicMock(return_value=(0,"hive-server2 - 2.2.0.0-2041")))
+  def test_stop_during_upgrade(self, hive_server_start_mock,
     hive_server_pre_rolling_mock):
-
-    process_output = 'hive-server2 - 2.2.0.0-2041'
-
-    process = MagicMock()
-    process.communicate.return_value = [process_output]
-    process.returncode = 0
-    process_mock.return_value = process
-
+    
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py",
      classname = "HiveServer", command = "restart", config_file = "hive-upgrade.json",
      hdp_stack_version = self.UPGRADE_STACK_VERSION,
      target = RMFTestCase.TARGET_COMMON_SERVICES )
 
-    self.assertTrue(process_mock.called)
-    self.assertEqual(process_mock.call_count,2)
-
     self.assertResourceCalled('Execute', 'hive --service hiveserver2 --deregister 2.2.0.0-2041',
       path=['/bin:/usr/hdp/current/hive-server2/bin:/usr/hdp/current/hadoop-client/bin'],
       tries=1, user='hive')
@@ -564,17 +554,9 @@ class TestHiveServer(RMFTestCase):
 
   @patch("hive_server.HiveServer.pre_rolling_restart")
   @patch("hive_server.HiveServer.start")
-  @patch("subprocess.Popen")
-  def test_stop_during_upgrade_bad_hive_version(self, process_mock, hive_server_start_mock,
+  @patch.object(shell, "call", new=MagicMock(return_value=(0,"BAD VERSION")))
+  def test_stop_during_upgrade_bad_hive_version(self, hive_server_start_mock,
     hive_server_pre_rolling_mock):
-
-    process_output = 'BAD VERSION'
-
-    process = MagicMock()
-    process.communicate.return_value = [process_output]
-    process.returncode = 0
-    process_mock.return_value = process
-
     try:
       self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py",
        classname = "HiveServer", command = "restart", config_file = "hive-upgrade.json",

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py
index d4229dc..bc9b831 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py
@@ -20,6 +20,7 @@ limitations under the License.
 from mock.mock import MagicMock, call, patch
 from stacks.utils.RMFTestCase import *
 from resource_management.core.exceptions import Fail
+from resource_management.core import shell
 import os
 
 origin_exists = os.path.exists
@@ -559,16 +560,13 @@ class TestNodeManager(RMFTestCase):
                               )
 
   @patch('time.sleep')
-  @patch("subprocess.Popen")
+  @patch.object(shell, "call")
   def test_post_rolling_restart(self, process_mock, time_mock):
     process_output = """
       c6401.ambari.apache.org:45454  RUNNING  c6401.ambari.apache.org:8042  0
     """
 
-    process = MagicMock()
-    process.communicate.return_value = [process_output]
-    process.returncode = 0
-    process_mock.return_value = process
+    process_mock.return_value = (0, process_output)
 
     self.executeScript("2.0.6/services/YARN/package/scripts/nodemanager.py",
       classname="Nodemanager", command = "post_rolling_restart", config_file="default.json")
@@ -578,16 +576,13 @@ class TestNodeManager(RMFTestCase):
 
 
   @patch('time.sleep')
-  @patch("subprocess.Popen")
+  @patch.object(shell, "call")
   def test_post_rolling_restart_nodemanager_not_ready(self, process_mock, time_mock):
     process_output = """
       c9999.ambari.apache.org:45454  RUNNING  c9999.ambari.apache.org:8042  0
     """
 
-    process = MagicMock()
-    process.communicate.return_value = [process_output]
-    process.returncode = 0
-    process_mock.return_value = process
+    process_mock.return_value = (0, process_output)
 
     try:
       self.executeScript("2.0.6/services/YARN/package/scripts/nodemanager.py",
@@ -599,16 +594,13 @@ class TestNodeManager(RMFTestCase):
 
 
   @patch('time.sleep')
-  @patch("subprocess.Popen")
+  @patch.object(shell, "call")
   def test_post_rolling_restart_nodemanager_not_ready(self, process_mock, time_mock):
     process_output = """
       c6401.ambari.apache.org:45454  RUNNING  c6401.ambari.apache.org:8042  0
     """
 
-    process = MagicMock()
-    process.communicate.return_value = [process_output]
-    process.returncode = 999
-    process_mock.return_value = process
+    process_mock.return_value = (999, process_output)
 
     try:
       self.executeScript("2.0.6/services/YARN/package/scripts/nodemanager.py",

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/utils/RMFTestCase.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/utils/RMFTestCase.py b/ambari-server/src/test/python/stacks/utils/RMFTestCase.py
index 63bcdb8..b8e819d 100644
--- a/ambari-server/src/test/python/stacks/utils/RMFTestCase.py
+++ b/ambari-server/src/test/python/stacks/utils/RMFTestCase.py
@@ -17,7 +17,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
-__all__ = ["RMFTestCase", "Template", "StaticFile", "InlineTemplate", "UnknownConfigurationMock"]
+__all__ = ["RMFTestCase", "Template", "StaticFile", "InlineTemplate", "UnknownConfigurationMock", "FunctionMock"]
 
 from unittest import TestCase
 import json
@@ -172,6 +172,8 @@ class RMFTestCase(TestCase):
           val = oct(v)
         elif  isinstance( v, UnknownConfiguration):
           val = "UnknownConfigurationMock()"
+        elif hasattr(v, '__call__') and hasattr(v, '__name__'):
+          val = "FunctionMock('{0}')".format(v.__name__)
         else:
           val = self._ppformat(v)
         # If value is multiline, format it
@@ -196,7 +198,7 @@ class RMFTestCase(TestCase):
     print(self.reindent("self.assertNoMoreResources()", intendation))
   
   def assertResourceCalled(self, resource_type, name, **kwargs):
-    with patch.object(UnknownConfiguration, '__getattr__', return_value=lambda: "UnknownConfiguration()"): 
+    with patch.object(UnknownConfiguration, '__getattr__', return_value=lambda: "UnknownConfiguration()"):
       self.assertNotEqual(len(RMFTestCase.env.resource_list), 0, "There was no more resources executed!")
       resource = RMFTestCase.env.resource_list.pop(0)
       
@@ -240,4 +242,15 @@ class UnknownConfigurationMock():
   
   def __repr__(self):
     return "UnknownConfigurationMock()"
+  
+class FunctionMock():
+  def __init__(self, name):
+    self.name = name
+    
+  def __ne__(self, other):
+    return not self.__eq__(other)
+    
+  def __eq__(self, other):
+    return hasattr(other, '__call__') and hasattr(other, '__name__') and self.name == other.__name__
+