You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2015/06/10 16:54:41 UTC

[2/2] ambari git commit: AMBARI-11838. HdfsResource rearly fails on heavy loaded machines (aonishuk)

AMBARI-11838. HdfsResource rearly fails on heavy loaded machines (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fa694609
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fa694609
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fa694609

Branch: refs/heads/branch-2.1
Commit: fa694609375059c3856ed473078bf30652992d36
Parents: 49c7c93
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Wed Jun 10 17:54:31 2015 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Wed Jun 10 17:54:31 2015 +0300

----------------------------------------------------------------------
 .../resource_management/TestExecuteResource.py  |  81 +++++---
 .../resource_management/TestGroupResource.py    |  25 ++-
 .../resource_management/TestUserResource.py     |  49 ++---
 .../resource_management/core/exceptions.py      |   2 +-
 .../core/providers/system.py                    |  37 +---
 .../core/resources/system.py                    |  10 +
 .../python/resource_management/core/shell.py    | 187 +++++++++++++------
 .../python/resource_management/core/sudo.py     |  20 +-
 .../libraries/providers/hdfs_resource.py        |  11 +-
 .../libraries/script/config_dictionary.py       |  16 +-
 .../HDFS/2.1.0.2.0/package/scripts/namenode.py  |   5 +-
 .../custom_actions/TestInstallPackages.py       |  10 +-
 12 files changed, 291 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestExecuteResource.py b/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
index 59d7e4c..04244e9 100644
--- a/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestExecuteResource.py
@@ -30,18 +30,23 @@ import os
 from resource_management import Fail
 import grp
 import pwd
+import select
 
 
 @patch.object(System, "os_family", new='redhat')
 class TestExecuteResource(TestCase):
+  @patch.object(os, "read")
+  @patch.object(select, "select")
   @patch.object(logging.Logger, "info")
   @patch.object(subprocess, "Popen")
-  def test_attribute_logoutput(self, popen_mock, info_mock):
+  def test_attribute_logoutput(self, popen_mock, info_mock, select_mock, os_read_mock):
     subproc_mock = MagicMock()
+    subproc_mock.wait.return_value = MagicMock()
+    subproc_mock.stdout = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
-    subproc_mock.communicate.side_effect = [["1"], ["2"]]
     popen_mock.return_value = subproc_mock
+    select_mock.return_value = ([subproc_mock.stdout], None, None)
+    os_read_mock.return_value = None
 
     with Environment("/") as env:
       Execute('echo "1"',
@@ -82,14 +87,18 @@ class TestExecuteResource(TestCase):
     exists_mock.assert_called_with("/must/be/created")
     self.assertEqual(subproc_mock.call_count, 0)
 
+  @patch.object(os, "read")
+  @patch.object(select, "select")
   @patch.object(subprocess, "Popen")
-  def test_attribute_path(self, popen_mock):
+  def test_attribute_path(self, popen_mock, select_mock, os_read_mock):
     subproc_mock = MagicMock()
+    subproc_mock.wait.return_value = MagicMock()
+    subproc_mock.stdout = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
-    subproc_mock.communicate.side_effect = [["1"]]
     popen_mock.return_value = subproc_mock
-
+    select_mock.return_value = ([subproc_mock.stdout], None, None)
+    os_read_mock.return_value = None
+    
     with Environment("/") as env:
       execute_resource = Execute('echo "1"',
                                  path=["/test/one", "test/two"]
@@ -97,17 +106,23 @@ class TestExecuteResource(TestCase):
     expected_command = ['/bin/bash', '--login', '--noprofile', '-c', 'echo "1"']
     self.assertEqual(popen_mock.call_args_list[0][0][0], expected_command)
 
+  @patch.object(os, "read")
+  @patch.object(select, "select")
   @patch('time.sleep')
   @patch.object(subprocess, "Popen")
-  def test_attribute_try_sleep_tries(self, popen_mock, time_mock):
+  def test_attribute_try_sleep_tries(self, popen_mock, time_mock, select_mock, os_read_mock):
     expected_call = "call('Retrying after %d seconds. Reason: %s', 1, 'Fail')"
-
+    
     subproc_mock_one = MagicMock()
     subproc_mock_one.returncode = 1
+    subproc_mock_one.stdout = MagicMock()
     subproc_mock_zero = MagicMock()
+    subproc_mock_zero.stdout = MagicMock()
     subproc_mock_zero.returncode = 0
     #subproc_mock.stdout.readline = MagicMock(side_effect = [Fail("Fail"), "OK"])
     popen_mock.side_effect = [subproc_mock_one, subproc_mock_zero]
+    select_mock.side_effect = [([subproc_mock_one.stdout], None, None),([subproc_mock_zero.stdout], None, None)]
+    os_read_mock.return_value = None
 
     with Environment("/") as env:
       Execute('echo "1"',
@@ -133,15 +148,19 @@ class TestExecuteResource(TestCase):
     except Fail as e:
       pass
 
+  @patch.object(os, "read")
+  @patch.object(select, "select")
   @patch.object(subprocess, "Popen")
-  def test_attribute_environment(self, popen_mock):
+  def test_attribute_environment(self, popen_mock, select_mock, os_read_mock):
     expected_dict = {"JAVA_HOME": "/test/java/home"}
 
     subproc_mock = MagicMock()
+    subproc_mock.wait.return_value = MagicMock()
+    subproc_mock.stdout = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
-    subproc_mock.communicate.side_effect = [["1"]]
     popen_mock.return_value = subproc_mock
+    select_mock.return_value = ([subproc_mock.stdout], None, None)
+    os_read_mock.return_value = None
 
     with Environment("/") as env:
       Execute('echo "1"',
@@ -151,15 +170,19 @@ class TestExecuteResource(TestCase):
     self.assertEqual(popen_mock.call_args_list[0][1]["env"], expected_dict)
     pass
 
+  @patch.object(os, "read")
+  @patch.object(select, "select")
   @patch.object(subprocess, "Popen")
-  def test_attribute_environment_non_root(self, popen_mock):
+  def test_attribute_environment_non_root(self, popen_mock, select_mock, os_read_mock):
     expected_user = 'test_user'
 
     subproc_mock = MagicMock()
+    subproc_mock.wait.return_value = MagicMock()
+    subproc_mock.stdout = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
-    subproc_mock.communicate.side_effect = [["1"]]
     popen_mock.return_value = subproc_mock
+    select_mock.return_value = ([subproc_mock.stdout], None, None)
+    os_read_mock.return_value = None
 
     with Environment("/") as env:
       execute_resource = Execute('echo "1"',
@@ -173,15 +196,19 @@ class TestExecuteResource(TestCase):
     self.assertEqual(popen_mock.call_args_list[0][0][0], expected_command)
 
 
+  @patch.object(os, "read")
+  @patch.object(select, "select")
   @patch.object(subprocess, "Popen")
-  def test_attribute_cwd(self, popen_mock):
+  def test_attribute_cwd(self, popen_mock, select_mock, os_read_mock):
     expected_cwd = "/test/work/directory"
 
     subproc_mock = MagicMock()
+    subproc_mock.wait.return_value = MagicMock()
+    subproc_mock.stdout = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
-    subproc_mock.communicate.side_effect = [["1"]]
     popen_mock.return_value = subproc_mock
+    select_mock.return_value = ([subproc_mock.stdout], None, None)
+    os_read_mock.return_value = None
 
     with Environment("/") as env:
       Execute('echo "1"',
@@ -190,8 +217,10 @@ class TestExecuteResource(TestCase):
 
     self.assertEqual(popen_mock.call_args_list[0][1]["cwd"], expected_cwd)
 
+  @patch.object(os, "read")
+  @patch.object(select, "select")
   @patch.object(subprocess, "Popen")
-  def test_attribute_command_escaping(self, popen_mock):
+  def test_attribute_command_escaping(self, popen_mock, select_mock, os_read_mock):
     expected_command0 = "arg1 arg2 'quoted arg'"
     expected_command1 = "arg1 arg2 'command \"arg\"'"
     expected_command2 = 'arg1 arg2 \'command \'"\'"\'arg\'"\'"\'\''
@@ -200,9 +229,12 @@ class TestExecuteResource(TestCase):
     expected_command5 = "arg1 arg2 '`ls /root`'"
 
     subproc_mock = MagicMock()
+    subproc_mock.wait.return_value = MagicMock()
+    subproc_mock.stdout = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
+    select_mock.return_value = ([subproc_mock.stdout], None, None)
+    os_read_mock.return_value = None
 
     with Environment("/") as env:
       Execute(('arg1', 'arg2', 'quoted arg'),
@@ -225,14 +257,19 @@ class TestExecuteResource(TestCase):
     self.assertEqual(popen_mock.call_args_list[4][0][0][4], expected_command4)
     self.assertEqual(popen_mock.call_args_list[5][0][0][4], expected_command5)
 
+  @patch.object(os, "read")
+  @patch.object(select, "select")
   @patch.object(subprocess, "Popen")
-  def test_attribute_command_one_line(self, popen_mock):
+  def test_attribute_command_one_line(self, popen_mock, select_mock, os_read_mock):
     expected_command = "rm -rf /somedir"
 
     subproc_mock = MagicMock()
+    subproc_mock.wait.return_value = MagicMock()
+    subproc_mock.stdout = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
     popen_mock.return_value = subproc_mock
+    select_mock.return_value = ([subproc_mock.stdout], None, None)
+    os_read_mock.return_value = None
 
     with Environment("/") as env:
       Execute(expected_command)

http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-agent/src/test/python/resource_management/TestGroupResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestGroupResource.py b/ambari-agent/src/test/python/resource_management/TestGroupResource.py
index 65d8499..1d52a90 100644
--- a/ambari-agent/src/test/python/resource_management/TestGroupResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestGroupResource.py
@@ -26,8 +26,13 @@ import subprocess
 import grp
 import os
 import pty
+import select
 
 
+subproc_stdout = MagicMock()
+
+@patch.object(os, "read", new=MagicMock(return_value=None))
+@patch.object(select, "select", new=MagicMock(return_value=([subproc_stdout], None, None)))
 @patch.object(System, "os_family", new = 'redhat')
 @patch.object(os, "environ", new = {'PATH':'/bin'})
 @patch.object(pty, "openpty", new = MagicMock(return_value=(1,5)))
@@ -39,7 +44,7 @@ class TestGroupResource(TestCase):
   def test_action_create_nonexistent(self, popen_mock, getgrnam_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getgrnam_mock.side_effect = KeyError()
 
@@ -51,7 +56,7 @@ class TestGroupResource(TestCase):
     
 
     self.assertEqual(popen_mock.call_count, 1)
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E groupadd -p secure hadoop"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E groupadd -p secure hadoop"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     getgrnam_mock.assert_called_with('hadoop')
 
 
@@ -60,7 +65,7 @@ class TestGroupResource(TestCase):
   def test_action_create_existent(self, popen_mock, getgrnam_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getgrnam_mock.return_value = _get_group()
 
@@ -73,7 +78,7 @@ class TestGroupResource(TestCase):
     
 
     self.assertEqual(popen_mock.call_count, 1)
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E groupmod -p secure -g 2 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E groupmod -p secure -g 2 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     getgrnam_mock.assert_called_with('mapred')
 
 
@@ -82,7 +87,7 @@ class TestGroupResource(TestCase):
   def test_action_create_fail(self, popen_mock, getgrnam_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 1
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getgrnam_mock.return_value = _get_group()
 
@@ -98,7 +103,7 @@ class TestGroupResource(TestCase):
     except Fail:
       pass
     self.assertEqual(popen_mock.call_count, 1)
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E groupmod -p secure -g 2 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E groupmod -p secure -g 2 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     getgrnam_mock.assert_called_with('mapred')
 
 
@@ -108,7 +113,7 @@ class TestGroupResource(TestCase):
 
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getgrnam_mock.return_value = _get_group()
 
@@ -119,7 +124,7 @@ class TestGroupResource(TestCase):
     
 
     self.assertEqual(popen_mock.call_count, 1)
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh  PATH=/bin -H -E groupdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh  PATH=/bin -H -E groupdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     getgrnam_mock.assert_called_with('mapred')
 
 
@@ -129,7 +134,7 @@ class TestGroupResource(TestCase):
 
     subproc_mock = MagicMock()
     subproc_mock.returncode = 1
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getgrnam_mock.return_value = _get_group()
 
@@ -144,7 +149,7 @@ class TestGroupResource(TestCase):
       pass
 
     self.assertEqual(popen_mock.call_count, 1)
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh  PATH=/bin -H -E groupdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh  PATH=/bin -H -E groupdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     getgrnam_mock.assert_called_with('mapred')
     
 def _get_group():

http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-agent/src/test/python/resource_management/TestUserResource.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestUserResource.py b/ambari-agent/src/test/python/resource_management/TestUserResource.py
index 5c8b6df..1598823 100644
--- a/ambari-agent/src/test/python/resource_management/TestUserResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestUserResource.py
@@ -26,7 +26,12 @@ import pwd
 import subprocess
 import os
 import pty
+import select
 
+subproc_stdout = MagicMock()
+
+@patch.object(os, "read", new=MagicMock(return_value=None))
+@patch.object(select, "select", new=MagicMock(return_value=([subproc_stdout], None, None)))
 @patch.object(System, "os_family", new = 'redhat')
 @patch.object(os, "environ", new = {'PATH':'/bin'})
 @patch.object(pty, "openpty", new = MagicMock(return_value=(1,5)))
@@ -38,13 +43,13 @@ class TestUserResource(TestCase):
   def test_action_create_nonexistent(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = None
     with Environment('/') as env:
       user = User("mapred", action = "create", shell = "/bin/bash")
 
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E useradd -m -s /bin/bash mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, env={'PATH': '/bin'}, bufsize=1, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E useradd -m -s /bin/bash mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     self.assertEqual(popen_mock.call_count, 1)
     
   @patch.object(subprocess, "Popen")
@@ -52,14 +57,14 @@ class TestUserResource(TestCase):
   def test_action_create_existent(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = _get_user_entity()
 
     with Environment('/') as env:
       user = User("mapred", action = "create", shell = "/bin/bash")
 
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/bash mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/bash mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     self.assertEqual(popen_mock.call_count, 1)
 
   @patch.object(subprocess, "Popen")
@@ -67,14 +72,14 @@ class TestUserResource(TestCase):
   def test_action_delete(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = 1
 
     with Environment('/') as env:
       user = User("mapred", action = "remove", shell = "/bin/bash")
 
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh  PATH=/bin -H -E userdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh  PATH=/bin -H -E userdel mapred'], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     self.assertEqual(popen_mock.call_count, 1)
 
   @patch.object(subprocess, "Popen")
@@ -82,7 +87,7 @@ class TestUserResource(TestCase):
   def test_attribute_comment(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = _get_user_entity()
 
@@ -90,7 +95,7 @@ class TestUserResource(TestCase):
       user = User("mapred", action = "create", comment = "testComment", 
           shell = "/bin/bash")
 
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -c testComment -s /bin/bash mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -c testComment -s /bin/bash mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     self.assertEqual(popen_mock.call_count, 1)
 
   @patch.object(subprocess, "Popen")
@@ -98,7 +103,7 @@ class TestUserResource(TestCase):
   def test_attribute_home(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = _get_user_entity()
 
@@ -106,7 +111,7 @@ class TestUserResource(TestCase):
       user = User("mapred", action = "create", home = "/test/home", 
           shell = "/bin/bash")
 
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/bash -d /test/home mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/bash -d /test/home mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     self.assertEqual(popen_mock.call_count, 1)
 
   @patch.object(subprocess, "Popen")
@@ -114,7 +119,7 @@ class TestUserResource(TestCase):
   def test_attribute_password(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = _get_user_entity()
 
@@ -122,7 +127,7 @@ class TestUserResource(TestCase):
       user = User("mapred", action = "create", password = "secure", 
           shell = "/bin/bash")    
 
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/bash -p secure mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/bash -p secure mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     self.assertEqual(popen_mock.call_count, 1)
 
   @patch.object(subprocess, "Popen")
@@ -130,14 +135,14 @@ class TestUserResource(TestCase):
   def test_attribute_shell(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = _get_user_entity()
 
     with Environment('/') as env:
       user = User("mapred", action = "create", shell = "/bin/sh")
 
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/sh mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/sh mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     self.assertEqual(popen_mock.call_count, 1)
 
   @patch.object(subprocess, "Popen")
@@ -145,14 +150,14 @@ class TestUserResource(TestCase):
   def test_attribute_uid(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = _get_user_entity()
 
     with Environment('/') as env:
       user = User("mapred", action = "create", uid = "1", shell = "/bin/bash")
 
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/bash -u 1 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/bash -u 1 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     self.assertEqual(popen_mock.call_count, 1)
 
   @patch.object(subprocess, "Popen")
@@ -160,14 +165,14 @@ class TestUserResource(TestCase):
   def test_attribute_gid(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = _get_user_entity()
 
     with Environment('/') as env:
       user = User("mapred", action = "create", gid = "1", shell = "/bin/bash")
 
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/bash -g 1 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/bash -g 1 mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     self.assertEqual(popen_mock.call_count, 1)
 
   @patch('resource_management.core.providers.accounts.UserProvider.user_groups', new_callable=PropertyMock)
@@ -177,7 +182,7 @@ class TestUserResource(TestCase):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
     user_groups_mock.return_value = ['hadoop']
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = _get_user_entity()
 
@@ -185,7 +190,7 @@ class TestUserResource(TestCase):
       user = User("mapred", action = "create", groups = ['1','2','3'], 
           shell = "/bin/bash")
 
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/bash -G 1,2,3,hadoop mapred'], shell=False, preexec_fn=None, env={'PATH': '/bin'}, close_fds=True, stdout=5, stderr=-2, bufsize=1, cwd=None)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', 'ambari-sudo.sh  PATH=/bin -H -E usermod -s /bin/bash -G 1,2,3,hadoop mapred'], shell=False, preexec_fn=None, env={'PATH': '/bin'}, close_fds=True, stdout=-1, stderr=-2, cwd=None)
     self.assertEqual(popen_mock.call_count, 1)
 
   @patch.object(subprocess, "Popen")
@@ -193,13 +198,13 @@ class TestUserResource(TestCase):
   def test_missing_shell_argument(self, getpwnam_mock, popen_mock):
     subproc_mock = MagicMock()
     subproc_mock.returncode = 0
-    subproc_mock.stdout.readline = MagicMock(side_effect = ['OK'])
+    subproc_mock.stdout = subproc_stdout
     popen_mock.return_value = subproc_mock
     getpwnam_mock.return_value = None
     with Environment('/') as env:
       user = User("mapred", action = "create")
 
-    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E useradd -m mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=5, bufsize=1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
+    popen_mock.assert_called_with(['/bin/bash', '--login', '--noprofile', '-c', "ambari-sudo.sh  PATH=/bin -H -E useradd -m mapred"], shell=False, preexec_fn=None, stderr=-2, stdout=-1, env={'PATH': '/bin'}, cwd=None, close_fds=True)
     self.assertEqual(popen_mock.call_count, 1)
 
 def _get_user_entity():

http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/core/exceptions.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/exceptions.py b/ambari-common/src/main/python/resource_management/core/exceptions.py
index a8b2a63..25e7993 100644
--- a/ambari-common/src/main/python/resource_management/core/exceptions.py
+++ b/ambari-common/src/main/python/resource_management/core/exceptions.py
@@ -25,7 +25,7 @@ __all__ = ["Fail", "ExecuteTimeoutException", "InvalidArgument", "ClientComponen
 class Fail(Exception):
   pass
 
-class ExecuteTimeoutException(Exception):
+class ExecuteTimeoutException(Fail):
   pass
 
 class InvalidArgument(Fail):

http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/core/providers/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/providers/system.py b/ambari-common/src/main/python/resource_management/core/providers/system.py
index 3a1b218..a6f940d 100644
--- a/ambari-common/src/main/python/resource_management/core/providers/system.py
+++ b/ambari-common/src/main/python/resource_management/core/providers/system.py
@@ -242,33 +242,16 @@ class ExecuteProvider(Provider):
         Logger.info("Skipping %s due to creates" % self.resource)
         return
       
-    env = self.resource.environment
-          
-    for i in range (0, self.resource.tries):
-      try:
-        shell.checked_call(self.resource.command, logoutput=self.resource.logoutput,
-                            cwd=self.resource.cwd, env=env,
-                            preexec_fn=_preexec_fn(self.resource), user=self.resource.user,
-                            wait_for_finish=self.resource.wait_for_finish,
-                            timeout=self.resource.timeout,
-                            path=self.resource.path,
-                            sudo=self.resource.sudo,
-                            on_new_line=self.resource.on_new_line)
-        break
-      except Fail as ex:
-        if i == self.resource.tries-1: # last try
-          raise ex
-        else:
-          Logger.info("Retrying after %d seconds. Reason: %s" % (self.resource.try_sleep, str(ex)))
-          time.sleep(self.resource.try_sleep)
-      except ExecuteTimeoutException:
-        err_msg = ("Execution of '%s' was killed due timeout after %d seconds") % (self.resource.command, self.resource.timeout)
-        
-        if self.resource.on_timeout:
-          Logger.info("Executing '%s'. Reason: %s" % (self.resource.on_timeout, err_msg))
-          shell.checked_call(self.resource.on_timeout)
-        else:
-          raise Fail(err_msg)
+    shell.checked_call(self.resource.command, logoutput=self.resource.logoutput,
+                        cwd=self.resource.cwd, env=self.resource.environment,
+                        preexec_fn=_preexec_fn(self.resource), user=self.resource.user,
+                        wait_for_finish=self.resource.wait_for_finish,
+                        timeout=self.resource.timeout,on_timeout=self.resource.on_timeout,
+                        path=self.resource.path,
+                        sudo=self.resource.sudo,
+                        on_new_line=self.resource.on_new_line,
+                        stdout=self.resource.stdout,stderr=self.resource.stderr,
+                        tries=self.resource.tries, try_sleep=self.resource.try_sleep)
        
 
 class ExecuteScriptProvider(Provider):

http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/core/resources/system.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/resources/system.py b/ambari-common/src/main/python/resource_management/core/resources/system.py
index 1f00da1..f74e258 100644
--- a/ambari-common/src/main/python/resource_management/core/resources/system.py
+++ b/ambari-common/src/main/python/resource_management/core/resources/system.py
@@ -22,6 +22,7 @@ Ambari Agent
 
 __all__ = ["File", "Directory", "Link", "Execute", "ExecuteScript", "Mount"]
 
+import subprocess
 from resource_management.core.base import Resource, ForcedListArgument, ResourceArgument, BooleanArgument
 
 
@@ -154,6 +155,15 @@ class Execute(Resource):
   command2 = as_sudo(["ls", "/root/example.txt") + " && " + as_sudo(["rm","-f","example.txt"])
   """
   sudo = BooleanArgument(default=False)
+  """
+  subprocess.PIPE - enable output gathering
+  None - disable output to gathering, and output to Python out straightly (even if logoutput is False)
+  subprocess.STDOUT - redirect to stdout (not valid as value for stdout agument)
+  {int fd} - redirect to file with descriptor.
+  {string filename} - redirects to a file with name.
+  """
+  stdout = ResourceArgument(default=subprocess.PIPE)
+  stderr = ResourceArgument(default=subprocess.STDOUT)
 
 class ExecuteScript(Resource):
   action = ForcedListArgument(default="run")

http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/core/shell.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/shell.py b/ambari-common/src/main/python/resource_management/core/shell.py
index 2e5a1b0..183c735 100644
--- a/ambari-common/src/main/python/resource_management/core/shell.py
+++ b/ambari-common/src/main/python/resource_management/core/shell.py
@@ -22,6 +22,8 @@ Ambari Agent
 
 __all__ = ["non_blocking_call", "checked_call", "call", "quote_bash_args", "as_user", "as_sudo"]
 
+import time
+import copy
 import os
 import select
 import sys
@@ -76,26 +78,34 @@ def log_function_call(function):
   return inner
 
 @log_function_call
-def checked_call(command, quiet=False, logoutput=None,
-         cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None):
+def checked_call(command, quiet=False, logoutput=None, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
+         cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, on_timeout=None,
+         path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0):
   """
   Execute the shell command and throw an exception on failure.
   @throws Fail
   @return: return_code, output
   """
-  return _call(command, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, sudo, on_new_line)
-
+  return _call_wrapper(command, logoutput=logoutput, throw_on_failure=True, stdout=stdout, stderr=stderr,
+                              cwd=cwd, env=env, preexec_fn=preexec_fn, user=user, wait_for_finish=wait_for_finish, 
+                              on_timeout=on_timeout, timeout=timeout, path=path, sudo=sudo, on_new_line=on_new_line,
+                              tries=tries, try_sleep=try_sleep)
+  
 @log_function_call
-def call(command, quiet=False, logoutput=None,
-         cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None):
+def call(command, quiet=False, logoutput=None, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
+         cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, on_timeout=None,
+         path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0):
   """
   Execute the shell command despite failures.
   @return: return_code, output
   """
-  return _call(command, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, sudo, on_new_line)
+  return _call_wrapper(command, logoutput=logoutput, throw_on_failure=False, stdout=stdout, stderr=stderr,
+                              cwd=cwd, env=env, preexec_fn=preexec_fn, user=user, wait_for_finish=wait_for_finish, 
+                              on_timeout=on_timeout, timeout=timeout, path=path, sudo=sudo, on_new_line=on_new_line,
+                              tries=tries, try_sleep=try_sleep)
 
 @log_function_call
-def non_blocking_call(command, quiet=False,
+def non_blocking_call(command, quiet=False, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
          cwd=None, env=None, preexec_fn=None, user=None, timeout=None, path=None, sudo=False):
   """
   Execute the shell command and don't wait until it's completion
@@ -104,10 +114,49 @@ def non_blocking_call(command, quiet=False,
   (use proc.stdout.readline to read output in cycle, don't foget to proc.stdout.close(),
   to get return code use proc.wait() and after that proc.returncode)
   """
-  return _call(command, False, True, cwd, env, preexec_fn, user, False, timeout, path, sudo, None)
+  return _call_wrapper(command, logoutput=False, throw_on_failure=True, stdout=stdout, stderr=stderr, 
+                              cwd=cwd, env=env, preexec_fn=preexec_fn, user=user, wait_for_finish=False, 
+                              on_timeout=None, timeout=timeout, path=path, sudo=sudo, on_new_line=None, 
+                              tries=1, try_sleep=0)
+
+def _call_wrapper(command, **kwargs):
+  tries = kwargs['tries']
+  try_sleep = kwargs['try_sleep']
+  timeout = kwargs['timeout']
+  on_timeout = kwargs['on_timeout']
+  throw_on_failure = kwargs['throw_on_failure']
+  
+  for i in range (0, tries):
+    is_last_try = (i == tries-1)
 
-def _call(command, logoutput=None, throw_on_failure=True,
-         cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None):
+    if not is_last_try:
+      kwargs_copy = copy.copy(kwargs)
+      kwargs_copy['throw_on_failure'] = True # we need to know when non-last try fails, to handle retries
+    else:
+      kwargs_copy = kwargs
+    
+    try:    
+      try:
+        result = _call(command, **kwargs_copy)
+        break
+      except ExecuteTimeoutException as ex:     
+        if on_timeout:
+          Logger.info("Executing '%s'. Reason: %s" % (on_timeout, str(ex)))
+          checked_call(on_timeout)
+        else:
+          raise
+    except Fail as ex:
+      if is_last_try: # last try
+        raise
+      else:
+        Logger.info("Retrying after %d seconds. Reason: %s" % (try_sleep, str(ex)))
+        time.sleep(try_sleep)
+      
+  return result
+
+def _call(command, logoutput=None, throw_on_failure=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
+         cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, on_timeout=None, 
+         path=None, sudo=False, on_new_line=None, tries=1, try_sleep=0):
   """
   Execute shell command
   
@@ -115,8 +164,13 @@ def _call(command, logoutput=None, throw_on_failure=True,
   or string of the command to execute
   @param logoutput: boolean, whether command output should be logged of not
   @param throw_on_failure: if true, when return code is not zero exception is thrown
+  @param stdout,stderr: 
+    subprocess.PIPE - enable output to variable
+    subprocess.STDOUT - redirect to stdout
+    None - disable output to variable, and output to Python out straightly (even if logoutput is False)
+    {int fd} - redirect to file with descriptor.
+    {string filename} - redirects to a file with name.
   """
-
   command_alias = string_cmd_from_args_list(command) if isinstance(command, (list, tuple)) else command
   
   # Append current PATH to env['PATH']
@@ -125,7 +179,7 @@ def _call(command, logoutput=None, throw_on_failure=True,
   if path:
     path = os.pathsep.join(path) if isinstance(path, (list, tuple)) else path
     env['PATH'] = os.pathsep.join([env['PATH'], path])
-
+  
   # prepare command cmd
   if sudo:
     command = as_sudo(command, env=env)
@@ -141,61 +195,82 @@ def _call(command, logoutput=None, throw_on_failure=True,
   for placeholder, replacement in PLACEHOLDERS_TO_STR.iteritems():
     command = command.replace(placeholder, replacement.format(env_str=env_str))
 
-  import pty
-  master_fd, slave_fd = pty.openpty()
-
   # --noprofile is used to preserve PATH set for ambari-agent
   subprocess_command = ["/bin/bash","--login","--noprofile","-c", command]
-  proc = subprocess.Popen(subprocess_command, bufsize=1, stdout=slave_fd, stderr=subprocess.STDOUT,
-                          cwd=cwd, env=env, shell=False, close_fds=True,
-                          preexec_fn=preexec_fn)
   
-  if timeout:
-    timeout_event = threading.Event()
-    t = threading.Timer( timeout, _on_timeout, [proc, timeout_event] )
-    t.start()
+  files_to_close = []
+  if isinstance(stdout, (basestring)):
+    stdout = open(stdout, 'wb')
+    files_to_close.append(stdout)
+  if isinstance(stderr, (basestring)):
+    stderr = open(stderr, 'wb')
+    files_to_close.append(stderr)
+  
+  try:
+    proc = subprocess.Popen(subprocess_command, stdout=stdout, stderr=stderr,
+                            cwd=cwd, env=env, shell=False, close_fds=True,
+                            preexec_fn=preexec_fn)
     
-  if not wait_for_finish:
-    return proc
+    if timeout:
+      timeout_event = threading.Event()
+      t = threading.Timer( timeout, _on_timeout, [proc, timeout_event] )
+      t.start()
+      
+    if not wait_for_finish:
+      return proc
+      
+    # in case logoutput==False, never log.    
+    logoutput = logoutput==True and Logger.logger.isEnabledFor(logging.INFO) or logoutput==None and Logger.logger.isEnabledFor(logging.DEBUG)
+    read_set = []
     
-  # in case logoutput==False, never log.    
-  logoutput = logoutput==True and Logger.logger.isEnabledFor(logging.INFO) or logoutput==None and Logger.logger.isEnabledFor(logging.DEBUG)
-  out = ""
-  read_timeout = .001 # seconds
-
-  try:
-    while True:
-      ready, _, _ = select.select([master_fd], [], [], read_timeout)
-      if ready:
-        line = os.read(master_fd, 512)
-        if not line:
-          continue
+    if stdout == subprocess.PIPE:
+      read_set.append(proc.stdout)
+    if stderr == subprocess.PIPE:
+      read_set.append(proc.stderr)
+    
+    fd_to_string = {
+      proc.stdout: "",
+      proc.stderr: ""
+    }
+                  
+    while read_set:
+      ready, _, _ = select.select(read_set, [], [])
+      for out_fd in read_set:
+        if out_fd in ready:
+          line = os.read(out_fd.fileno(), 1024)
           
-        out += line
-        try:
-          if on_new_line:
-            on_new_line(line)
-        except Exception, err:
-          err_msg = "Caused by on_new_line function failed with exception for input argument '{0}':\n{1}".format(line, traceback.format_exc())
-          raise Fail(err_msg)
+          if not line:
+            read_set = copy.copy(read_set)
+            read_set.remove(out_fd)
+            continue
           
-        if logoutput:
-          _print(line)    
-      elif proc.poll() is not None:
-        break # proc exited
+          fd_to_string[out_fd] += line
+            
+          if on_new_line:
+            try:
+              on_new_line(line, out_fd == proc.stderr)
+            except Exception, err:
+              err_msg = "Caused by on_new_line function failed with exception for input argument '{0}':\n{1}".format(line, traceback.format_exc())
+              raise Fail(err_msg)
+            
+          if logoutput:
+            _print(line)    
+  
+    proc.wait()
   finally:
-    os.close(slave_fd)
-    os.close(master_fd)
-
-  proc.wait()  
-  out = out.strip('\n')
+    for fp in files_to_close:
+      fp.close()
+      
+  out = fd_to_string[proc.stdout].strip('\n')
+  err = fd_to_string[proc.stderr].strip('\n')
   
   if timeout: 
     if not timeout_event.is_set():
       t.cancel()
     # timeout occurred
     else:
-      raise ExecuteTimeoutException()
+      err_msg = ("Execution of '%s' was killed due timeout after %d seconds") % (command, timeout)
+      raise ExecuteTimeoutException(err_msg)
    
   code = proc.returncode
   
@@ -203,6 +278,10 @@ def _call(command, logoutput=None, throw_on_failure=True,
     err_msg = Logger.filter_text(("Execution of '%s' returned %d. %s") % (command_alias, code, out))
     raise Fail(err_msg)
   
+  # if separate stderr is enabled (by default it's redirected to out)
+  if stderr == subprocess.PIPE:
+    return code, out, err
+  
   return code, out
 
 def as_sudo(command, env=None, auto_escape=True):

http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/core/sudo.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/sudo.py b/ambari-common/src/main/python/resource_management/core/sudo.py
index f864d64..740eea3 100644
--- a/ambari-common/src/main/python/resource_management/core/sudo.py
+++ b/ambari-common/src/main/python/resource_management/core/sudo.py
@@ -198,25 +198,15 @@ else:
   # os.path.isfile
   def path_isfile(path):
     return (shell.call(["test", "-f", path], sudo=True)[0] == 0)
-  
+
   # os.stat
   def stat(path):
     class Stat:
-      RETRY_COUNT = 5
       def __init__(self, path):
-        # Sometimes (on heavy load) stat call returns an empty output with zero return code
-        for i in range(0, self.RETRY_COUNT):
-          out = shell.checked_call(["stat", "-c", "%u %g %a", path], sudo=True)[1]
-          values = out.split(' ')
-          if len(values) == 3:
-            uid_str, gid_str, mode_str = values
-            self.st_uid, self.st_gid, self.st_mode = int(uid_str), int(gid_str), int(mode_str, 8)
-            break
-        else:
-          warning_message = "Can not parse a sudo stat call output: \"{0}\"".format(out)
-          Logger.warning(warning_message)
-          stat_val = os.stat(path)
-          self.st_uid, self.st_gid, self.st_mode = stat_val.st_uid, stat_val.st_gid, stat_val.st_mode & 07777
+        out = shell.checked_call(["stat", "-c", "%u %g %a", path], sudo=True)[1]
+        uid_str, gid_str, mode_str = out.split(' ')
+        self.st_uid, self.st_gid, self.st_mode = int(uid_str), int(gid_str), int(mode_str, 8)
+  
     return Stat(path)
   
   # os.kill replacement

http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
index f9c512f..a2943de 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
@@ -22,18 +22,22 @@ Ambari Agent
 import re
 import os
 from resource_management.core.environment import Environment
+from resource_management.core import sudo
 from resource_management.core.base import Fail
 from resource_management.core.resources.system import Execute
 from resource_management.core.resources.system import File
 from resource_management.core.providers import Provider
 from resource_management.core.logger import Logger
 from resource_management.core import shell
+from resource_management.libraries.script import Script
 from resource_management.libraries.functions import format
 from resource_management.libraries.functions import is_empty
 from resource_management.libraries.functions import namenode_ha_utils
 
 import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
+import subprocess
 
+ERR_FILE = "hdfs_resource.err"
 JSON_PATH = '/var/lib/ambari-agent/data/hdfs_resources.json'
 JAR_PATH = '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar'
 
@@ -158,7 +162,7 @@ class WebHDFSUtil:
       
     return re.sub("[/]+", "/", path)
     
-  valid_status_codes = ["200", "201", ""]
+  valid_status_codes = ["200", "201"]
   def run_command(self, target, operation, method='POST', assertable_result=True, file_to_put=None, ignore_status_codes=[], **kwargs):
     """
     assertable_result - some POST requests return '{"boolean":false}' or '{"boolean":true}'
@@ -173,6 +177,8 @@ class WebHDFSUtil:
     if file_to_put and not os.path.exists(file_to_put):
       raise Fail(format("File {file_to_put} is not found."))
     
+    err_file = os.path.join(Script.get_tmp_dir(), ERR_FILE)
+    
     cmd = ["curl", "-L", "-w", "%{http_code}", "-X", method]
     
     if file_to_put:
@@ -183,7 +189,7 @@ class WebHDFSUtil:
       cmd += ["-k"]
       
     cmd.append(url)
-    _, out = shell.checked_call(cmd, user=self.run_user, logoutput=self.logoutput, quiet=False)
+    _, out = shell.checked_call(cmd, user=self.run_user, logoutput=self.logoutput, quiet=False, stderr=err_file)
     status_code = out[-3:]
     out = out[:-3] # remove last line from output which is status code
     
@@ -194,6 +200,7 @@ class WebHDFSUtil:
           
     if status_code not in WebHDFSUtil.valid_status_codes+ignore_status_codes or assertable_result and result_dict and not result_dict['boolean']:
       formatted_output = json.dumps(result_dict, indent=2) if isinstance(result_dict, dict) else result_dict
+      formatted_output = sudo.read_file(err_file) + formatted_output
       err_msg = "Execution of '%s' returned status_code=%s. %s" % (shell.string_cmd_from_args_list(cmd), status_code, formatted_output)
       raise Fail(err_msg)
     

http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-common/src/main/python/resource_management/libraries/script/config_dictionary.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/script/config_dictionary.py b/ambari-common/src/main/python/resource_management/libraries/script/config_dictionary.py
index 46ec0e7..e893503 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/config_dictionary.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/config_dictionary.py
@@ -19,27 +19,29 @@ limitations under the License.
 '''
 from resource_management.core.exceptions import Fail
 
+IMMUTABLE_MESSAGE = """Configuration dictionary is immutable!
+
+For adding dynamic properties to xml files please use {{varible_from_params}} substitutions.
+Lookup xml files for {{ for examples. 
+"""
+
 class ConfigDictionary(dict):
   """
   Immutable config dictionary
   """
   
-  def __init__(self, dictionary, allow_overwrite=False):
+  def __init__(self, dictionary):
     """
     Recursively turn dict to ConfigDictionary
     """
-    self.__allow_overwrite = allow_overwrite
     for k, v in dictionary.iteritems():
       if isinstance(v, dict):
-        dictionary[k] = ConfigDictionary(v, allow_overwrite=allow_overwrite)
+        dictionary[k] = ConfigDictionary(v)
         
     super(ConfigDictionary, self).__init__(dictionary)
 
   def __setitem__(self, name, value):
-    if self.__allow_overwrite:
-      super(ConfigDictionary, self).__setitem__(name, value)
-    else:
-      raise Fail("Configuration dictionary is immutable!")
+    raise Fail(IMMUTABLE_MESSAGE)
 
   def __getitem__(self, name):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
index 3494ac9..578d994 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
@@ -233,7 +233,10 @@ class NameNodeDefault(NameNode):
 
     parser = hdfs_rebalance.HdfsParser()
 
-    def handle_new_line(line):
+    def handle_new_line(line, is_stderr):
+      if is_stderr:
+        return
+      
       _print('[balancer] %s' % (line))
       pl = parser.parseLine(line)
       if pl:

http://git-wip-us.apache.org/repos/asf/ambari/blob/fa694609/ambari-server/src/test/python/custom_actions/TestInstallPackages.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/custom_actions/TestInstallPackages.py b/ambari-server/src/test/python/custom_actions/TestInstallPackages.py
index ecac6d4..763f9e2 100644
--- a/ambari-server/src/test/python/custom_actions/TestInstallPackages.py
+++ b/ambari-server/src/test/python/custom_actions/TestInstallPackages.py
@@ -22,6 +22,7 @@ import os
 import pty
 import socket
 import subprocess
+import select
 from resource_management import Script,ConfigDictionary
 from mock.mock import patch
 from mock.mock import MagicMock
@@ -33,9 +34,16 @@ from resource_management.core.resources.packaging import Package
 from resource_management.core.exceptions import Fail
 from ambari_commons.os_check import OSCheck
 
+subproc_mock = MagicMock()
+subproc_mock.return_value = MagicMock()
+subproc_stdout = MagicMock()
+subproc_mock.return_value.stdout = subproc_stdout
+
+@patch.object(os, "read", new=MagicMock(return_value=None))
+@patch.object(select, "select", new=MagicMock(return_value=([subproc_stdout], None, None)))
 @patch.object(pty, "openpty", new = MagicMock(return_value=(1,5)))
 @patch.object(os, "close", new=MagicMock())
-@patch.object(subprocess, "Popen", new=MagicMock())
+@patch.object(subprocess, "Popen", new=subproc_mock)
 class TestInstallPackages(RMFTestCase):
 
   @staticmethod