You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2016/03/22 14:46:11 UTC

ambari git commit: AMBARI-15492. Status commands fail randomly. (mpapirkovskyy)

Repository: ambari
Updated Branches:
  refs/heads/trunk 2925a6aa5 -> 0e4084c37


AMBARI-15492. Status commands fail randomly. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0e4084c3
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0e4084c3
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0e4084c3

Branch: refs/heads/trunk
Commit: 0e4084c37a68ec143af766cf1fd93330a899c650
Parents: 2925a6a
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Mon Mar 21 18:11:52 2016 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Tue Mar 22 15:36:08 2016 +0200

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |   3 +-
 .../ambari_agent/PythonReflectiveExecutor.py    |   5 +-
 .../resource_management/core/environment.py     |  21 +-
 .../libraries/providers/execute_hadoop.py       |  19 +-
 .../providers/modify_properties_file.py         |  15 +-
 .../libraries/providers/properties_file.py      |  13 +-
 .../libraries/providers/repository.py           | 150 ++++++------
 .../libraries/providers/template_config.py      |  13 +-
 .../libraries/providers/xml_config.py           |  15 +-
 ambari-server/src/test/python/TestUtils.py      |  52 ++---
 .../python/custom_actions/test_ru_set_all.py    | 230 +++++++++----------
 .../stacks/2.2/common/test_conf_select.py       |   5 +-
 12 files changed, 273 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/0e4084c3/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 9530d4c..941bd68 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -94,7 +94,8 @@ class ActionQueue(threading.Thread):
     self.statusCommandQueue.queue.clear()
 
     for command in commands:
-      logger.info("Adding " + command['commandType'] + " for service " + \
+      logger.info("Adding " + command['commandType'] + " for component " + \
+                  command['componentName'] + " of service " + \
                   command['serviceName'] + " of cluster " + \
                   command['clusterName'] + " to the queue.")
       self.statusCommandQueue.put(command)

http://git-wip-us.apache.org/repos/asf/ambari/blob/0e4084c3/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
index 2c42891..3808b3f 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
@@ -19,6 +19,7 @@ limitations under the License.
 '''
 
 from PythonExecutor import PythonExecutor
+from resource_management.core.exceptions import ClientComponentHasNoStatus, ComponentIsNotRunning
 
 import imp
 import sys
@@ -58,8 +59,10 @@ class PythonReflectiveExecutor(PythonExecutor):
       returncode = e.code
       if returncode:
         logger.debug("Reflective command failed with return_code=" + str(e))
-    except Exception: 
+    except (ClientComponentHasNoStatus, ComponentIsNotRunning):
       logger.debug("Reflective command failed with exception:", exc_info=1)
+    except Exception:
+      logger.info("Reflective command failed with exception:", exc_info=1)
     else: 
       returncode = 0
       

http://git-wip-us.apache.org/repos/asf/ambari/blob/0e4084c3/ambari-common/src/main/python/resource_management/core/environment.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/core/environment.py b/ambari-common/src/main/python/resource_management/core/environment.py
index 9fd5c3a..b4e9b3a 100644
--- a/ambari-common/src/main/python/resource_management/core/environment.py
+++ b/ambari-common/src/main/python/resource_management/core/environment.py
@@ -27,6 +27,7 @@ import types
 import logging
 import shutil
 import time
+import threading
 from datetime import datetime
 
 from resource_management.core import shell
@@ -35,10 +36,12 @@ from resource_management.core.providers import find_provider
 from resource_management.core.utils import AttributeDictionary
 from resource_management.core.system import System
 from resource_management.core.logger import Logger
+from threading import Thread, local
 
+_local_data = local()
+_instance_name = 'instance'
 
 class Environment(object):
-  _instances = []
 
   def __init__(self, basedir=None, tmp_dir=None, test_mode=False, logger=None, logging_level=logging.INFO):
     """
@@ -134,7 +137,6 @@ class Environment(object):
     raise Exception("Unknown condition type %r" % cond) 
     
   def run(self):
-    with self:
       # Run resource actions
       while self.resource_list:
         resource = self.resource_list.pop(0)
@@ -170,7 +172,10 @@ class Environment(object):
 
   @classmethod
   def get_instance(cls):
-    return cls._instances[-1]
+    instance = getattr(_local_data, _instance_name, None)
+    if instance is None:
+      raise Exception("No Environment present for retrieving for thread %s" % threading.current_thread())
+    return instance
   
   @classmethod
   def get_instance_copy(cls):
@@ -184,11 +189,17 @@ class Environment(object):
     return new_instance
 
   def __enter__(self):
-    self.__class__._instances.append(self)
+    instance = getattr(_local_data, _instance_name, None)
+    if instance is not None:
+      raise Exception("Trying to enter to Environment from thread %s second time" % threading.current_thread())
+    setattr(_local_data, 'instance', self)
     return self
 
   def __exit__(self, exc_type, exc_val, exc_tb):
-    self.__class__._instances.pop()
+    instance = getattr(_local_data, _instance_name, None)
+    if instance is None:
+      raise Exception("Trying to exit from Environment without enter before for thread %s" % threading.current_thread())
+    setattr(_local_data, 'instance', None)
     return False
 
   def __getstate__(self):

http://git-wip-us.apache.org/repos/asf/ambari/blob/0e4084c3/ambari-common/src/main/python/resource_management/libraries/providers/execute_hadoop.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/execute_hadoop.py b/ambari-common/src/main/python/resource_management/libraries/providers/execute_hadoop.py
index 7c1a49f..d831b6d 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/execute_hadoop.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/execute_hadoop.py
@@ -34,13 +34,12 @@ class ExecuteHadoopProvider(Provider):
     
     if isinstance(command, (list, tuple)):
       command = ' '.join(quote_bash_args(x) for x in command)
-    
-    with Environment.get_instance_copy() as env:
-      Execute (format("hadoop --config {conf_dir} {command}"),
-        user        = self.resource.user,
-        tries       = self.resource.tries,
-        try_sleep   = self.resource.try_sleep,
-        logoutput   = self.resource.logoutput,
-        path        = self.resource.bin_dir,
-        environment = self.resource.environment,
-      )
+
+    Execute (format("hadoop --config {conf_dir} {command}"),
+      user        = self.resource.user,
+      tries       = self.resource.tries,
+      try_sleep   = self.resource.try_sleep,
+      logoutput   = self.resource.logoutput,
+      path        = self.resource.bin_dir,
+      environment = self.resource.environment,
+    )

http://git-wip-us.apache.org/repos/asf/ambari/blob/0e4084c3/ambari-common/src/main/python/resource_management/libraries/providers/modify_properties_file.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/modify_properties_file.py b/ambari-common/src/main/python/resource_management/libraries/providers/modify_properties_file.py
index 661cb68..53d7d2f 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/modify_properties_file.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/modify_properties_file.py
@@ -63,11 +63,10 @@ class ModifyPropertiesFileProvider(Provider):
       line = u"{0}{1}{2}".format(unicode(property_name), delimiter, value)
       new_content_lines.append(line)
           
-    with Environment.get_instance_copy() as env:
-      File (filename,
-            content = u"\n".join(new_content_lines) + "\n",
-            owner = self.resource.owner,
-            group = self.resource.group,
-            mode = self.resource.mode,
-            encoding = self.resource.encoding,
-      )
+    File (filename,
+          content = u"\n".join(new_content_lines) + "\n",
+          owner = self.resource.owner,
+          group = self.resource.group,
+          mode = self.resource.mode,
+          encoding = self.resource.encoding,
+    )

http://git-wip-us.apache.org/repos/asf/ambari/blob/0e4084c3/ambari-common/src/main/python/resource_management/libraries/providers/properties_file.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/properties_file.py b/ambari-common/src/main/python/resource_management/libraries/providers/properties_file.py
index 53459dd..823f579 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/properties_file.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/properties_file.py
@@ -47,10 +47,9 @@ class PropertiesFileProvider(Provider):
 
     Logger.info(format("Generating properties file: {filepath}"))
 
-    with Environment.get_instance_copy() as env:
-      File (format("{filepath}"),
-            content = config_content,
-            owner = self.resource.owner,
-            group = self.resource.group,
-            mode = self.resource.mode
-      )
+    File (format("{filepath}"),
+          content = config_content,
+          owner = self.resource.owner,
+          group = self.resource.group,
+          mode = self.resource.mode
+    )

http://git-wip-us.apache.org/repos/asf/ambari/blob/0e4084c3/ambari-common/src/main/python/resource_management/libraries/providers/repository.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/repository.py b/ambari-common/src/main/python/resource_management/libraries/providers/repository.py
index 5f0c089..f476e29 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/repository.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/repository.py
@@ -44,39 +44,37 @@ class RhelSuseRepositoryProvider(Provider):
   update_cmd = ['zypper', 'clean', '--all']
 
   def action_create(self):
-    with Environment.get_instance_copy() as env:
-      repo_file_name = self.resource.repo_file_name
-      repo_dir = get_repo_dir()
-      new_content = InlineTemplate(self.resource.repo_template, repo_id=self.resource.repo_id, repo_file_name=self.resource.repo_file_name,
-                             base_url=self.resource.base_url, mirror_list=self.resource.mirror_list)
-      repo_file_path = format("{repo_dir}/{repo_file_name}.repo")
-
-      if os.path.isfile(repo_file_path):
-        existing_content_str = sudo.read_file(repo_file_path)
-        new_content_str = new_content.get_content()
-        if existing_content_str != new_content_str and OSCheck.is_suse_family():
-          # We need to reset package manager's cache when we replace base urls
-          # at existing repo. That is a case at least under SLES
-          Logger.info("Flushing package manager cache since repo file content is about to change")
-          checked_call(self.update_cmd, sudo=True)
-        if self.resource.append_to_file:
-          content = existing_content_str + '\n' + new_content_str
-        else:
-          content = new_content_str
-      else: # If repo file does not exist yet
-        content = new_content
-
-      File(repo_file_path,
-           content=content
-      )
+    repo_file_name = self.resource.repo_file_name
+    repo_dir = get_repo_dir()
+    new_content = InlineTemplate(self.resource.repo_template, repo_id=self.resource.repo_id, repo_file_name=self.resource.repo_file_name,
+                           base_url=self.resource.base_url, mirror_list=self.resource.mirror_list)
+    repo_file_path = format("{repo_dir}/{repo_file_name}.repo")
+
+    if os.path.isfile(repo_file_path):
+      existing_content_str = sudo.read_file(repo_file_path)
+      new_content_str = new_content.get_content()
+      if existing_content_str != new_content_str and OSCheck.is_suse_family():
+        # We need to reset package manager's cache when we replace base urls
+        # at existing repo. That is a case at least under SLES
+        Logger.info("Flushing package manager cache since repo file content is about to change")
+        checked_call(self.update_cmd, sudo=True)
+      if self.resource.append_to_file:
+        content = existing_content_str + '\n' + new_content_str
+      else:
+        content = new_content_str
+    else: # If repo file does not exist yet
+      content = new_content
+
+    File(repo_file_path,
+         content=content
+    )
   
   def action_remove(self):
-    with Environment.get_instance_copy() as env:
-      repo_file_name = self.resource.repo_file_name
-      repo_dir = get_repo_dir()
+    repo_file_name = self.resource.repo_file_name
+    repo_dir = get_repo_dir()
 
-      File(format("{repo_dir}/{repo_file_name}.repo"),
-           action="delete")
+    File(format("{repo_dir}/{repo_file_name}.repo"),
+         action="delete")
     
   
 def get_repo_dir():
@@ -94,56 +92,54 @@ class UbuntuRepositoryProvider(Provider):
   app_pkey_cmd_prefix = ('apt-key', 'adv', '--recv-keys', '--keyserver', 'keyserver.ubuntu.com')
 
   def action_create(self):
-    with Environment.get_instance_copy() as env:
-      with tempfile.NamedTemporaryFile() as tmpf:
-        with tempfile.NamedTemporaryFile() as old_repo_tmpf:
-          repo_file_name = format("{repo_file_name}.list",repo_file_name=self.resource.repo_file_name)
-          repo_file_path = format("{repo_dir}/{repo_file_name}", repo_dir=self.repo_dir)
-  
-          new_content = InlineTemplate(self.resource.repo_template, package_type=self.package_type,
-                                        base_url=self.resource.base_url,
-                                        components=' '.join(self.resource.components)).get_content()
-          old_content = ''
-          if self.resource.append_to_file and os.path.isfile(repo_file_path):
-              old_content = sudo.read_file(repo_file_path) + '\n'
-  
-          File(tmpf.name, 
-               content=old_content+new_content
+    with tempfile.NamedTemporaryFile() as tmpf:
+      with tempfile.NamedTemporaryFile() as old_repo_tmpf:
+        repo_file_name = format("{repo_file_name}.list",repo_file_name=self.resource.repo_file_name)
+        repo_file_path = format("{repo_dir}/{repo_file_name}", repo_dir=self.repo_dir)
+
+        new_content = InlineTemplate(self.resource.repo_template, package_type=self.package_type,
+                                      base_url=self.resource.base_url,
+                                      components=' '.join(self.resource.components)).get_content()
+        old_content = ''
+        if self.resource.append_to_file and os.path.isfile(repo_file_path):
+            old_content = sudo.read_file(repo_file_path) + '\n'
+
+        File(tmpf.name, 
+             content=old_content+new_content
+        )
+        
+        if os.path.isfile(repo_file_path):
+          # a copy of old repo file, which will be readable by current user
+          File(old_repo_tmpf.name, 
+               content=StaticFile(repo_file_path),
+          )
+
+        if not os.path.isfile(repo_file_path) or not filecmp.cmp(tmpf.name, old_repo_tmpf.name):
+          File(repo_file_path,
+               content = StaticFile(tmpf.name)
           )
           
-          if os.path.isfile(repo_file_path):
-            # a copy of old repo file, which will be readable by current user
-            File(old_repo_tmpf.name, 
-                 content=StaticFile(repo_file_path),
-            )
-  
-          if not os.path.isfile(repo_file_path) or not filecmp.cmp(tmpf.name, old_repo_tmpf.name):
-            File(repo_file_path,
-                 content = StaticFile(tmpf.name)
+          update_cmd_formatted = [format(x) for x in self.update_cmd]
+          # this is time expensive
+          retcode, out = checked_call(update_cmd_formatted, sudo=True, quiet=False)
+          
+          # add public keys for new repos
+          missing_pkeys = set(re.findall(self.missing_pkey_regex, out))
+          for pkey in missing_pkeys:
+            Execute(self.app_pkey_cmd_prefix + (pkey,),
+                    timeout = 15, # in case we are on the host w/o internet (using localrepo), we should ignore hanging
+                    ignore_failures = True,
+                    sudo = True,
             )
-            
-            update_cmd_formatted = [format(x) for x in self.update_cmd]
-            # this is time expensive
-            retcode, out = checked_call(update_cmd_formatted, sudo=True, quiet=False)
-            
-            # add public keys for new repos
-            missing_pkeys = set(re.findall(self.missing_pkey_regex, out))
-            for pkey in missing_pkeys:
-              Execute(self.app_pkey_cmd_prefix + (pkey,),
-                      timeout = 15, # in case we are on the host w/o internet (using localrepo), we should ignore hanging
-                      ignore_failures = True,
-                      sudo = True,
-              )
   
   def action_remove(self):
-    with Environment.get_instance_copy() as env:
-      repo_file_name = format("{repo_file_name}.list", repo_file_name=self.resource.repo_file_name)
-      repo_file_path = format("{repo_dir}/{repo_file_name}", repo_dir=self.repo_dir)
+    repo_file_name = format("{repo_file_name}.list", repo_file_name=self.resource.repo_file_name)
+    repo_file_path = format("{repo_dir}/{repo_file_name}", repo_dir=self.repo_dir)
+    
+    if os.path.isfile(repo_file_path):
+      File(repo_file_path,
+           action = "delete")
       
-      if os.path.isfile(repo_file_path):
-        File(repo_file_path,
-             action = "delete")
-        
-        # this is time expensive
-        update_cmd_formatted = [format(x) for x in self.update_cmd]
-        Execute(update_cmd_formatted)
+      # this is time expensive
+      update_cmd_formatted = [format(x) for x in self.update_cmd]
+      Execute(update_cmd_formatted)

http://git-wip-us.apache.org/repos/asf/ambari/blob/0e4084c3/ambari-common/src/main/python/resource_management/libraries/providers/template_config.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/template_config.py b/ambari-common/src/main/python/resource_management/libraries/providers/template_config.py
index 77ac749..f97e0d6 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/template_config.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/template_config.py
@@ -39,10 +39,9 @@ class TemplateConfigProvider(Provider):
     else:
       template_name = format("{file_name}-{template_tag}.j2")
 
-    with Environment.get_instance_copy() as env:
-      File( qualified_file_name,
-       owner   = self.resource.owner,
-       group   = self.resource.group,
-       mode    = self.resource.mode,
-       content = Template(template_name, extra_imports=self.resource.extra_imports)
-      )
+    File( qualified_file_name,
+     owner   = self.resource.owner,
+     group   = self.resource.group,
+     mode    = self.resource.mode,
+     content = Template(template_name, extra_imports=self.resource.extra_imports)
+    )

http://git-wip-us.apache.org/repos/asf/ambari/blob/0e4084c3/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py b/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py
index 887fc5f..c885d6e 100644
--- a/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py
+++ b/ambari-common/src/main/python/resource_management/libraries/providers/xml_config.py
@@ -58,11 +58,10 @@ class XmlConfigProvider(Provider):
     xml_config_dest_file_path = os.path.join(xml_config_provider_config_dir, filename)
     Logger.info("Generating config: {0}".format(xml_config_dest_file_path))
 
-    with Environment.get_instance_copy() as env:
-      File (xml_config_dest_file_path,
-        content = config_content,
-        owner = self.resource.owner,
-        group = self.resource.group,
-        mode = self.resource.mode,
-        encoding = self.resource.encoding
-      )
+    File (xml_config_dest_file_path,
+      content = config_content,
+      owner = self.resource.owner,
+      group = self.resource.group,
+      mode = self.resource.mode,
+      encoding = self.resource.encoding
+    )

http://git-wip-us.apache.org/repos/asf/ambari/blob/0e4084c3/ambari-server/src/test/python/TestUtils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/TestUtils.py b/ambari-server/src/test/python/TestUtils.py
index ee310b7..db94d92 100644
--- a/ambari-server/src/test/python/TestUtils.py
+++ b/ambari-server/src/test/python/TestUtils.py
@@ -202,33 +202,31 @@ class TestUtils(TestCase):
     from resource_management.core.environment import Environment
 
     env = Environment()
-    env._instances.append(env)
-
-
-    # declare some environment variables
-    env_params = {}
-    env_params["envfoo"] = "env-foo1"
-    env_params["envbar"] = "env-bar1"
-    env.config.params = env_params
-
-    # declare some local variables
-    foo = "foo1"
-    bar = "bar1"
-
-    # make sure local variables and env variables work
-    message = "{foo} {bar} {envfoo} {envbar}"
-    formatted_message = format(message)
-    self.assertEquals("foo1 bar1 env-foo1 env-bar1", formatted_message)
-
-    # try the same thing with an instance; we pass in keyword args to be
-    # combined with the env params
-    formatter = ConfigurationFormatter()
-    formatted_message = formatter.format(message, foo="foo2", bar="bar2")
-    self.assertEquals("foo2 bar2 env-foo1 env-bar1", formatted_message)
-
-    # now supply keyword args to override env params
-    formatted_message = formatter.format(message, envfoo="foobar", envbar="foobarbaz", foo="foo3", bar="bar3")
-    self.assertEquals("foo3 bar3 foobar foobarbaz", formatted_message)
+    with env:
+      # declare some environment variables
+      env_params = {}
+      env_params["envfoo"] = "env-foo1"
+      env_params["envbar"] = "env-bar1"
+      env.config.params = env_params
+
+      # declare some local variables
+      foo = "foo1"
+      bar = "bar1"
+
+      # make sure local variables and env variables work
+      message = "{foo} {bar} {envfoo} {envbar}"
+      formatted_message = format(message)
+      self.assertEquals("foo1 bar1 env-foo1 env-bar1", formatted_message)
+
+      # try the same thing with an instance; we pass in keyword args to be
+      # combined with the env params
+      formatter = ConfigurationFormatter()
+      formatted_message = formatter.format(message, foo="foo2", bar="bar2")
+      self.assertEquals("foo2 bar2 env-foo1 env-bar1", formatted_message)
+
+      # now supply keyword args to override env params
+      formatted_message = formatter.format(message, envfoo="foobar", envbar="foobarbaz", foo="foo3", bar="bar3")
+      self.assertEquals("foo3 bar3 foobar foobarbaz", formatted_message)
 
   def test_compare_versions(self):
     self.assertEquals(utils.compare_versions("1.7.0", "2.0.0"), -1)

http://git-wip-us.apache.org/repos/asf/ambari/blob/0e4084c3/ambari-server/src/test/python/custom_actions/test_ru_set_all.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/custom_actions/test_ru_set_all.py b/ambari-server/src/test/python/custom_actions/test_ru_set_all.py
index d3adc5a..b2c2543 100644
--- a/ambari-server/src/test/python/custom_actions/test_ru_set_all.py
+++ b/ambari-server/src/test/python/custom_actions/test_ru_set_all.py
@@ -155,98 +155,97 @@ class TestRUSetAll(RMFTestCase):
     # required for the test to run since the Execute calls need this
     from resource_management.core.environment import Environment
     env = Environment(test_mode=True)
-    env._instances.append(env)
-
-    # Mock the config objects
-    json_file_path = os.path.join(self.get_custom_actions_dir(), "ru_execute_tasks_namenode_prepare.json")
-    self.assertTrue(os.path.isfile(json_file_path))
-    with open(json_file_path, "r") as json_file:
-      json_payload = json.load(json_file)
-
-    # alter JSON for a downgrade from 2.3 to 2.2
-    json_payload['commandParams']['version'] = "2.2.0.0-1234"
-    json_payload['commandParams']['downgrade_from_version'] = "2.3.0.0-1234"
-    json_payload['commandParams']['original_stack'] = "HDP-2.2"
-    json_payload['commandParams']['target_stack'] = "HDP-2.3"
-    json_payload['commandParams']['upgrade_direction'] = "downgrade"
-    json_payload['hostLevelParams']['stack_version'] = "2.2"
-
-    config_dict = ConfigDictionary(json_payload)
-
-    family_mock.return_value = True
-    get_config_mock.return_value = config_dict
-    call_mock.side_effect = fake_call   # echo the command
-
-    # test the function
-    ru_execute = UpgradeSetAll()
-    ru_execute.unlink_all_configs(None)
-
-    # verify that os.path.islink was called for each conf
-    self.assertTrue(islink_mock.called)
-    for key, value in conf_select.get_package_dirs().iteritems():
-      for directory_mapping in value:
-        original_config_directory = directory_mapping['conf_dir']
-        is_link_called = False
-
-        for call in islink_mock.call_args_list:
-          call_tuple = call[0]
-          if original_config_directory in call_tuple:
-            is_link_called = True
-
-        if not is_link_called:
-          self.fail("os.path.islink({0}) was never called".format(original_config_directory))
-
-    # alter JSON for a downgrade from 2.3 to 2.3
-    with open(json_file_path, "r") as json_file:
-      json_payload = json.load(json_file)
-
-    json_payload['commandParams']['version'] = "2.3.0.0-1234"
-    json_payload['commandParams']['downgrade_from_version'] = "2.3.0.0-5678"
-    json_payload['commandParams']['original_stack'] = "HDP-2.3"
-    json_payload['commandParams']['target_stack'] = "HDP-2.3"
-    json_payload['commandParams']['upgrade_direction'] = "downgrade"
-    json_payload['hostLevelParams']['stack_version'] = "2.3"
-
-    # reset config
-    config_dict = ConfigDictionary(json_payload)
-    family_mock.return_value = True
-    get_config_mock.return_value = config_dict
-
-    # reset mock
-    islink_mock.reset_mock()
-
-    # test the function
-    ru_execute = UpgradeSetAll()
-    ru_execute.unlink_all_configs(None)
-
-    # ensure it wasn't called this time
-    self.assertFalse(islink_mock.called)
-
-    with open(json_file_path, "r") as json_file:
-      json_payload = json.load(json_file)
-
-    # alter JSON for a downgrade from 2.2 to 2.2
-    json_payload['commandParams']['version'] = "2.2.0.0-1234"
-    json_payload['commandParams']['downgrade_from_version'] = "2.2.0.0-5678"
-    json_payload['commandParams']['original_stack'] = "HDP-2.2"
-    json_payload['commandParams']['target_stack'] = "HDP-2.2"
-    json_payload['commandParams']['upgrade_direction'] = "downgrade"
-    json_payload['hostLevelParams']['stack_version'] = "2.2"
-
-    # reset config
-    config_dict = ConfigDictionary(json_payload)
-    family_mock.return_value = True
-    get_config_mock.return_value = config_dict
-
-    # reset mock
-    islink_mock.reset_mock()
-
-    # test the function
-    ru_execute = UpgradeSetAll()
-    ru_execute.unlink_all_configs(None)
-
-    # ensure it wasn't called this time
-    self.assertFalse(islink_mock.called)
+    with env:
+      # Mock the config objects
+      json_file_path = os.path.join(self.get_custom_actions_dir(), "ru_execute_tasks_namenode_prepare.json")
+      self.assertTrue(os.path.isfile(json_file_path))
+      with open(json_file_path, "r") as json_file:
+        json_payload = json.load(json_file)
+
+      # alter JSON for a downgrade from 2.3 to 2.2
+      json_payload['commandParams']['version'] = "2.2.0.0-1234"
+      json_payload['commandParams']['downgrade_from_version'] = "2.3.0.0-1234"
+      json_payload['commandParams']['original_stack'] = "HDP-2.2"
+      json_payload['commandParams']['target_stack'] = "HDP-2.3"
+      json_payload['commandParams']['upgrade_direction'] = "downgrade"
+      json_payload['hostLevelParams']['stack_version'] = "2.2"
+
+      config_dict = ConfigDictionary(json_payload)
+
+      family_mock.return_value = True
+      get_config_mock.return_value = config_dict
+      call_mock.side_effect = fake_call   # echo the command
+
+      # test the function
+      ru_execute = UpgradeSetAll()
+      ru_execute.unlink_all_configs(None)
+
+      # verify that os.path.islink was called for each conf
+      self.assertTrue(islink_mock.called)
+      for key, value in conf_select.get_package_dirs().iteritems():
+        for directory_mapping in value:
+          original_config_directory = directory_mapping['conf_dir']
+          is_link_called = False
+
+          for call in islink_mock.call_args_list:
+            call_tuple = call[0]
+            if original_config_directory in call_tuple:
+              is_link_called = True
+
+          if not is_link_called:
+            self.fail("os.path.islink({0}) was never called".format(original_config_directory))
+
+      # alter JSON for a downgrade from 2.3 to 2.3
+      with open(json_file_path, "r") as json_file:
+        json_payload = json.load(json_file)
+
+      json_payload['commandParams']['version'] = "2.3.0.0-1234"
+      json_payload['commandParams']['downgrade_from_version'] = "2.3.0.0-5678"
+      json_payload['commandParams']['original_stack'] = "HDP-2.3"
+      json_payload['commandParams']['target_stack'] = "HDP-2.3"
+      json_payload['commandParams']['upgrade_direction'] = "downgrade"
+      json_payload['hostLevelParams']['stack_version'] = "2.3"
+
+      # reset config
+      config_dict = ConfigDictionary(json_payload)
+      family_mock.return_value = True
+      get_config_mock.return_value = config_dict
+
+      # reset mock
+      islink_mock.reset_mock()
+
+      # test the function
+      ru_execute = UpgradeSetAll()
+      ru_execute.unlink_all_configs(None)
+
+      # ensure it wasn't called this time
+      self.assertFalse(islink_mock.called)
+
+      with open(json_file_path, "r") as json_file:
+        json_payload = json.load(json_file)
+
+      # alter JSON for a downgrade from 2.2 to 2.2
+      json_payload['commandParams']['version'] = "2.2.0.0-1234"
+      json_payload['commandParams']['downgrade_from_version'] = "2.2.0.0-5678"
+      json_payload['commandParams']['original_stack'] = "HDP-2.2"
+      json_payload['commandParams']['target_stack'] = "HDP-2.2"
+      json_payload['commandParams']['upgrade_direction'] = "downgrade"
+      json_payload['hostLevelParams']['stack_version'] = "2.2"
+
+      # reset config
+      config_dict = ConfigDictionary(json_payload)
+      family_mock.return_value = True
+      get_config_mock.return_value = config_dict
+
+      # reset mock
+      islink_mock.reset_mock()
+
+      # test the function
+      ru_execute = UpgradeSetAll()
+      ru_execute.unlink_all_configs(None)
+
+      # ensure it wasn't called this time
+      self.assertFalse(islink_mock.called)
 
   @patch("os.path.isdir")
   @patch("os.path.islink")
@@ -255,30 +254,29 @@ class TestRUSetAll(RMFTestCase):
     # required for the test to run since the Execute calls need this
     from resource_management.core.environment import Environment
     env = Environment(test_mode=True)
-    env._instances.append(env)
-
-    # Case: missing backup directory
-    isdir_mock.return_value = False
-    ru_execute = UpgradeSetAll()
-    self.assertEqual(len(env.resource_list), 0)
-    # Case: missing symlink
-    isdir_mock.reset_mock()
-    isdir_mock.return_value = True
-    islink_mock.return_value = False
-    ru_execute._unlink_config("/fake/config")
-    self.assertEqual(len(env.resource_list), 2)
-    # Case: missing symlink
-    isdir_mock.reset_mock()
-    isdir_mock.return_value = True
-    islink_mock.reset_mock()
-    islink_mock.return_value = True
-
-    ru_execute._unlink_config("/fake/config")
-    self.assertEqual(pprint.pformat(env.resource_list),
-                     "[Directory['/fake/config'],\n "
-                     "Execute[('mv', '/fake/conf.backup', '/fake/config')],\n "
-                     "Execute[('rm', '/fake/config')],\n "
-                     "Execute[('mv', '/fake/conf.backup', '/fake/config')]]")
+    with env:
+      # Case: missing backup directory
+      isdir_mock.return_value = False
+      ru_execute = UpgradeSetAll()
+      self.assertEqual(len(env.resource_list), 0)
+      # Case: missing symlink
+      isdir_mock.reset_mock()
+      isdir_mock.return_value = True
+      islink_mock.return_value = False
+      ru_execute._unlink_config("/fake/config")
+      self.assertEqual(len(env.resource_list), 2)
+      # Case: symlink exists
+      isdir_mock.reset_mock()
+      isdir_mock.return_value = True
+      islink_mock.reset_mock()
+      islink_mock.return_value = True
+
+      ru_execute._unlink_config("/fake/config")
+      self.assertEqual(pprint.pformat(env.resource_list),
+                       "[Directory['/fake/config'],\n "
+                       "Execute[('mv', '/fake/conf.backup', '/fake/config')],\n "
+                       "Execute[('rm', '/fake/config')],\n "
+                       "Execute[('mv', '/fake/conf.backup', '/fake/config')]]")
 
   @patch("os.path.exists")
   @patch("os.path.islink")

http://git-wip-us.apache.org/repos/asf/ambari/blob/0e4084c3/ambari-server/src/test/python/stacks/2.2/common/test_conf_select.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.2/common/test_conf_select.py b/ambari-server/src/test/python/stacks/2.2/common/test_conf_select.py
index 20f00e8..934117c 100644
--- a/ambari-server/src/test/python/stacks/2.2/common/test_conf_select.py
+++ b/ambari-server/src/test/python/stacks/2.2/common/test_conf_select.py
@@ -30,7 +30,10 @@ class TestConfSelect(RMFTestCase):
     # required for the test to run since the Execute calls need this
     from resource_management.core.environment import Environment
     self.env = Environment(test_mode=True)
-    self.env._instances.append(self.env)
+    self.env.__enter__()
+
+  def tearDown(self):
+    self.env.__exit__(None,None,None)
 
 
   @patch("resource_management.libraries.functions.conf_select._valid", new = MagicMock(return_value=True))