You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2015/06/10 16:54:41 UTC
[2/2] ambari git commit: AMBARI-11838. HdfsResource rarely fails on
heavily loaded machines (aonishuk)
AMBARI-11838. HdfsResource rarely fails on heavily loaded machines (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fa694609
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fa694609
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fa694609
Branch: refs/heads/branch-2.1
Commit: fa694609375059c3856ed473078bf30652992d36
Parents: 49c7c93
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Wed Jun 10 17:54:31 2015 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Wed Jun 10 17:54:31 2015 +0300
----------------------------------------------------------------------
.../resource_management/TestExecuteResource.py | 81 +++++---
.../resource_management/TestGroupResource.py | 25 ++-
.../resource_management/TestUserResource.py | 49 ++---
.../resource_management/core/exceptions.py | 2 +-
.../core/providers/system.py | 37 +---
.../core/resources/system.py | 10 +
.../python/resource_management/core/shell.py | 187 +++++++++++++------
.../python/resource_management/core/sudo.py | 20 +-
.../libraries/providers/hdfs_resource.py | 11 +-
.../libraries/script/config_dictionary.py | 16 +-
.../HDFS/2.1.0.2.0/package/scripts/namenode.py | 5 +-
.../custom_actions/TestInstallPackages.py | 10 +-
12 files changed, 291 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestExecuteResource.py b/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
index 59d7e4c..04244e9 100644
--- a/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
@@ -30,18 +30,23 @@ import os
from resource_management import Fail
import grp
import pwd
+import select
@patch.object(System, "os_family", new='redhat')
class TestExecuteResource(TestCase):
+ @patch.object(os, "read")
+ @patch.object(select, "select")
@patch.object(logging.Logger, "info")
@patch.object(subprocess, "Popen")
- def test_attribute_logoutput(self, popen_mock, info_mock):
+ def test_attribute_logoutput(self, popen_mock, info_mock, select_mock, os_read_mock):
subproc_mock = MagicMock()
+ subproc_mock.wait.return_value = MagicMock()
+ subproc_mock.stdout = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
- subproc_mock.communicate.side_effect = [["1"], ["2"]]
popen_mock.return_value = subproc_mock
+ select_mock.return_value = ([subproc_mock.stdout], None, None)
+ os_read_mock.return_value = None
with Environment("/") as env:
Execute('echo "1"',
@@ -82,14 +87,18 @@ class TestExecuteResource(TestCase):
exists_mock.assert_called_with("/must/be/created")
self.assertEqual(subproc_mock.call_count, 0)
+ @patch.object(os, "read")
+ @patch.object(select, "select")
@patch.object(subprocess, "Popen")
- def test_attribute_path(self, popen_mock):
+ def test_attribute_path(self, popen_mock, select_mock, os_read_mock):
subproc_mock = MagicMock()
+ subproc_mock.wait.return_value = MagicMock()
+ subproc_mock.stdout = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
- subproc_mock.communicate.side_effect = [["1"]]
popen_mock.return_value = subproc_mock
-
+ select_mock.return_value = ([subproc_mock.stdout], None, None)
+ os_read_mock.return_value = None
+
with Environment("/") as env:
execute_resource = Execute('echo "1"',
path=["/test/one", "test/two"]
@@ -97,17 +106,23 @@ class TestExecuteResource(TestCase):
expected_command = ['/bin/bash', '--login', '--noprofile', '-c', 'echo "1"']
self.assertEqual(popen_mock.call_args_list[0][0][0], expected_command)
+ @patch.object(os, "read")
+ @patch.object(select, "select")
@patch('time.sleep')
@patch.object(subprocess, "Popen")
- def test_attribute_try_sleep_tries(self, popen_mock, time_mock):
+ def test_attribute_try_sleep_tries(self, popen_mock, time_mock, select_mock, os_read_mock):
expected_call = "call('Retrying after %d seconds. Reason: %s', 1, 'Fail')"
-
+
subproc_mock_one = MagicMock()
subproc_mock_one.returncode = 1
+ subproc_mock_one.stdout = MagicMock()
subproc_mock_zero = MagicMock()
+ subproc_mock_zero.stdout = MagicMock()
subproc_mock_zero.returncode = 0
#subproc_mock.stdout.readline = MagicMock(side_effect = [Fail("Fail"), "OK"])
popen_mock.side_effect = [subproc_mock_one, subproc_mock_zero]
+ select_mock.side_effect = [([subproc_mock_one.stdout], None, None),([subproc_mock_zero.stdout], None, None)]
+ os_read_mock.return_value = None
with Environment("/") as env:
Execute('echo "1"',
@@ -133,15 +148,19 @@ class TestExecuteResource(TestCase):
except Fail as e:
pass
+ @patch.object(os, "read")
+ @patch.object(select, "select")
@patch.object(subprocess, "Popen")
- def test_attribute_environment(self, popen_mock):
+ def test_attribute_environment(self, popen_mock, select_mock, os_read_mock):
expected_dict = {"JAVA_HOME": "/test/java/home"}
subproc_mock = MagicMock()
+ subproc_mock.wait.return_value = MagicMock()
+ subproc_mock.stdout = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
- subproc_mock.communicate.side_effect = [["1"]]
popen_mock.return_value = subproc_mock
+ select_mock.return_value = ([subproc_mock.stdout], None, None)
+ os_read_mock.return_value = None
with Environment("/") as env:
Execute('echo "1"',
@@ -151,15 +170,19 @@ class TestExecuteResource(TestCase):
self.assertEqual(popen_mock.call_args_list[0][1]["env"], expected_dict)
pass
+ @patch.object(os, "read")
+ @patch.object(select, "select")
@patch.object(subprocess, "Popen")
- def test_attribute_environment_non_root(self, popen_mock):
+ def test_attribute_environment_non_root(self, popen_mock, select_mock, os_read_mock):
expected_user = 'test_user'
subproc_mock = MagicMock()
+ subproc_mock.wait.return_value = MagicMock()
+ subproc_mock.stdout = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
- subproc_mock.communicate.side_effect = [["1"]]
popen_mock.return_value = subproc_mock
+ select_mock.return_value = ([subproc_mock.stdout], None, None)
+ os_read_mock.return_value = None
with Environment("/") as env:
execute_resource = Execute('echo "1"',
@@ -173,15 +196,19 @@ class TestExecuteResource(TestCase):
self.assertEqual(popen_mock.call_args_list[0][0][0], expected_command)
+ @patch.object(os, "read")
+ @patch.object(select, "select")
@patch.object(subprocess, "Popen")
- def test_attribute_cwd(self, popen_mock):
+ def test_attribute_cwd(self, popen_mock, select_mock, os_read_mock):
expected_cwd = "/test/work/directory"
subproc_mock = MagicMock()
+ subproc_mock.wait.return_value = MagicMock()
+ subproc_mock.stdout = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
- subproc_mock.communicate.side_effect = [["1"]]
popen_mock.return_value = subproc_mock
+ select_mock.return_value = ([subproc_mock.stdout], None, None)
+ os_read_mock.return_value = None
with Environment("/") as env:
Execute('echo "1"',
@@ -190,8 +217,10 @@ class TestExecuteResource(TestCase):
self.assertEqual(popen_mock.call_args_list[0][1]["cwd"], expected_cwd)
+ @patch.object(os, "read")
+ @patch.object(select, "select")
@patch.object(subprocess, "Popen")
- def test_attribute_command_escaping(self, popen_mock):
+ def test_attribute_command_escaping(self, popen_mock, select_mock, os_read_mock):
expected_command0 = "arg1 arg2 'quoted arg'"
expected_command1 = "arg1 arg2 'command \"arg\"'"
expected_command2 = 'arg1 arg2 \'command \'"\'"\'arg\'"\'"\'\''
@@ -200,9 +229,12 @@ class TestExecuteResource(TestCase):
expected_command5 = "arg1 arg2 '`ls /root`'"
subproc_mock = MagicMock()
+ subproc_mock.wait.return_value = MagicMock()
+ subproc_mock.stdout = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
+ select_mock.return_value = ([subproc_mock.stdout], None, None)
+ os_read_mock.return_value = None
with Environment("/") as env:
Execute(('arg1', 'arg2', 'quoted arg'),
@@ -225,14 +257,19 @@ class TestExecuteResource(TestCase):
self.assertEqual(popen_mock.call_args_list[4][0][0][4], expected_command4)
self.assertEqual(popen_mock.call_args_list[5][0][0][4], expected_command5)
+ @patch.object(os, "read")
+ @patch.object(select, "select")
@patch.object(subprocess, "Popen")
- def test_attribute_command_one_line(self, popen_mock):
+ def test_attribute_command_one_line(self, popen_mock, select_mock, os_read_mock):
expected_command = "rm -rf /somedir"
subproc_mock = MagicMock()
+ subproc_mock.wait.return_value = MagicMock()
+ subproc_mock.stdout = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
popen_mock.return_value = subproc_mock
+ select_mock.return_value = ([subproc_mock.stdout], None, None)
+ os_read_mock.return_value = None
with Environment("/") as env:
Execute(expected_command)
http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-agent/src/test/python/resource_management/TestGroupResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestGroupResource.py b/ambari-agent/src/test/python/resource_management/TestGroupResource.py
index 65d8499..1d52a90 100644
--- a/ambari-agent/src/test/python/resource_management/TestGroupResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestGroupResource.py
@@ -26,8 +26,13 @@ import subprocess
import grp
import os
import pty
+import select
+subproc_stdout = MagicMock()
+
+@patch.object(os, "read", new=MagicMock(return_value=None))
+@patch.object(select, "select", new=MagicMock(return_value=([subproc_stdout], None, None)))
@patch.object(System, "os_family", new = 'redhat')
@patch.object(os, "environ", new = {'PATH':'/bin'})
@patch.object(pty, "openpty", new = MagicMock(return_value=(1,5)))
@@ -39,7 +44,7 @@ class TestGroupResource(TestCase):
def test_action_create_nonexistent(self, popen_mock, getgrnam_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getgrnam_mock.side_effect = KeyError()
@@ -51,7 +56,7 @@ class TestGroupResource(TestCase):
self.assertEqual(popen_mock.call_count, 1)
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E groupadd -p secure hadoop"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E groupadd -p secure hadoop"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
getgrnam_mock.assert_called_with('hadoop')
@@ -60,7 +65,7 @@ class TestGroupResource(TestCase):
def test_action_create_existent(self, popen_mock, getgrnam_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getgrnam_mock.return_value = _get_group()
@@ -73,7 +78,7 @@ class TestGroupResource(TestCase):
self.assertEqual(popen_mock.call_count, 1)
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E groupmod -p secure -g 2 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E groupmod -p secure -g 2 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
getgrnam_mock.assert_called_with('mapred')
@@ -82,7 +87,7 @@ class TestGroupResource(TestCase):
def test_action_create_fail(self, popen_mock, getgrnam_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 1
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getgrnam_mock.return_value = _get_group()
@@ -98,7 +103,7 @@ class TestGroupResource(TestCase):
except Fail:
pass
self.assertEqual(popen_mock.call_count, 1)
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E groupmod -p secure -g 2 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E groupmod -p secure -g 2 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
getgrnam_mock.assert_called_with('mapred')
@@ -108,7 +113,7 @@ class TestGroupResource(TestCase):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getgrnam_mock.return_value = _get_group()
@@ -119,7 +124,7 @@ class TestGroupResource(TestCase):
self.assertEqual(popen_mock.call_count, 1)
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh PATH=/bin -H -E groupdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh PATH=/bin -H -E groupdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
getgrnam_mock.assert_called_with('mapred')
@@ -129,7 +134,7 @@ class TestGroupResource(TestCase):
subproc_mock = MagicMock()
subproc_mock.returncode = 1
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getgrnam_mock.return_value = _get_group()
@@ -144,7 +149,7 @@ class TestGroupResource(TestCase):
pass
self.assertEqual(popen_mock.call_count, 1)
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh PATH=/bin -H -E groupdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh PATH=/bin -H -E groupdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
getgrnam_mock.assert_called_with('mapred')
def _get_group():
http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-agent/src/test/python/resource_management/TestUserResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestUserResource.py b/ambari-agent/src/test/python/resource_management/TestUserResource.py
index 5c8b6df..1598823 100644
--- a/ambari-agent/src/test/python/resource_management/TestUserResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestUserResource.py
@@ -26,7 +26,12 @@ import pwd
import subprocess
import os
import pty
+import select
+subproc_stdout = MagicMock()
+
+@patch.object(os, "read", new=MagicMock(return_value=None))
+@patch.object(select, "select", new=MagicMock(return_value=([subproc_stdout], None, None)))
@patch.object(System, "os_family", new = 'redhat')
@patch.object(os, "environ", new = {'PATH':'/bin'})
@patch.object(pty, "openpty", new = MagicMock(return_value=(1,5)))
@@ -38,13 +43,13 @@ class TestUserResource(TestCase):
def test_action_create_nonexistent(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = None
with Environment('/') as env:
user = User("mapred", action = "create", shell = "/bin/bash")
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E useradd -m -s /bin/bash mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, env={'PATH': '/bin'}, bufsize=1, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E useradd -m -s /bin/bash mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
self.assertEqual(popen_mock.call_count, 1)
@patch.object(subprocess, "Popen")
@@ -52,14 +57,14 @@ class TestUserResource(TestCase):
def test_action_create_existent(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = _get_user_entity()
with Environment('/') as env:
user = User("mapred", action = "create", shell = "/bin/bash")
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/bash mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/bash mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
self.assertEqual(popen_mock.call_count, 1)
@patch.object(subprocess, "Popen")
@@ -67,14 +72,14 @@ class TestUserResource(TestCase):
def test_action_delete(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = 1
with Environment('/') as env:
user = User("mapred", action = "remove", shell = "/bin/bash")
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh PATH=/bin -H -E userdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh PATH=/bin -H -E userdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
self.assertEqual(popen_mock.call_count, 1)
@patch.object(subprocess, "Popen")
@@ -82,7 +87,7 @@ class TestUserResource(TestCase):
def test_attribute_comment(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = _get_user_entity()
@@ -90,7 +95,7 @@ class TestUserResource(TestCase):
user = User("mapred", action = "create", comment = "testComment",
shell = "/bin/bash")
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -c testComment -s /bin/bash mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -c testComment -s /bin/bash mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
self.assertEqual(popen_mock.call_count, 1)
@patch.object(subprocess, "Popen")
@@ -98,7 +103,7 @@ class TestUserResource(TestCase):
def test_attribute_home(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = _get_user_entity()
@@ -106,7 +111,7 @@ class TestUserResource(TestCase):
user = User("mapred", action = "create", home = "/test/home",
shell = "/bin/bash")
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/bash -d /test/home mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/bash -d /test/home mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
self.assertEqual(popen_mock.call_count, 1)
@patch.object(subprocess, "Popen")
@@ -114,7 +119,7 @@ class TestUserResource(TestCase):
def test_attribute_password(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = _get_user_entity()
@@ -122,7 +127,7 @@ class TestUserResource(TestCase):
user = User("mapred", action = "create", password = "secure",
shell = "/bin/bash")
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/bash -p secure mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/bash -p secure mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
self.assertEqual(popen_mock.call_count, 1)
@patch.object(subprocess, "Popen")
@@ -130,14 +135,14 @@ class TestUserResource(TestCase):
def test_attribute_shell(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = _get_user_entity()
with Environment('/') as env:
user = User("mapred", action = "create", shell = "/bin/sh")
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/sh mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/sh mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
self.assertEqual(popen_mock.call_count, 1)
@patch.object(subprocess, "Popen")
@@ -145,14 +150,14 @@ class TestUserResource(TestCase):
def test_attribute_uid(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = _get_user_entity()
with Environment('/') as env:
user = User("mapred", action = "create", uid = "1", shell = "/bin/bash")
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/bash -u 1 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/bash -u 1 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
self.assertEqual(popen_mock.call_count, 1)
@patch.object(subprocess, "Popen")
@@ -160,14 +165,14 @@ class TestUserResource(TestCase):
def test_attribute_gid(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = _get_user_entity()
with Environment('/') as env:
user = User("mapred", action = "create", gid = "1", shell = "/bin/bash")
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/bash -g 1 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/bash -g 1 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
self.assertEqual(popen_mock.call_count, 1)
@patch('resource_management.core.providers.accounts.UserProvider.user_groups', new_callable=PropertyMock)
@@ -177,7 +182,7 @@ class TestUserResource(TestCase):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
user_groups_mock.return_value = ['hadoop']
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = _get_user_entity()
@@ -185,7 +190,7 @@ class TestUserResource(TestCase):
user = User("mapred", action = "create", groups = ['1','2','3'],
shell = "/bin/bash")
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/bash -G 1,2,3,hadoop mapred'], shell=False, preexec_fn=None, env={'PATH': '/bin'}, close_fds=True, stdout=5, stderr=-2, bufsize=1, cwd=None)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh PATH=/bin -H -E usermod -s /bin/bash -G 1,2,3,hadoop mapred'], shell=False, preexec_fn=None, env={'PATH': '/bin'}, close_fds=True, stdout=-1, stderr=-2, cwd=None)
self.assertEqual(popen_mock.call_count, 1)
@patch.object(subprocess, "Popen")
@@ -193,13 +198,13 @@ class TestUserResource(TestCase):
def test_missing_shell_argument(self, getpwnam_mock, popen_mock):
subproc_mock = MagicMock()
subproc_mock.returncode = 0
- subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+ subproc_mock.stdout = subproc_stdout
popen_mock.return_value = subproc_mock
getpwnam_mock.return_value = None
with Environment('/') as env:
user = User("mapred", action = "create")
- popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E useradd -m mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+ popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh PATH=/bin -H -E useradd -m mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
self.assertEqual(popen_mock.call_count, 1)
def _get_user_entity():
http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/core/exceptions.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/exceptions.py b/ambari-common/src/main/python/resource_management/core/exceptions.py
index a8b2a63..25e7993 100644
--- a/ambari-common/src/main/python/resource_management/core/exceptions.py
+++ b/ambari-common/src/main/python/resource_management/core/exceptions.py
@@ -25,7 +25,7 @@ __all__ = ["Fail", "ExecuteTimeoutException", "InvalidArgument", "ClientComponen
class Fail(Exception):
pass
-class ExecuteTimeoutException(Exception):
+class ExecuteTimeoutException(Fail):
pass
class InvalidArgument(Fail):
http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/core/providers/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/system.py b/ambari-common/src/main/python/resource_management/core/providers/system.py
index 3a1b218..a6f940d 100644
--- a/ambari-common/src/main/python/resource_management/core/providers/system.py
+++ b/ambari-common/src/main/python/resource_management/core/providers/system.py
@@ -242,33 +242,16 @@ class ExecuteProvider(Provider):
Logger.info("Skipping %s due to creates" % self.resource)
return
- env = self.resource.environment
-
- for i in range (0, self.resource.tries):
- try:
- shell.checked_call(self.resource.command, logoutput=self.resource.logoutput,
- cwd=self.resource.cwd, env=env,
- preexec_fn=_preexec_fn(self.resource), user=self.resource.user,
- wait_for_finish=self.resource.wait_for_finish,
- timeout=self.resource.timeout,
- path=self.resource.path,
- sudo=self.resource.sudo,
- on_new_line=self.resource.on_new_line)
- break
- except Fail as ex:
- if i == self.resource.tries-1: # last try
- raise ex
- else:
- Logger.info("Retrying after %d seconds. Reason: %s" % (self.resource.try_sleep, str(ex)))
- time.sleep(self.resource.try_sleep)
- except ExecuteTimeoutException:
- err_msg = ("Execution of '%s' was killed due timeout after %d seconds") % (self.resource.command, self.resource.timeout)
-
- if self.resource.on_timeout:
- Logger.info("Executing '%s'. Reason: %s" % (self.resource.on_timeout, err_msg))
- shell.checked_call(self.resource.on_timeout)
- else:
- raise Fail(err_msg)
+ shell.checked_call(self.resource.command, logoutput=self.resource.logoutput,
+ cwd=self.resource.cwd, env=self.resource.environment,
+ preexec_fn=_preexec_fn(self.resource), user=self.resource.user,
+ wait_for_finish=self.resource.wait_for_finish,
+ timeout=self.resource.timeout,on_timeout=self.resource.on_timeout,
+ path=self.resource.path,
+ sudo=self.resource.sudo,
+ on_new_line=self.resource.on_new_line,
+ stdout=self.resource.stdout,stderr=self.resource.stderr,
+ tries=self.resource.tries, try_sleep=self.resource.try_sleep)
class ExecuteScriptProvider(Provider):
http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/core/resources/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/resources/system.py b/ambari-common/src/main/python/resource_management/core/resources/system.py
index 1f00da1..f74e258 100644
--- a/ambari-common/src/main/python/resource_management/core/resources/system.py
+++ b/ambari-common/src/main/python/resource_management/core/resources/system.py
@@ -22,6 +22,7 @@ Ambari Agent
__all__ = ["File", "Directory", "Link", "Execute", "ExecuteScript", "Mount"]
+import subprocess
from resource_management.core.base import Resource, ForcedListArgument, ResourceArgument, BooleanArgument
@@ -154,6 +155,15 @@ class Execute(Resource):
command2 = as_sudo(["ls", "/root/example.txt") + " && " + as_sudo(["rm","-f","example.txt"])
"""
sudo = BooleanArgument(default=False)
+ """
+ subprocess.PIPE - enable output gathering
+ None - disable output gathering and send output directly to Python's own output (even if logoutput is False)
+ subprocess.STDOUT - redirect to stdout (not valid as a value for the stdout argument)
+ {int fd} - redirect to the file with this descriptor.
+ {string filename} - redirect to the file with this name.
+ """
+ stdout = ResourceArgument(default=subprocess.PIPE)
+ stderr = ResourceArgument(default=subprocess.STDOUT)
class ExecuteScript(Resource):
action = ForcedListArgument(default="run")
http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/core/shell.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/shell.py b/ambari-common/src/main/python/resource_management/core/shell.py
index 2e5a1b0..183c735 100644
--- a/ambari-common/src/main/python/resource_management/core/shell.py
+++ b/ambari-common/src/main/python/resource_management/core/shell.py
@@ -22,6 +22,8 @@ Ambari Agent
__all__ = ["non_blocking_call", "checked_call", "call", "quote_bash_args", "as_user", "as_sudo"]
+import time
+import copy
import os
import select
import sys
@@ -76,26 +78,34 @@ def log_function_call(function):
return inner
@log_function_call
-def checked_call(command, quiet=False, logoutput=None,
- cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None):
+def checked_call(command, quiet=False, logoutput=None, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
+ cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, on_timeout=None,
+ path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0):
"""
Execute the shell command and throw an exception on failure.
@throws Fail
@return: return_code, output
"""
- return _call(command, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, sudo, on_new_line)
-
+ return _call_wrapper(command, logoutput=logoutput, throw_on_failure=True, stdout=stdout, stderr=stderr,
+ cwd=cwd, env=env, preexec_fn=preexec_fn, user=user, wait_for_finish=wait_for_finish,
+ on_timeout=on_timeout, timeout=timeout, path=path, sudo=sudo, on_new_line=on_new_line,
+ tries=tries, try_sleep=try_sleep)
+
@log_function_call
-def call(command, quiet=False, logoutput=None,
- cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None):
+def call(command, quiet=False, logoutput=None, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
+ cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, on_timeout=None,
+ path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0):
"""
Execute the shell command despite failures.
@return: return_code, output
"""
- return _call(command, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, sudo, on_new_line)
+ return _call_wrapper(command, logoutput=logoutput, throw_on_failure=False, stdout=stdout, stderr=stderr,
+ cwd=cwd, env=env, preexec_fn=preexec_fn, user=user, wait_for_finish=wait_for_finish,
+ on_timeout=on_timeout, timeout=timeout, path=path, sudo=sudo, on_new_line=on_new_line,
+ tries=tries, try_sleep=try_sleep)
@log_function_call
-def non_blocking_call(command, quiet=False,
+def non_blocking_call(command, quiet=False, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
cwd=None, env=None, preexec_fn=None, user=None, timeout=None, path=None, sudo=False):
"""
Execute the shell command and don't wait until its completion
@@ -104,10 +114,49 @@ def non_blocking_call(command, quiet=False,
(use proc.stdout.readline to read output in cycle, don't forget to proc.stdout.close(),
to get return code use proc.wait() and after that proc.returncode)
"""
- return _call(command, False, True, cwd, env, preexec_fn, user, False, timeout, path, sudo, None)
+ return _call_wrapper(command, logoutput=False, throw_on_failure=True, stdout=stdout, stderr=stderr,
+ cwd=cwd, env=env, preexec_fn=preexec_fn, user=user, wait_for_finish=False,
+ on_timeout=None, timeout=timeout, path=path, sudo=sudo, on_new_line=None,
+ tries=1, try_sleep=0)
+
+def _call_wrapper(command, **kwargs):
+ tries = kwargs['tries']
+ try_sleep = kwargs['try_sleep']
+ timeout = kwargs['timeout']
+ on_timeout = kwargs['on_timeout']
+ throw_on_failure = kwargs['throw_on_failure']
+
+ for i in range (0, tries):
+ is_last_try = (i == tries-1)
-def _call(command, logoutput=None, throw_on_failure=True,
- cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None):
+ if not is_last_try:
+ kwargs_copy = copy.copy(kwargs)
+ kwargs_copy['throw_on_failure'] = True # we need to know when non-last try fails, to handle retries
+ else:
+ kwargs_copy = kwargs
+
+ try:
+ try:
+ result = _call(command, **kwargs_copy)
+ break
+ except ExecuteTimeoutException as ex:
+ if on_timeout:
+ Logger.info("Executing '%s'. Reason: %s" % (on_timeout, str(ex)))
+ checked_call(on_timeout)
+ else:
+ raise
+ except Fail as ex:
+ if is_last_try: # last try
+ raise
+ else:
+ Logger.info("Retrying after %d seconds. Reason: %s" % (try_sleep, str(ex)))
+ time.sleep(try_sleep)
+
+ return result
+
+def _call(command, logoutput=None, throw_on_failure=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
+ cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, on_timeout=None,
+ path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0):
"""
Execute shell command
@@ -115,8 +164,13 @@ def _call(command, logoutput=None, throw_on_failure=True,
or string of the command to execute
@param logoutput: boolean, whether command output should be logged or not
@param throw_on_failure: if true, when return code is not zero exception is thrown
+ @param stdout,stderr:
+ subprocess.PIPE - enable output to variable
+ subprocess.STDOUT - redirect to stdout
+ None - disable output to variable; output goes straight to the Python process's output (even if logoutput is False)
+ {int fd} - redirect to file with descriptor.
+ {string filename} - redirects to a file with name.
"""
-
command_alias = string_cmd_from_args_list(command) if isinstance(command, (list, tuple)) else command
# Append current PATH to env['PATH']
@@ -125,7 +179,7 @@ def _call(command, logoutput=None, throw_on_failure=True,
if path:
path = os.pathsep.join(path) if isinstance(path, (list, tuple)) else path
env['PATH'] = os.pathsep.join([env['PATH'], path])
-
+
# prepare command cmd
if sudo:
command = as_sudo(command, env=env)
@@ -141,61 +195,82 @@ def _call(command, logoutput=None, throw_on_failure=True,
for placeholder, replacement in PLACEHOLDERS_TO_STR.iteritems():
command = command.replace(placeholder, replacement.format(env_str=env_str))
- import pty
- master_fd, slave_fd = pty.openpty()
-
# --noprofile is used to preserve PATH set for ambari-agent
subprocess_command = ["/bin/bash","--login","--noprofile","-c", command]
- proc = subprocess.Popen(subprocess_command, bufsize=1, stdout=slave_fd, stderr=subprocess.STDOUT,
- cwd=cwd, env=env, shell=False, close_fds=True,
- preexec_fn=preexec_fn)
- if timeout:
- timeout_event = threading.Event()
- t = threading.Timer( timeout, _on_timeout, [proc, timeout_event] )
- t.start()
+ files_to_close = []
+ if isinstance(stdout, (basestring)):
+ stdout = open(stdout, 'wb')
+ files_to_close.append(stdout)
+ if isinstance(stderr, (basestring)):
+ stderr = open(stderr, 'wb')
+ files_to_close.append(stderr)
+
+ try:
+ proc = subprocess.Popen(subprocess_command, stdout=stdout, stderr=stderr,
+ cwd=cwd, env=env, shell=False, close_fds=True,
+ preexec_fn=preexec_fn)
- if not wait_for_finish:
- return proc
+ if timeout:
+ timeout_event = threading.Event()
+ t = threading.Timer( timeout, _on_timeout, [proc, timeout_event] )
+ t.start()
+
+ if not wait_for_finish:
+ return proc
+
+ # in case logoutput==False, never log.
+ logoutput = logoutput==True and Logger.logger.isEnabledFor(logging.INFO) or logoutput==None and Logger.logger.isEnabledFor(logging.DEBUG)
+ read_set = []
- # in case logoutput==False, never log.
- logoutput = logoutput==True and Logger.logger.isEnabledFor(logging.INFO) or logoutput==None and Logger.logger.isEnabledFor(logging.DEBUG)
- out = ""
- read_timeout = .001 # seconds
-
- try:
- while True:
- ready, _, _ = select.select([master_fd], [], [], read_timeout)
- if ready:
- line = os.read(master_fd, 512)
- if not line:
- continue
+ if stdout == subprocess.PIPE:
+ read_set.append(proc.stdout)
+ if stderr == subprocess.PIPE:
+ read_set.append(proc.stderr)
+
+ fd_to_string = {
+ proc.stdout: "",
+ proc.stderr: ""
+ }
+
+ while read_set:
+ ready, _, _ = select.select(read_set, [], [])
+ for out_fd in read_set:
+ if out_fd in ready:
+ line = os.read(out_fd.fileno(), 1024)
- out += line
- try:
- if on_new_line:
- on_new_line(line)
- except Exception, err:
- err_msg = "Caused by on_new_line function failed with exception for input argument '{0}':\n{1}".format(line, traceback.format_exc())
- raise Fail(err_msg)
+ if not line:
+ read_set = copy.copy(read_set)
+ read_set.remove(out_fd)
+ continue
- if logoutput:
- _print(line)
- elif proc.poll() is not None:
- break # proc exited
+ fd_to_string[out_fd] += line
+
+ if on_new_line:
+ try:
+ on_new_line(line, out_fd == proc.stderr)
+ except Exception, err:
+ err_msg = "Caused by on_new_line function failed with exception for input argument '{0}':\n{1}".format(line, traceback.format_exc())
+ raise Fail(err_msg)
+
+ if logoutput:
+ _print(line)
+
+ proc.wait()
finally:
- os.close(slave_fd)
- os.close(master_fd)
-
- proc.wait()
- out = out.strip('\n')
+ for fp in files_to_close:
+ fp.close()
+
+ out = fd_to_string[proc.stdout].strip('\n')
+ err = fd_to_string[proc.stderr].strip('\n')
if timeout:
if not timeout_event.is_set():
t.cancel()
# timeout occurred
else:
- raise ExecuteTimeoutException()
+ err_msg = ("Execution of '%s' was killed due timeout after %d seconds") % (command, timeout)
+ raise ExecuteTimeoutException(err_msg)
code = proc.returncode
@@ -203,6 +278,10 @@ def _call(command, logoutput=None, throw_on_failure=True,
err_msg = Logger.filter_text(("Execution of '%s' returned %d. %s") % (command_alias, code, out))
raise Fail(err_msg)
+ # if separate stderr is enabled (by default it's redirected to out)
+ if stderr == subprocess.PIPE:
+ return code, out, err
+
return code, out
def as_sudo(command, env=None, auto_escape=True):
http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/core/sudo.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/sudo.py b/ambari-common/src/main/python/resource_management/core/sudo.py
index f864d64..740eea3 100644
--- a/ambari-common/src/main/python/resource_management/core/sudo.py
+++ b/ambari-common/src/main/python/resource_management/core/sudo.py
@@ -198,25 +198,15 @@ else:
# os.path.isfile
def path_isfile(path):
return (shell.call(["test", "-f", path], sudo=True)[0] == 0)
-
+
# os.stat
def stat(path):
class Stat:
- RETRY_COUNT = 5
def __init__(self, path):
- # Sometimes (on heavy load) stat call returns an empty output with zero return code
- for i in range(0, self.RETRY_COUNT):
- out = shell.checked_call(["stat", "-c", "%u %g %a", path], sudo=True)[1]
- values = out.split(' ')
- if len(values) == 3:
- uid_str, gid_str, mode_str = values
- self.st_uid, self.st_gid, self.st_mode = int(uid_str), int(gid_str), int(mode_str, 8)
- break
- else:
- warning_message = "Can not parse a sudo stat call output: \"{0}\"".format(out)
- Logger.warning(warning_message)
- stat_val = os.stat(path)
- self.st_uid, self.st_gid, self.st_mode = stat_val.st_uid, stat_val.st_gid, stat_val.st_mode & 07777
+ out = shell.checked_call(["stat", "-c", "%u %g %a", path], sudo=True)[1]
+ uid_str, gid_str, mode_str = out.split(' ')
+ self.st_uid, self.st_gid, self.st_mode = int(uid_str), int(gid_str), int(mode_str, 8)
+
return Stat(path)
# os.kill replacement
http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
index f9c512f..a2943de 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
@@ -22,18 +22,22 @@ Ambari Agent
import re
import os
from resource_management.core.environment import Environment
+from resource_management.core import sudo
from resource_management.core.base import Fail
from resource_management.core.resources.system import Execute
from resource_management.core.resources.system import File
from resource_management.core.providers import Provider
from resource_management.core.logger import Logger
from resource_management.core import shell
+from resource_management.libraries.script import Script
from resource_management.libraries.functions import format
from resource_management.libraries.functions import is_empty
from resource_management.libraries.functions import namenode_ha_utils
import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
+import subprocess
+ERR_FILE = "hdfs_resource.err"
JSON_PATH = '/var/lib/ambari-agent/data/hdfs_resources.json'
JAR_PATH = '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar'
@@ -158,7 +162,7 @@ class WebHDFSUtil:
return re.sub("[/]+", "/", path)
- valid_status_codes = ["200", "201", ""]
+ valid_status_codes = ["200", "201"]
def run_command(self, target, operation, method='POST', assertable_result=True, file_to_put=None, ignore_status_codes=[], **kwargs):
"""
assertable_result - some POST requests return '{"boolean":false}' or '{"boolean":true}'
@@ -173,6 +177,8 @@ class WebHDFSUtil:
if file_to_put and not os.path.exists(file_to_put):
raise Fail(format("File {file_to_put} is not found."))
+ err_file = os.path.join(Script.get_tmp_dir(), ERR_FILE)
+
cmd = ["curl", "-L", "-w", "%{http_code}", "-X", method]
if file_to_put:
@@ -183,7 +189,7 @@ class WebHDFSUtil:
cmd += ["-k"]
cmd.append(url)
- _, out = shell.checked_call(cmd, user=self.run_user, logoutput=self.logoutput, quiet=False)
+ _, out = shell.checked_call(cmd, user=self.run_user, logoutput=self.logoutput, quiet=False, stderr=err_file)
status_code = out[-3:]
out = out[:-3] # remove last line from output which is status code
@@ -194,6 +200,7 @@ class WebHDFSUtil:
if status_code not in WebHDFSUtil.valid_status_codes+ignore_status_codes or assertable_result and result_dict and not result_dict['boolean']:
formatted_output = json.dumps(result_dict, indent=2) if isinstance(result_dict, dict) else result_dict
+ formatted_output = sudo.read_file(err_file) + formatted_output
err_msg = "Execution of '%s' returned status_code=%s. %s" % (shell.string_cmd_from_args_list(cmd), status_code, formatted_output)
raise Fail(err_msg)
http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/libraries/script/config_dictionary.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/script/config_dictionary.py b/ambari-common/src/main/python/resource_management/libraries/script/config_dictionary.py
index 46ec0e7..e893503 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/config_dictionary.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/config_dictionary.py
@@ -19,27 +19,29 @@ limitations under the License.
'''
from resource_management.core.exceptions import Fail
+IMMUTABLE_MESSAGE = """Configuration dictionary is immutable!
+
+For adding dynamic properties to xml files please use {{varible_from_params}} substitutions.
+Lookup xml files for {{ for examples.
+"""
+
class ConfigDictionary(dict):
"""
Immutable config dictionary
"""
- def __init__(self, dictionary, allow_overwrite=False):
+ def __init__(self, dictionary):
"""
Recursively turn dict to ConfigDictionary
"""
- self.__allow_overwrite = allow_overwrite
for k, v in dictionary.iteritems():
if isinstance(v, dict):
- dictionary[k] = ConfigDictionary(v, allow_overwrite=allow_overwrite)
+ dictionary[k] = ConfigDictionary(v)
super(ConfigDictionary, self).__init__(dictionary)
def __setitem__(self, name, value):
- if self.__allow_overwrite:
- super(ConfigDictionary, self).__setitem__(name, value)
- else:
- raise Fail("Configuration dictionary is immutable!")
+ raise Fail(IMMUTABLE_MESSAGE)
def __getitem__(self, name):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
index 3494ac9..578d994 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
@@ -233,7 +233,10 @@ class NameNodeDefault(NameNode):
parser = hdfs_rebalance.HdfsParser()
- def handle_new_line(line):
+ def handle_new_line(line, is_stderr):
+ if is_stderr:
+ return
+
_print('[balancer] %s' % (line))
pl = parser.parseLine(line)
if pl:
http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-server/src/test/python/custom_actions/TestInstallPackages.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/custom_actions/TestInstallPackages.py b/ambari-server/src/test/python/custom_actions/TestInstallPackages.py
index ecac6d4..763f9e2 100644
--- a/ambari-server/src/test/python/custom_actions/TestInstallPackages.py
+++ b/ambari-server/src/test/python/custom_actions/TestInstallPackages.py
@@ -22,6 +22,7 @@ import os
import pty
import socket
import subprocess
+import select
from resource_management import Script,ConfigDictionary
from mock.mock import patch
from mock.mock import MagicMock
@@ -33,9 +34,16 @@ from resource_management.core.resources.packaging import Package
from resource_management.core.exceptions import Fail
from ambari_commons.os_check import OSCheck
+subproc_mock = MagicMock()
+subproc_mock.return_value = MagicMock()
+subproc_stdout = MagicMock()
+subproc_mock.return_value.stdout = subproc_stdout
+
+@patch.object(os, "read", new=MagicMock(return_value=None))
+@patch.object(select, "select", new=MagicMock(return_value=([subproc_stdout], None, None)))
@patch.object(pty, "openpty", new = MagicMock(return_value=(1,5)))
@patch.object(os, "close", new=MagicMock())
-@patch.object(subprocess, "Popen", new=MagicMock())
+@patch.object(subprocess, "Popen", new=subproc_mock)
class TestInstallPackages(RMFTestCase):
@staticmethod