You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2014/12/25 15:18:39 UTC
ambari git commit: AMBARI-8916. Show output of Execute commands
concurrently (for timeouts, long ops) (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/trunk ea1c9f1ac -> 79cffa16d
AMBARI-8916. Show output of Execute commands concurrently (for timeouts, long ops) (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/79cffa16
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/79cffa16
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/79cffa16
Branch: refs/heads/trunk
Commit: 79cffa16dcf08a5baddbc861cf0212449deb5956
Parents: ea1c9f1
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Thu Dec 25 16:18:28 2014 +0200
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Thu Dec 25 16:18:28 2014 +0200
----------------------------------------------------------------------
.../TestExecuteHadoopResource.py | 6 +-
.../resource_management/TestExecuteResource.py | 10 +-
.../resource_management/TestGroupResource.py | 5 +
.../resource_management/TestUserResource.py | 11 ++
.../python/resource_management/core/logger.py | 4 +
.../core/providers/system.py | 4 +-
.../core/resources/system.py | 17 ++-
.../python/resource_management/core/shell.py | 107 ++++++++++++-------
.../libraries/resources/execute_hadoop.py | 2 +-
.../package/scripts/datanode_upgrade.py | 4 +-
.../HDFS/2.1.0.2.0/package/scripts/namenode.py | 25 ++---
.../HDFS/2.1.0.2.0/package/scripts/utils.py | 2 +-
.../package/scripts/hive_server_upgrade.py | 4 +-
.../0.8.1.2.2/package/scripts/service_check.py | 81 +++++++-------
.../YARN/package/scripts/nodemanager_upgrade.py | 4 +-
.../KERBEROS/package/scripts/kerberos_common.py | 4 +-
.../python/stacks/2.0.6/HDFS/test_datanode.py | 22 ++--
.../python/stacks/2.0.6/HDFS/test_namenode.py | 27 ++---
.../stacks/2.0.6/HIVE/test_hive_server.py | 28 +----
.../stacks/2.0.6/YARN/test_nodemanager.py | 22 ++--
.../src/test/python/stacks/utils/RMFTestCase.py | 17 ++-
21 files changed, 222 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py b/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py
index e4fe8df..ae137a5 100644
--- a/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py
@@ -65,7 +65,7 @@ class TestExecuteHadoopResource(TestCase):
self.assertEqual(execute_mock.call_count, 1)
self.assertEqual(execute_mock.call_args[0][0].command,'hadoop --config conf_dir command')
self.assertEqual(execute_mock.call_args[0][0].arguments,
- {'logoutput': False,
+ {'logoutput': None,
'tries': 1,
'user': 'user',
'try_sleep': 0,
@@ -122,14 +122,14 @@ class TestExecuteHadoopResource(TestCase):
self.assertEqual(execute_mock.call_args_list[1][0][0].command,
'hadoop --config conf_dir command2')
self.assertEqual(execute_mock.call_args_list[0][0][0].arguments,
- {'logoutput': False,
+ {'logoutput': None,
'tries': 1,
'user': 'user',
'environment': {},
'try_sleep': 0,
'path': []})
self.assertEqual(execute_mock.call_args_list[1][0][0].arguments,
- {'logoutput': False,
+ {'logoutput': None,
'tries': 1,
'user': 'user',
'try_sleep': 0,
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestExecuteResource.py b/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
index 87a637c..a0b375b 100644
--- a/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
@@ -38,6 +38,7 @@ class TestExecuteResource(TestCase):
def test_attribute_logoutput(self, popen_mock, info_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
subproc_mock.communicate.side_effect = [["1"], ["2"]]
popen_mock.return_value = subproc_mock
@@ -69,6 +70,7 @@ class TestExecuteResource(TestCase):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
subproc_mock.communicate.side_effect = [["1"]]
popen_mock.return_value = subproc_mock
@@ -83,6 +85,7 @@ class TestExecuteResource(TestCase):
def test_attribute_path(self, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
subproc_mock.communicate.side_effect = [["1"]]
popen_mock.return_value = subproc_mock
@@ -100,7 +103,7 @@ class TestExecuteResource(TestCase):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.communicate.side_effect = [Fail("Fail"), ["1"]]
+ subproc_mock.stdout.readline = MagicMock(side_effect = [Fail("Fail"), "OK"])
popen_mock.return_value = subproc_mock
with Environment("/") as env:
@@ -150,6 +153,7 @@ class TestExecuteResource(TestCase):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
subproc_mock.communicate.side_effect = [["1"]]
popen_mock.return_value = subproc_mock
@@ -167,6 +171,7 @@ class TestExecuteResource(TestCase):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
subproc_mock.communicate.side_effect = [["1"]]
popen_mock.return_value = subproc_mock
@@ -188,6 +193,7 @@ class TestExecuteResource(TestCase):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
subproc_mock.communicate.side_effect = [["1"]]
popen_mock.return_value = subproc_mock
@@ -209,6 +215,7 @@ class TestExecuteResource(TestCase):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
with Environment("/") as env:
@@ -238,6 +245,7 @@ class TestExecuteResource(TestCase):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
with Environment("/") as env:
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-agent/src/test/python/resource_management/TestGroupResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestGroupResource.py b/ambari-agent/src/test/python/resource_management/TestGroupResource.py
index 29decbb..7ef487a 100644
--- a/ambari-agent/src/test/python/resource_management/TestGroupResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestGroupResource.py
@@ -36,6 +36,7 @@ class TestGroupResource(TestCase):
def test_action_create_nonexistent(self, popen_mock, getgrnam_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getgrnam_mock.side_effect = KeyError()
@@ -56,6 +57,7 @@ class TestGroupResource(TestCase):
def test_action_create_existent(self, popen_mock, getgrnam_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getgrnam_mock.return_value = "mapred"
@@ -77,6 +79,7 @@ class TestGroupResource(TestCase):
def test_action_create_fail(self, popen_mock, getgrnam_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 1
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getgrnam_mock.return_value = "mapred"
@@ -102,6 +105,7 @@ class TestGroupResource(TestCase):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getgrnam_mock.return_value = "mapred"
@@ -122,6 +126,7 @@ class TestGroupResource(TestCase):
subproc_mock = MagicMock()
subproc_mock.returncode = 1
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getgrnam_mock.return_value = "mapred"
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-agent/src/test/python/resource_management/TestUserResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestUserResource.py b/ambari-agent/src/test/python/resource_management/TestUserResource.py
index 458053b..f66b738 100644
--- a/ambari-agent/src/test/python/resource_management/TestUserResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestUserResource.py
@@ -35,6 +35,7 @@ class TestUserResource(TestCase):
def test_action_create_nonexistent(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = None
with Environment('/') as env:
@@ -48,6 +49,7 @@ class TestUserResource(TestCase):
def test_action_create_existent(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = 1
@@ -62,6 +64,7 @@ class TestUserResource(TestCase):
def test_action_delete(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = 1
@@ -76,6 +79,7 @@ class TestUserResource(TestCase):
def test_attribute_comment(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = 1
@@ -91,6 +95,7 @@ class TestUserResource(TestCase):
def test_attribute_home(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = 1
@@ -106,6 +111,7 @@ class TestUserResource(TestCase):
def test_attribute_password(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = 1
@@ -121,6 +127,7 @@ class TestUserResource(TestCase):
def test_attribute_shell(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = 1
@@ -135,6 +142,7 @@ class TestUserResource(TestCase):
def test_attribute_uid(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = 1
@@ -149,6 +157,7 @@ class TestUserResource(TestCase):
def test_attribute_gid(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = 1
@@ -163,6 +172,7 @@ class TestUserResource(TestCase):
def test_attribute_groups(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = 1
@@ -178,6 +188,7 @@ class TestUserResource(TestCase):
def test_missing_shell_argument(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
+ subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = None
with Environment('/') as env:
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/core/logger.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/logger.py b/ambari-common/src/main/python/resource_management/core/logger.py
index 6ff40d6..69a55fb 100644
--- a/ambari-common/src/main/python/resource_management/core/logger.py
+++ b/ambari-common/src/main/python/resource_management/core/logger.py
@@ -105,8 +105,12 @@ class Logger:
val = oct(y)
except:
val = repr(y)
+ # for functions show only function name
+ elif hasattr(y, '__call__') and hasattr(y, '__name__'):
+ val = y.__name__
else:
val = repr(y)
+
arguments_str += "'{0}': {1}, ".format(x, val)
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/core/providers/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/system.py b/ambari-common/src/main/python/resource_management/core/providers/system.py
index b3949df..fc06a78 100644
--- a/ambari-common/src/main/python/resource_management/core/providers/system.py
+++ b/ambari-common/src/main/python/resource_management/core/providers/system.py
@@ -258,8 +258,8 @@ class ExecuteProvider(Provider):
wait_for_finish=self.resource.wait_for_finish,
timeout=self.resource.timeout,
path=self.resource.path,
- output_file=self.resource.output_file,
- sudo=self.resource.sudo)
+ sudo=self.resource.sudo,
+ on_new_line=self.resource.on_new_line)
break
except Fail as ex:
if i == self.resource.tries-1: # last try
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/core/resources/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/resources/system.py b/ambari-common/src/main/python/resource_management/core/resources/system.py
index b0729d1..b7958d9 100644
--- a/ambari-common/src/main/python/resource_management/core/resources/system.py
+++ b/ambari-common/src/main/python/resource_management/core/resources/system.py
@@ -92,7 +92,21 @@ class Execute(Resource):
try_sleep = ResourceArgument(default=0) # seconds
path = ForcedListArgument(default=[])
actions = Resource.actions + ["run"]
- logoutput = BooleanArgument(default=False)
+ # TODO: handle how this is logged / tested?
+ """
+ A one-argument function, which will be executed,
+ once new line comes into command output.
+
+ The only parameter of this function is a new line which comes to output.
+ """
+ on_new_line = ResourceArgument()
+ """
+ True - log it in INFO mode
+ False - never log it
+ None (default) - log it in DEBUG mode
+ """
+ logoutput = ResourceArgument(default=None)
+
"""
if on_timeout is not set leads to failing after x seconds,
otherwise calls on_timeout
@@ -110,7 +124,6 @@ class Execute(Resource):
- try_sleep
"""
wait_for_finish = BooleanArgument(default=True)
- output_file = ResourceArgument()
"""
For calling more advanced commands use as_sudo(command) option.
Example:
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/core/shell.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/shell.py b/ambari-common/src/main/python/resource_management/core/shell.py
index df92e45..886f0f1 100644
--- a/ambari-common/src/main/python/resource_management/core/shell.py
+++ b/ambari-common/src/main/python/resource_management/core/shell.py
@@ -21,11 +21,14 @@ Ambari Agent
"""
import os
-__all__ = ["checked_call", "call", "quote_bash_args", "as_user", "as_sudo"]
+__all__ = ["non_blocking_call", "checked_call", "call", "quote_bash_args", "as_user", "as_sudo"]
+import sys
+import logging
import string
import subprocess
import threading
+import traceback
from multiprocessing import Queue
from exceptions import Fail
from exceptions import ExecuteTimeoutException
@@ -40,23 +43,35 @@ PLACEHOLDERS_TO_STR = {
}
def checked_call(command, verbose=False, logoutput=False,
- cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, output_file=None, sudo=False):
+ cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None):
"""
- Execute the process and throw an exception on failure.
- @return: return_code, stdout
+ Execute the shell command and throw an exception on failure.
+ @throws Fail
+ @return: return_code, output
"""
- return _call(command, verbose, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, output_file, sudo)
+ return _call(command, verbose, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, sudo, on_new_line)
def call(command, verbose=False, logoutput=False,
- cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, output_file=None, sudo=False):
+ cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None):
"""
- Execute the process despite failures.
- @return: return_code, stdout
+ Execute the shell command despite failures.
+ @return: return_code, output
"""
- return _call(command, verbose, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, output_file, sudo)
+ return _call(command, verbose, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, sudo, on_new_line)
+
+def non_blocking_call(command, verbose=False,
+ cwd=None, env=None, preexec_fn=None, user=None, timeout=None, path=None, sudo=False):
+ """
+ Execute the shell command and don't wait until it's completion
+
+ @return: process object -- Popen instance
+ (use proc.stdout.readline to read output in cycle, don't foget to proc.stdout.close(),
+ to get return code use proc.wait() and after that proc.returncode)
+ """
+ return _call(command, verbose, False, True, cwd, env, preexec_fn, user, False, timeout, path, sudo, None)
def _call(command, verbose=False, logoutput=False, throw_on_failure=True,
- cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, output_file=None, sudo=False):
+ cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None):
"""
Execute shell command
@@ -64,14 +79,12 @@ def _call(command, verbose=False, logoutput=False, throw_on_failure=True,
or string of the command to execute
@param logoutput: boolean, whether command output should be logged of not
@param throw_on_failure: if true, when return code is not zero exception is thrown
-
- @return: return_code, stdout
"""
command_alias = string_cmd_from_args_list(command) if isinstance(command, (list, tuple)) else command
# Append current PATH to env['PATH']
- env = add_current_path_to_env(env)
+ env = _add_current_path_to_env(env)
# Append path to env['PATH']
if path:
path = os.pathsep.join(path) if isinstance(path, (list, tuple)) else path
@@ -88,7 +101,7 @@ def _call(command, verbose=False, logoutput=False, throw_on_failure=True,
command = string_cmd_from_args_list(command)
# replace placeholder from as_sudo / as_user if present
- env_str = get_environment_str(env)
+ env_str = _get_environment_str(env)
for placeholder, replacement in PLACEHOLDERS_TO_STR.iteritems():
command = command.replace(placeholder, replacement.format(env_str=env_str))
@@ -100,16 +113,37 @@ def _call(command, verbose=False, logoutput=False, throw_on_failure=True,
proc = subprocess.Popen(subprocess_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
cwd=cwd, env=env, shell=False,
preexec_fn=preexec_fn)
-
- if not wait_for_finish:
- return None, None
if timeout:
q = Queue()
- t = threading.Timer( timeout, on_timeout, [proc, q] )
+ t = threading.Timer( timeout, _on_timeout, [proc, q] )
t.start()
- out = proc.communicate()[0].strip('\n')
+ if not wait_for_finish:
+ return proc
+
+ # in case logoutput==False, never log.
+ logoutput = logoutput==True and Logger.logger.isEnabledFor(logging.INFO) or logoutput==None and Logger.logger.isEnabledFor(logging.DEBUG)
+ out = ""
+
+ try:
+ for line in iter(proc.stdout.readline, b''):
+ out += line
+
+ try:
+ if on_new_line:
+ on_new_line(line)
+ except Exception, err:
+ err_msg = "Caused by on_new_line function failed with exception for input argument '{0}':\n{1}".format(line, traceback.format_exc())
+ raise Fail(err_msg)
+
+ if logoutput:
+ _print(line)
+ finally:
+ proc.stdout.close()
+
+ proc.wait()
+ out = out.strip('\n')
if timeout:
if q.empty():
@@ -120,9 +154,6 @@ def _call(command, verbose=False, logoutput=False, throw_on_failure=True,
code = proc.returncode
- if logoutput and out:
- Logger.info(out)
-
if throw_on_failure and code:
err_msg = Logger.filter_text(("Execution of '%s' returned %d. %s") % (command_alias, code, out))
raise Fail(err_msg)
@@ -147,17 +178,26 @@ def as_sudo(command, env=None):
err_msg = Logger.filter_text(("String command '%s' cannot be run as sudo. Please supply the command as a tuple of arguments") % (command))
raise Fail(err_msg)
- env = get_environment_str(add_current_path_to_env(env)) if env else ENV_PLACEHOLDER
+ env = _get_environment_str(_add_current_path_to_env(env)) if env else ENV_PLACEHOLDER
return "/usr/bin/sudo {0} -H -E {1}".format(env, command)
def as_user(command, user, env=None):
if isinstance(command, (list, tuple)):
command = string_cmd_from_args_list(command)
- export_env = "export {0} ; ".format(get_environment_str(add_current_path_to_env(env))) if env else EXPORT_PLACEHOLDER
+ export_env = "export {0} ; ".format(_get_environment_str(_add_current_path_to_env(env))) if env else EXPORT_PLACEHOLDER
return "/usr/bin/sudo su {0} -l -s /bin/bash -c {1}".format(user, quote_bash_args(export_env + command))
-def add_current_path_to_env(env):
+def quote_bash_args(command):
+ if not command:
+ return "''"
+ valid = set(string.ascii_letters + string.digits + '@%_-+=:,./')
+ for char in command:
+ if char not in valid:
+ return "'" + command.replace("'", "'\"'\"'") + "'"
+ return command
+
+def _add_current_path_to_env(env):
result = {} if not env else env
if not 'PATH' in result:
@@ -169,25 +209,20 @@ def add_current_path_to_env(env):
return result
-def get_environment_str(env):
+def _get_environment_str(env):
return reduce(lambda str,x: '{0} {1}={2}'.format(str,x,quote_bash_args(env[x])), env, '')
def string_cmd_from_args_list(command):
return ' '.join(quote_bash_args(x) for x in command)
-def on_timeout(proc, q):
+def _on_timeout(proc, q):
q.put(True)
if proc.poll() == None:
try:
proc.terminate()
except:
pass
-
-def quote_bash_args(command):
- if not command:
- return "''"
- valid = set(string.ascii_letters + string.digits + '@%_-+=:,./')
- for char in command:
- if char not in valid:
- return "'" + command.replace("'", "'\"'\"'") + "'"
- return command
\ No newline at end of file
+
+def _print(line):
+ sys.stdout.write(line)
+ sys.stdout.flush()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py b/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py
index 8b61331..b4b0b52 100644
--- a/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py
+++ b/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py
@@ -30,7 +30,7 @@ class ExecuteHadoop(Resource):
tries = ResourceArgument(default=1)
try_sleep = ResourceArgument(default=0) # seconds
user = ResourceArgument()
- logoutput = BooleanArgument(default=False)
+ logoutput = ResourceArgument()
principal = ResourceArgument(default=lambda obj: obj.user)
bin_dir = ResourceArgument(default=[]) # appended to $PATH
environment = ResourceArgument(default={})
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
index 88af1f9..529ca44 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
@@ -20,7 +20,7 @@ limitations under the License.
from resource_management.core.logger import Logger
from resource_management.core.exceptions import Fail
from resource_management.core.resources.system import Execute
-from resource_management.core.shell import call
+from resource_management.core import shell
from resource_management.libraries.functions import format
from resource_management.libraries.functions.decorator import retry
@@ -99,7 +99,7 @@ def _check_datanode_startup():
try:
# 'su - hdfs -c "hdfs dfsadmin -report -live"'
command = 'hdfs dfsadmin -report -live'
- return_code, hdfs_output = call(command, user=params.hdfs_user)
+ return_code, hdfs_output = shell.call(command, user=params.hdfs_user)
except:
raise Fail('Unable to determine if the DataNode has started after upgrade.')
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
index 3eb9cc2..e8dfe16 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
@@ -187,16 +187,9 @@ class NameNode(Script):
_print("Executing command %s\n" % command)
parser = hdfs_rebalance.HdfsParser()
- proc = subprocess.Popen(
- command,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- shell=True,
- close_fds=True,
- cwd=basedir
- )
- for line in iter(proc.stdout.readline, ''):
- _print('[balancer] %s %s' % (str(datetime.now()), line ))
+
+ def handle_new_line(line):
+ _print('[balancer] %s' % (line))
pl = parser.parseLine(line)
if pl:
res = pl.toJson()
@@ -204,14 +197,14 @@ class NameNode(Script):
self.put_structured_out(res)
elif parser.state == 'PROCESS_FINISED' :
- _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' ))
+ _print('[balancer] %s' % ('Process is finished' ))
self.put_structured_out({'completePercent' : 1})
- break
+ return
- proc.stdout.close()
- proc.wait()
- if proc.returncode != None and proc.returncode != 0:
- raise Fail('Hdfs rebalance process exited with error. See the log output')
+ Execute(command,
+ on_new_line = handle_new_line,
+ logoutput = False,
+ )
def _print(line):
sys.stdout.write(line)
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
index 6f421b6..b9bd273 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
@@ -98,7 +98,7 @@ def kill_zkfc(zkfc_user):
if code == 0:
Logger.debug("ZKFC is running and will be killed to initiate namenode failover.")
kill_command = format("{check_process} && kill -9 `cat {zkfc_pid_file}` > /dev/null 2>&1")
- checked_call(kill_command, verbose=True)
+ Execute(kill_command)
def get_service_pid_file(name, user):
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py
index 653d4bd..37fe7f1 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py
@@ -21,7 +21,7 @@ import re
from resource_management.core.logger import Logger
from resource_management.core.exceptions import Fail
from resource_management.core.resources.system import Execute
-from resource_management.core.shell import call
+from resource_management.core import shell
from resource_management.libraries.functions import format
@@ -65,7 +65,7 @@ def _get_current_hiveserver_version():
try:
command = 'hdp-select status hive-server2'
- return_code, hdp_output = call(command, user=params.hive_user)
+ return_code, hdp_output = shell.call(command, user=params.hive_user)
except Exception, e:
Logger.error(str(e))
raise Fail('Unable to execute hdp-select command to retrieve the hiveserver2 version.')
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
index c2b4bc1..ac7014d 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
@@ -17,49 +17,54 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
-from __future__ import print_function
from resource_management import *
-import sys,subprocess,os
class ServiceCheck(Script):
- def service_check(self, env):
- import params
- env.set_params(params)
- kafka_config=self.read_kafka_config(params.conf_dir)
- self.set_env(params.conf_dir)
- create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"."
- create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists."
- print("Running kafka create topic command", file=sys.stdout)
- create_topic_cmd = [params.kafka_home+'/bin/kafka-topics.sh', '--zookeeper '+kafka_config['zookeeper.connect'],
- '--create --topic ambari_kafka_service_check', '--partitions 1 --replication-factor 1']
- print(" ".join(create_topic_cmd), file=sys.stdout)
- create_topic_process = subprocess.Popen(create_topic_cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
- out, err = create_topic_process.communicate()
- if out.find(create_topic_cmd_created_output) != -1:
- print(out, file=sys.stdout)
- sys.exit(0)
- elif out.find(create_topic_cmd_exists_output) != -1:
- print("Topic ambari_kafka_service_check exists", file=sys.stdout)
- sys.exit(0)
- else:
- print(out, file=sys.stderr)
- sys.exit(1)
+ def service_check(self, env):
+ import params
+ env.set_params(params)
+
+ kafka_config = self.read_kafka_config()
+ environment = self.get_env()
+
+ create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"."
+ create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists."
+
+ print "Running kafka create topic command"
+ create_topic_cmd = (params.kafka_home+'/bin/kafka-topics.sh', '--zookeeper '+kafka_config['zookeeper.connect'],
+ '--create --topic ambari_kafka_service_check', '--partitions 1 --replication-factor 1')
+
+ code, out = shell.checked_call(create_topic_cmd,
+ verbose=True, env=environment)
- def read_kafka_config(self,kafka_conf_dir):
- conf_file = open(kafka_conf_dir+"/server.properties","r")
- kafka_config = {}
- for line in conf_file:
- key,value = line.split("=")
- kafka_config[key] = value.replace("\n","")
- return kafka_config
+ if out.find(create_topic_cmd_created_output) != -1:
+ print out
+ elif out.find(create_topic_cmd_exists_output) != -1:
+ print "Topic ambari_kafka_service_check exists"
+ else:
+ raise Fail(out)
- def set_env(self, kafka_conf_dir):
- command = ['bash', '-c', 'source '+kafka_conf_dir+'/kafka-env.sh && env']
- proc = subprocess.Popen(command, stdout = subprocess.PIPE)
- for line in proc.stdout:
- (key, _, value) = line.partition("=")
- os.environ[key] = value.replace("\n","")
- proc.communicate()
+ def read_kafka_config(self):
+ import params
+
+ kafka_config = {}
+ with open(params.conf_dir+"/server.properties","r") as conf_file:
+ for line in conf_file:
+ key,value = line.split("=")
+ kafka_config[key] = value.replace("\n","")
+
+ return kafka_config
+
+ def get_env(self):
+ import params
+ code, out = shell.checked_call(format('source {conf_dir}/kafka-env.sh && env'))
+
+ environment = {}
+ for line in out.split("\n"):
+ (key, _, value) = line.partition("=")
+ environment[key] = value.replace("\n","")
+
+ return environment
if __name__ == "__main__":
ServiceCheck().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py
index e82c320..54e0fae 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py
@@ -22,7 +22,7 @@ import subprocess
from resource_management.core.logger import Logger
from resource_management.core.exceptions import Fail
from resource_management.core.resources.system import Execute
-from resource_management.core.shell import call
+from resource_management.core import shell
from resource_management.libraries.functions.decorator import retry
@@ -56,7 +56,7 @@ def _check_nodemanager_startup():
try:
# 'su - yarn -c "yarn node -status c6401.ambari.apache.org:45454"'
- return_code, yarn_output = call(command, user=params.hdfs_user)
+ return_code, yarn_output = shell.call(command, user=params.hdfs_user)
except:
raise Fail('Unable to determine if the NodeManager has started after upgrade.')
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py b/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py
index d16a749..42e195c 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py
@@ -312,7 +312,7 @@ class KerberosScript(Script):
# If a test keytab file is available, simply use it
if (keytab_file is not None) and (os.path.isfile(keytab_file)):
command = 'kinit -k -t %s %s' % (keytab_file, principal)
- shell.checked_call(command)
+ Execute(command)
return shell.checked_call('kdestroy')
# If base64-encoded test keytab data is available; then decode it, write it to a temporary file
@@ -324,7 +324,7 @@ class KerberosScript(Script):
try:
command = 'kinit -k -t %s %s' % (test_keytab_file, principal)
- shell.checked_call(command)
+ Execute(command)
return shell.checked_call('kdestroy')
except:
raise
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
index 1c032ff..2fc8549 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
@@ -20,6 +20,7 @@ limitations under the License.
from stacks.utils.RMFTestCase import *
import json
from mock.mock import MagicMock, patch
+from resource_management.core import shell
from resource_management.core.exceptions import Fail
class TestDatanode(RMFTestCase):
@@ -441,7 +442,7 @@ class TestDatanode(RMFTestCase):
@patch('time.sleep')
- @patch("subprocess.Popen")
+ @patch.object(shell, "call")
def test_post_rolling_restart(self, process_mock, time_mock):
process_output = """
Live datanodes (2):
@@ -464,10 +465,7 @@ class TestDatanode(RMFTestCase):
Last contact: Fri Dec 12 20:47:21 UTC 2014
"""
- process = MagicMock()
- process.communicate.return_value = [process_output]
- process.returncode = 0
- process_mock.return_value = process
+ process_mock.return_value = (0, process_output)
self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
classname = "DataNode",
@@ -482,12 +480,9 @@ class TestDatanode(RMFTestCase):
@patch('time.sleep')
- @patch("subprocess.Popen")
+ @patch.object(shell, "call")
def test_post_rolling_restart_datanode_not_ready(self, process_mock, time_mock):
- process = MagicMock()
- process.communicate.return_value = ['There are no DataNodes here!']
- process.returncode = 0
- process_mock.return_value = process
+ process_mock.return_value = (0, 'There are no DataNodes here!')
try:
self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
@@ -504,12 +499,9 @@ class TestDatanode(RMFTestCase):
@patch('time.sleep')
- @patch("subprocess.Popen")
+ @patch.object(shell, "call")
def test_post_rolling_restart_bad_returncode(self, process_mock, time_mock):
- process = MagicMock()
- process.communicate.return_value = ['some']
- process.returncode = 999
- process_mock.return_value = process
+ process_mock.return_value = (0, 'some')
try:
self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
index 78a2f72..3981e33 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
@@ -88,6 +88,7 @@ class TestNamenode(RMFTestCase):
environment = {'HADOOP_LIBEXEC_DIR': '/usr/lib/hadoop/libexec'},
not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid` >/dev/null 2>&1',
)
+ self.printResources()
self.assertResourceCalled('Execute', 'hdfs --config /etc/hadoop/conf dfsadmin -safemode leave',
path = ['/usr/bin'],
user = 'hdfs',
@@ -776,26 +777,6 @@ class TestNamenode(RMFTestCase):
@patch("resource_management.libraries.script.Script.put_structured_out")
def test_rebalance_hdfs(self, pso):
- Popen_Mock.return_value = 1
- with patch("subprocess.Popen", new_callable=Popen_Mock):
- ll = subprocess.Popen()
- self.assertTrue(isinstance(ll.stdout.readline(),str))
- try:
- self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/namenode.py",
- classname = "NameNode",
- command = "rebalancehdfs",
- config_file = "rebalancehdfs_default.json",
- hdp_stack_version = self.STACK_VERSION,
- target = RMFTestCase.TARGET_COMMON_SERVICES
- )
- self.fail("Exception was not thrown")
- except resource_management.core.exceptions.Fail:
- pass ##expected
-
- pso.reset_mock()
- Popen_Mock.return_value = 0
- ll = subprocess.Popen()
- self.assertTrue(isinstance(ll.stdout.readline(),str))
self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/namenode.py",
classname = "NameNode",
command = "rebalancehdfs",
@@ -803,7 +784,11 @@ class TestNamenode(RMFTestCase):
hdp_stack_version = self.STACK_VERSION,
target = RMFTestCase.TARGET_COMMON_SERVICES
)
- self.assertEqual(pso.call_count, 2, "Output was not parsed properly")
+ self.assertResourceCalled('Execute', "/usr/bin/sudo su hdfs -l -s /bin/bash -c 'export PATH=/bin:/usr/bin ; hdfs --config /etc/hadoop/conf balancer -threshold -1'",
+ logoutput = False,
+ on_new_line = FunctionMock('handle_new_line'),
+ )
+ self.assertNoMoreResources()
class Popen_Mock:
return_value = 1
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py b/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py
index 8046313..0adb266 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py
@@ -536,25 +536,15 @@ class TestHiveServer(RMFTestCase):
@patch("hive_server.HiveServer.pre_rolling_restart")
@patch("hive_server.HiveServer.start")
- @patch("subprocess.Popen")
- def test_stop_during_upgrade(self, process_mock, hive_server_start_mock,
+ @patch.object(shell, "call", new=MagicMock(return_value=(0,"hive-server2 - 2.2.0.0-2041")))
+ def test_stop_during_upgrade(self, hive_server_start_mock,
hive_server_pre_rolling_mock):
-
- process_output = 'hive-server2 - 2.2.0.0-2041'
-
- process = MagicMock()
- process.communicate.return_value = [process_output]
- process.returncode = 0
- process_mock.return_value = process
-
+
self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py",
classname = "HiveServer", command = "restart", config_file = "hive-upgrade.json",
hdp_stack_version = self.UPGRADE_STACK_VERSION,
target = RMFTestCase.TARGET_COMMON_SERVICES )
- self.assertTrue(process_mock.called)
- self.assertEqual(process_mock.call_count,2)
-
self.assertResourceCalled('Execute', 'hive --service hiveserver2 --deregister 2.2.0.0-2041',
path=['/bin:/usr/hdp/current/hive-server2/bin:/usr/hdp/current/hadoop-client/bin'],
tries=1, user='hive')
@@ -564,17 +554,9 @@ class TestHiveServer(RMFTestCase):
@patch("hive_server.HiveServer.pre_rolling_restart")
@patch("hive_server.HiveServer.start")
- @patch("subprocess.Popen")
- def test_stop_during_upgrade_bad_hive_version(self, process_mock, hive_server_start_mock,
+ @patch.object(shell, "call", new=MagicMock(return_value=(0,"BAD VERSION")))
+ def test_stop_during_upgrade_bad_hive_version(self, hive_server_start_mock,
hive_server_pre_rolling_mock):
-
- process_output = 'BAD VERSION'
-
- process = MagicMock()
- process.communicate.return_value = [process_output]
- process.returncode = 0
- process_mock.return_value = process
-
try:
self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py",
classname = "HiveServer", command = "restart", config_file = "hive-upgrade.json",
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py
index d4229dc..bc9b831 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py
@@ -20,6 +20,7 @@ limitations under the License.
from mock.mock import MagicMock, call, patch
from stacks.utils.RMFTestCase import *
from resource_management.core.exceptions import Fail
+from resource_management.core import shell
import os
origin_exists = os.path.exists
@@ -559,16 +560,13 @@ class TestNodeManager(RMFTestCase):
)
@patch('time.sleep')
- @patch("subprocess.Popen")
+ @patch.object(shell, "call")
def test_post_rolling_restart(self, process_mock, time_mock):
process_output = """
c6401.ambari.apache.org:45454 RUNNING c6401.ambari.apache.org:8042 0
"""
- process = MagicMock()
- process.communicate.return_value = [process_output]
- process.returncode = 0
- process_mock.return_value = process
+ process_mock.return_value = (0, process_output)
self.executeScript("2.0.6/services/YARN/package/scripts/nodemanager.py",
classname="Nodemanager", command = "post_rolling_restart", config_file="default.json")
@@ -578,16 +576,13 @@ class TestNodeManager(RMFTestCase):
@patch('time.sleep')
- @patch("subprocess.Popen")
+ @patch.object(shell, "call")
def test_post_rolling_restart_nodemanager_not_ready(self, process_mock, time_mock):
process_output = """
c9999.ambari.apache.org:45454 RUNNING c9999.ambari.apache.org:8042 0
"""
- process = MagicMock()
- process.communicate.return_value = [process_output]
- process.returncode = 0
- process_mock.return_value = process
+ process_mock.return_value = (0, process_output)
try:
self.executeScript("2.0.6/services/YARN/package/scripts/nodemanager.py",
@@ -599,16 +594,13 @@ class TestNodeManager(RMFTestCase):
@patch('time.sleep')
- @patch("subprocess.Popen")
+ @patch.object(shell, "call")
def test_post_rolling_restart_nodemanager_not_ready(self, process_mock, time_mock):
process_output = """
c6401.ambari.apache.org:45454 RUNNING c6401.ambari.apache.org:8042 0
"""
- process = MagicMock()
- process.communicate.return_value = [process_output]
- process.returncode = 999
- process_mock.return_value = process
+ process_mock.return_value = (999, process_output)
try:
self.executeScript("2.0.6/services/YARN/package/scripts/nodemanager.py",
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/utils/RMFTestCase.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/utils/RMFTestCase.py b/ambari-server/src/test/python/stacks/utils/RMFTestCase.py
index 63bcdb8..b8e819d 100644
--- a/ambari-server/src/test/python/stacks/utils/RMFTestCase.py
+++ b/ambari-server/src/test/python/stacks/utils/RMFTestCase.py
@@ -17,7 +17,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
-__all__ = ["RMFTestCase", "Template", "StaticFile", "InlineTemplate", "UnknownConfigurationMock"]
+__all__ = ["RMFTestCase", "Template", "StaticFile", "InlineTemplate", "UnknownConfigurationMock", "FunctionMock"]
from unittest import TestCase
import json
@@ -172,6 +172,8 @@ class RMFTestCase(TestCase):
val = oct(v)
elif isinstance( v, UnknownConfiguration):
val = "UnknownConfigurationMock()"
+ elif hasattr(v, '__call__') and hasattr(v, '__name__'):
+ val = "FunctionMock('{0}')".format(v.__name__)
else:
val = self._ppformat(v)
# If value is multiline, format it
@@ -196,7 +198,7 @@ class RMFTestCase(TestCase):
print(self.reindent("self.assertNoMoreResources()", intendation))
def assertResourceCalled(self, resource_type, name, **kwargs):
- with patch.object(UnknownConfiguration, '__getattr__', return_value=lambda: "UnknownConfiguration()"):
+ with patch.object(UnknownConfiguration, '__getattr__', return_value=lambda: "UnknownConfiguration()"):
self.assertNotEqual(len(RMFTestCase.env.resource_list), 0, "There was no more resources executed!")
resource = RMFTestCase.env.resource_list.pop(0)
@@ -240,4 +242,15 @@ class UnknownConfigurationMock():
def __repr__(self):
return "UnknownConfigurationMock()"
+
+class FunctionMock():
+ def __init__(self, name):
+ self.name = name
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __eq__(self, other):
+ return hasattr(other, '__call__') and hasattr(other, '__name__') and self.name == other.__name__
+