Posted to commits@ambari.apache.org by dm...@apache.org on 2014/02/18 22:06:33 UTC

[2/2] git commit: AMBARI-4481. Add to the agent the ability to download service scripts and hooks (dlysnichenko)

AMBARI-4481. Add to the agent the ability to download service scripts and hooks (dlysnichenko)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/02f9c453
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/02f9c453
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/02f9c453

Branch: refs/heads/trunk
Commit: 02f9c45311ee6f6e7d87fe07b0059ad43a265782
Parents: f2c18bc
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Fri Jan 31 15:33:57 2014 +0200
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Tue Feb 18 23:03:50 2014 +0200

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |   4 +-
 ambari-agent/pom.xml                            |   3 +-
 .../src/main/python/ambari_agent/ActionQueue.py |   4 +-
 .../src/main/python/ambari_agent/Controller.py  |  15 +-
 .../ambari_agent/CustomServiceOrchestrator.py   |  18 +-
 .../src/main/python/ambari_agent/FileCache.py   | 211 ++++++++++--
 .../test/python/ambari_agent/TestActionQueue.py |  44 ++-
 .../test/python/ambari_agent/TestController.py  |  27 +-
 .../TestCustomServiceOrchestrator.py            |  66 +++-
 .../test/python/ambari_agent/TestFileCache.py   | 320 +++++++++++++++++--
 .../test/python/ambari_agent/TestHeartbeat.py   |  35 +-
 .../ambari_agent/dummy_files/dummy_archive.zip  | Bin 0 -> 29558 bytes
 ambari-server/pom.xml                           |  16 +-
 .../ambari/server/agent/HeartbeatMonitor.java   |   6 +-
 .../AmbariCustomCommandExecutionHelper.java     |   4 +-
 .../ambari/server/controller/AmbariServer.java  |   4 +-
 ambari-server/src/main/python/ambari-server.py  |  29 +-
 .../src/main/python/ambari_server/__init__.py   |  21 ++
 .../python/ambari_server/resourceFilesKeeper.py | 258 +++++++++++++++
 .../src/test/python/TestAmbariServer.py         |  20 +-
 .../src/test/python/TestResourceFilesKeeper.py  | 319 ++++++++++++++++++
 .../active_stack/metainfo.xml                   |  23 ++
 .../HIVE/configuration/hive-site.xml            | 267 ++++++++++++++++
 .../dummy_stack/HIVE/metainfo.xml               |  47 +++
 .../dummy_stack/HIVE/package/.hash              |   1 +
 .../HIVE/package/files/addMysqlUser.sh          |  41 +++
 .../dummy_stack/HIVE/package/files/hcatSmoke.sh |  35 ++
 .../dummy_stack/HIVE/package/files/hiveSmoke.sh |  23 ++
 .../HIVE/package/files/hiveserver2.sql          |  23 ++
 .../HIVE/package/files/hiveserver2Smoke.sh      |  31 ++
 .../dummy_stack/HIVE/package/files/pigSmoke.sh  |  18 ++
 .../HIVE/package/files/startHiveserver2.sh      |  22 ++
 .../HIVE/package/files/startMetastore.sh        |  22 ++
 .../HIVE/package/scripts/__init__.py            |  19 ++
 .../dummy_stack/HIVE/package/scripts/hcat.py    |  47 +++
 .../HIVE/package/scripts/hcat_client.py         |  43 +++
 .../HIVE/package/scripts/hcat_service_check.py  |  63 ++++
 .../dummy_stack/HIVE/package/scripts/hive.py    | 122 +++++++
 .../HIVE/package/scripts/hive_client.py         |  41 +++
 .../HIVE/package/scripts/hive_metastore.py      |  63 ++++
 .../HIVE/package/scripts/hive_server.py         |  63 ++++
 .../HIVE/package/scripts/hive_service.py        |  56 ++++
 .../HIVE/package/scripts/mysql_server.py        |  77 +++++
 .../HIVE/package/scripts/mysql_service.py       |  44 +++
 .../dummy_stack/HIVE/package/scripts/params.py  | 123 +++++++
 .../HIVE/package/scripts/service_check.py       |  56 ++++
 .../HIVE/package/scripts/status_params.py       |  30 ++
 .../HIVE/package/templates/hcat-env.sh.j2       |  25 ++
 .../HIVE/package/templates/hive-env.sh.j2       |  55 ++++
 .../inactive_stack/metainfo.xml                 |  23 ++
 50 files changed, 2807 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 11d6f1a..27eb6e1 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -25,6 +25,7 @@ data_cleanup_interval=86400
 data_cleanup_max_age=2592000
 ping_port=8670
 cache_dir=/var/lib/ambari-agent/cache
+tolerate_download_failures=true
 
 [puppet]
 puppetmodules=/var/lib/ambari-agent/puppet
@@ -32,9 +33,6 @@ ruby_home=/usr/lib/ambari-agent/lib/ruby-1.8.7-p370
 puppet_home=/usr/lib/ambari-agent/lib/puppet-2.7.9
 facter_home=/usr/lib/ambari-agent/lib/facter-1.6.10
 
-[python]
-custom_actions_dir = /var/lib/ambari-agent/resources/custom_actions
-
 [command]
 maxretries=2
 sleepBetweenRetries=1
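
For illustration only (not part of the diff above): the new
tolerate_download_failures flag controls what happens when the agent cannot
refresh its script cache from the server. A minimal sketch of how the flag is
interpreted, mirroring the FileCache constructor further below ('config' is
assumed to be the agent's ConfigParser-backed configuration):

  # Sketch only; mirrors FileCache.__init__ in this commit.
  tolerate_download_failures = \
      config.get('agent', 'tolerate_download_failures').lower() == 'true'
  # True  -> download errors are logged and the cached copy is used
  # False -> a CachingException aborts the command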

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml
index d037d46..12a9910 100644
--- a/ambari-agent/pom.xml
+++ b/ambari-agent/pom.xml
@@ -336,7 +336,6 @@
               </sources>
             </mapping>
             <mapping>
-              <!-- TODO: Remove when we introduce metadata downloading by agent-->
               <directory>/var/lib/ambari-agent/cache/stacks</directory>
               <sources>
                 <source>
@@ -346,7 +345,7 @@
             </mapping>
             <mapping>
               <!-- custom actions root-->
-              <directory>/var/lib/ambari-agent/resources/custom_actions</directory>
+              <directory>/var/lib/ambari-agent/cache/custom_actions</directory>
               <filemode>755</filemode>
               <username>root</username>
               <groupname>root</groupname>

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 942cc75..731ac54 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -71,7 +71,9 @@ class ActionQueue(threading.Thread):
     self.sh = shellRunner()
     self._stop = threading.Event()
     self.tmpdir = config.get('agent', 'prefix')
-    self.customServiceOrchestrator = CustomServiceOrchestrator(config)
+    self.customServiceOrchestrator = CustomServiceOrchestrator(config,
+                                                               controller)
+
 
   def stop(self):
     self._stop.set()

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index b842b4d..26985ef 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -54,7 +54,8 @@ class Controller(threading.Thread):
     self.credential = None
     self.config = config
     self.hostname = hostname.hostname()
-    server_secured_url = 'https://' + config.get('server', 'hostname') + ':' + config.get('server', 'secured_url_port')
+    server_secured_url = 'https://' + config.get('server', 'hostname') + \
+                         ':' + config.get('server', 'secured_url_port')
     self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
     self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
     self.netutil = NetUtil()
@@ -67,14 +68,15 @@ class Controller(threading.Thread):
     # Event is used for synchronizing heartbeat iterations (to make possible
     # manual wait() interruption between heartbeats )
     self.heartbeat_wait_event = threading.Event()
+    # List of callbacks that are called at agent registration
+    self.registration_listeners = []
+
 
   def __del__(self):
     logger.info("Server connection disconnected.")
     pass
   
   def registerWithServer(self):
-    retry=False
-    firstTime=True
     id = -1
     ret = {}
 
@@ -257,8 +259,11 @@ class Controller(threading.Thread):
     message = registerResponse['response']
     logger.info("Response from server = " + message)
     if self.isRegistered:
-     time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
-     self.heartbeatWithServer()
+      # Process callbacks
+      for callback in self.registration_listeners:
+        callback()
+      time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
+      self.heartbeatWithServer()
 
   def restartAgent(self):
     os._exit(AGENT_AUTO_RESTART_EXIT_CODE)
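
For illustration only (not part of the diff above): registration_listeners is
a plain callback list; anything appended to it is invoked right after a
successful registration, before the first heartbeat. A minimal sketch,
assuming 'controller' and 'file_cache' are already-constructed Controller and
FileCache instances:

  # Mirrors what CustomServiceOrchestrator.__init__ does in this commit:
  controller.registration_listeners.append(file_cache.reset)

  # After registration succeeds, registerAndHeartbeat() runs
  #   for callback in self.registration_listeners:
  #       callback()
  # so the cache forgets its up-to-date paths and re-checks the server-side
  # .hash files on the next command.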

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 95ad2cd..7436d26 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -48,7 +48,7 @@ class CustomServiceOrchestrator():
   PRE_HOOK_PREFIX="before"
   POST_HOOK_PREFIX="after"
 
-  def __init__(self, config):
+  def __init__(self, config, controller):
     self.config = config
     self.tmp_dir = config.get('agent', 'prefix')
     self.file_cache = FileCache(config)
@@ -57,6 +57,8 @@ class CustomServiceOrchestrator():
                                                'status_command_stdout.txt')
     self.status_commands_stderr = os.path.join(self.tmp_dir,
                                                'status_command_stderr.txt')
+    # cache reset will be called on every agent registration
+    controller.registration_listeners.append(self.file_cache.reset)
     # Clean up old status command files if any
     try:
       os.unlink(self.status_commands_stdout)
@@ -72,15 +74,10 @@ class CustomServiceOrchestrator():
     command json, is ignored.
     """
     try:
-      try:
-        component_name = command['role']
-      except KeyError:
-        # For status commands and (maybe) custom actions component name
-        # is stored at another location
-        component_name = command['componentName']
       script_type = command['commandParams']['script_type']
       script = command['commandParams']['script']
       timeout = int(command['commandParams']['command_timeout'])
+      server_url_prefix = command['hostLevelParams']['jdk_location']
       task_id = "status"
       try:
         task_id = command['taskId']
@@ -92,15 +89,14 @@ class CustomServiceOrchestrator():
         command_name = forsed_command_name
 
       if command_name == self.CUSTOM_ACTION_COMMAND:
-        base_dir = self.config.get('python', 'custom_actions_dir')
+        base_dir = self.file_cache.get_custom_actions_base_dir(server_url_prefix)
         script_tuple = (os.path.join(base_dir, script) , base_dir)
         hook_dir = None
       else:
         if command_name == self.CUSTOM_COMMAND_COMMAND:
           command_name = command['hostLevelParams']['custom_command']
-        hook_dir = self.file_cache.get_hook_base_dir(command)
-        service_subpath = command['commandParams']['service_package_folder']
-        base_dir = self.file_cache.get_service_base_dir(service_subpath)
+        hook_dir = self.file_cache.get_hook_base_dir(command, server_url_prefix)
+        base_dir = self.file_cache.get_service_base_dir(command, server_url_prefix)
         script_path = self.resolve_script_path(base_dir, script, script_type)
         script_tuple = (script_path, base_dir)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/main/python/ambari_agent/FileCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/FileCache.py b/ambari-agent/src/main/python/ambari_agent/FileCache.py
index 01d2e52..6b307f1 100644
--- a/ambari-agent/src/main/python/ambari_agent/FileCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/FileCache.py
@@ -17,18 +17,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
-
+import StringIO
 
 import logging
-import Queue
-import threading
-import pprint
 import os
-import json
-from AgentException import AgentException
+import shutil
+import zipfile
+import urllib2
 
 logger = logging.getLogger()
 
+class CachingException(Exception):
+  pass
+
 class FileCache():
   """
   Provides caching and lookup for service metadata files.
@@ -36,28 +37,40 @@ class FileCache():
   downloads relevant files from the server.
   """
 
+  STACKS_CACHE_DIRECTORY="stacks"
+  CUSTOM_ACTIONS_CACHE_DIRECTORY="custom_actions"
+  HASH_SUM_FILE=".hash"
+  ARCHIVE_NAME="archive.zip"
+
+  BLOCK_SIZE=1024*16
+  SOCKET_TIMEOUT=10
+
   def __init__(self, config):
     self.service_component_pool = {}
     self.config = config
     self.cache_dir = config.get('agent', 'cache_dir')
+    # Defines whether a command should fail when scripts can not be downloaded
+    # from the server, or whether the agent should fall back to the local copy
+    self.tolerate_download_failures = \
+          config.get('agent','tolerate_download_failures').lower() == 'true'
+    self.reset()
+
+
+  def reset(self):
+    self.uptodate_paths = [] # Paths that already have been recently checked
 
 
-  def get_service_base_dir(self, service_subpath):
+  def get_service_base_dir(self, command, server_url_prefix):
     """
     Returns a base directory for service
     """
-    service_base_dir = os.path.join(self.cache_dir, "stacks", service_subpath)
-    if not os.path.isdir(service_base_dir):
-      # TODO: Metadata downloading will be implemented at Phase 2
-      # As of now, all stack definitions are packaged and distributed with
-      # agent rpm
-      message = "Service base dir not found at expected location {0}".\
-        format(service_base_dir)
-      raise AgentException(message)
-    return service_base_dir
+    service_subpath = command['commandParams']['service_package_folder']
+    subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, service_subpath)
+    return self.provide_directory(self.cache_dir, subpath,
+                                  server_url_prefix)
 
 
-  def get_hook_base_dir(self, command):
+  def get_hook_base_dir(self, command, server_url_prefix):
     """
     Returns a base directory for hooks
     """
@@ -65,13 +78,159 @@ class FileCache():
       hooks_subpath = command['commandParams']['hooks_folder']
     except KeyError:
       return None
-    hook_base_path = os.path.join(self.cache_dir, "stacks", hooks_subpath)
-    if not os.path.isdir(hook_base_path):
-      # TODO: Metadata downloading will be implemented at Phase 2
-      # As of now, all stack definitions are packaged and distributed with
-      # agent rpm
-      message = "Hook scripts dir for not found at " \
-                "expected location {0}".format(hook_base_path)
-      raise AgentException(message)
-    return hook_base_path
+    subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, hooks_subpath)
+    return self.provide_directory(self.cache_dir, subpath,
+                                  server_url_prefix)
+
+
+  def get_custom_actions_base_dir(self, server_url_prefix):
+    """
+    Returns a base directory for custom action scripts
+    """
+    return self.provide_directory(self.cache_dir,
+                                  self.CUSTOM_ACTIONS_CACHE_DIRECTORY,
+                                  server_url_prefix)
+
 
+  def provide_directory(self, cache_path, subdirectory, server_url_prefix):
+    """
+    Ensures that the cached directory is up-to-date. Throws a CachingException
+    if any problem occurs
+    Parameters:
+      cache_path: full path to cache directory
+      subdirectory: subpath inside cache
+      server_url_prefix: url of "resources" folder at the server
+    """
+    full_path = os.path.join(cache_path, subdirectory)
+    logger.debug("Trying to provide directory {0}".format(subdirectory))
+    try:
+      if full_path not in self.uptodate_paths:
+        logger.debug("Checking if update is available for "
+                     "directory {0}".format(full_path))
+        # Need to check for updates at server
+        remote_url = self.build_download_url(server_url_prefix,
+                                             subdirectory, self.HASH_SUM_FILE)
+        memory_buffer = self.fetch_url(remote_url)
+        remote_hash = memory_buffer.getvalue().strip()
+        local_hash = self.read_hash_sum(full_path)
+        if not local_hash or local_hash != remote_hash:
+          logger.debug("Updating directory {0}".format(full_path))
+          download_url = self.build_download_url(server_url_prefix,
+                                                 subdirectory, self.ARCHIVE_NAME)
+          membuffer = self.fetch_url(download_url)
+          self.invalidate_directory(full_path)
+          self.unpack_archive(membuffer, full_path)
+          self.write_hash_sum(full_path, remote_hash)
+        # Finally consider cache directory up-to-date
+        self.uptodate_paths.append(full_path)
+    except CachingException, e:
+      if self.tolerate_download_failures:
+        # ignore
+        logger.warn("Error occured during cache update. "
+                    "Error tolerate setting is set to true, so"
+                    " ignoring this error and continuing with current cache. "
+                    "Error details: {0}".format(str(e)))
+      else:
+        raise # we are not tolerant to exceptions, command execution will fail
+    return full_path
+
+
+  def build_download_url(self, server_url_prefix,
+                         directory, filename):
+    """
+    Builds up a proper download url for file. Used for downloading files
+    from the server.
+    directory - relative path
+    filename - file inside directory we are trying to fetch
+    """
+    return "{0}/{1}/{2}".format(server_url_prefix,
+                                    directory, filename)
+
+
+  def fetch_url(self, url):
+    """
+    Fetches the content at url into an in-memory buffer and returns it.
+    May raise exceptions for various reasons
+    """
+    logger.debug("Trying to download {0}".format(url))
+    try:
+      memory_buffer = StringIO.StringIO()
+      u = urllib2.urlopen(url, timeout=self.SOCKET_TIMEOUT)
+      logger.debug("Connected with {0} with code {1}".format(u.geturl(),
+                                                             u.getcode()))
+      buff = u.read(self.BLOCK_SIZE)
+      while buff:
+        memory_buffer.write(buff)
+        buff = u.read(self.BLOCK_SIZE)
+        if not buff:
+          break
+      return memory_buffer
+    except Exception, err:
+      raise CachingException("Can not download file from"
+                             " url {0} : {1}".format(url, str(err)))
+
+
+  def read_hash_sum(self, directory):
+    """
+    Tries to read a hash sum from previously generated file. Returns string
+    containing hash or None
+    """
+    hash_file = os.path.join(directory, self.HASH_SUM_FILE)
+    try:
+      with open(hash_file) as fh:
+        return fh.readline().strip()
+    except:
+      return None # We don't care
+
+
+  def write_hash_sum(self, directory, new_hash):
+    """
+    Writes the given hash sum to the hash file in the directory. Raises a
+    CachingException on failure
+    """
+    hash_file = os.path.join(directory, self.HASH_SUM_FILE)
+    try:
+      with open(hash_file, "w") as fh:
+        fh.write(new_hash)
+    except Exception, err:
+      raise CachingException("Can not write to file {0} : {1}".format(hash_file,
+                                                                 str(err)))
+
+
+  def invalidate_directory(self, directory):
+    """
+    Recursively removes directory content (if any). Also, creates
+    directory and any parent directories if needed. May throw exceptions
+    on permission problems
+    """
+    logger.debug("Invalidating directory {0}".format(directory))
+    try:
+      if os.path.isfile(directory): # It would be a strange situation
+        os.unlink(directory)
+      elif os.path.isdir(directory):
+        shutil.rmtree(directory)
+      # create directory itself and any parent directories
+      os.makedirs(directory)
+    except Exception, err:
+      raise CachingException("Can not invalidate cache directory {0}: {1}",
+                             directory, str(err))
+
+
+  def unpack_archive(self, mem_buffer, target_directory):
+    """
+    Unpacks contents of in-memory buffer to file system.
+    In-memory buffer is expected to contain a valid zip archive
+    """
+    try:
+      zfile = zipfile.ZipFile(mem_buffer)
+      for name in zfile.namelist():
+        (dirname, filename) = os.path.split(name)
+        concrete_dir=os.path.abspath(os.path.join(target_directory, dirname))
+        if not os.path.isdir(concrete_dir):
+          os.makedirs(concrete_dir)
+        logger.debug("Unpacking file {0} to {1}".format(name, concrete_dir))
+        zfile.extract(name, target_directory)
+    except Exception, err:
+      raise CachingException("Can not unpack zip file to "
+                             "directory {0} : {1}".format(
+                            target_directory, str(err)))
\ No newline at end of file
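
For illustration only (not part of the diff above): FileCache keeps the agent
cache directory in sync with the server's "resources" folder. For each
requested subdirectory it downloads <subdir>/.hash, compares it with the
locally stored hash and, only when they differ, fetches and unpacks
<subdir>/archive.zip, remembering the path as up-to-date until the next
reset(). A minimal usage sketch (the import paths and URLs are assumptions):

  from ambari_agent.AmbariConfig import AmbariConfig
  from ambari_agent.FileCache import FileCache

  config = AmbariConfig().getConfig()
  config.set('agent', 'cache_dir', '/var/lib/ambari-agent/cache')
  config.set('agent', 'tolerate_download_failures', 'true')

  cache = FileCache(config)

  # Compares the server-side .hash with the local copy and, if they differ,
  # downloads and unpacks archive.zip for the subdirectory.
  base_dir = cache.provide_directory(
      '/var/lib/ambari-agent/cache',
      'stacks/HDP/2.1.1/services/ZOOKEEPER/package',
      'http://ambari-server.example.com:8080/resources/')

  # Called on every agent registration (via registration_listeners), forcing
  # the hash check to be repeated for subsequent commands.
  cache.reset()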

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 1918641..abc6edc 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -143,8 +143,13 @@ class TestActionQueue(TestCase):
 
   @patch.object(ActionQueue, "process_command")
   @patch.object(Queue, "get")
-  def test_ActionQueueStartStop(self, get_mock, process_command_mock):
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
+                                get_mock, process_command_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
     actionQueue.start()
     time.sleep(0.1)
     actionQueue.stop()
@@ -158,7 +163,8 @@ class TestActionQueue(TestCase):
   @patch.object(ActionQueue, "execute_status_command")
   def test_process_command(self, execute_status_command_mock,
                            execute_command_mock, print_exc_mock):
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
     execution_command = {
       'commandType' : ActionQueue.EXECUTION_COMMAND,
     }
@@ -228,7 +234,10 @@ class TestActionQueue(TestCase):
     config = AmbariConfig().getConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
-    actionQueue = ActionQueue(config, 'dummy_controller')
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
     unfreeze_flag = threading.Event()
     puppet_execution_result_dict = {
       'stdout': 'out',
@@ -388,7 +397,10 @@ class TestActionQueue(TestCase):
     config = AmbariConfig().getConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
-    actionQueue = ActionQueue(config, 'dummy_controller')
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
     actionQueue.execute_command(self.datanode_restart_command)
     report = actionQueue.result()
     expected = {'actionId': '1-1',
@@ -414,11 +426,15 @@ class TestActionQueue(TestCase):
   @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
   @patch.object(ActionQueue, "execute_command")
   @patch.object(LiveStatus, "build")
-  def test_execute_status_command(self, build_mock, execute_command_mock,
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_status_command(self, CustomServiceOrchestrator_mock,
+                                  build_mock, execute_command_mock,
                                   requestComponentStatus_mock, read_stack_version_mock,
                                   determine_command_format_version_mock,
                                   status_update_callback):
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
 
     build_mock.return_value = "dummy report"
     # Check execution ov V1 status command
@@ -441,7 +457,10 @@ class TestActionQueue(TestCase):
     self.assertTrue(requestComponentStatus_mock.called)
 
 
-  def test_determine_command_format_version(self):
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_determine_command_format_version(self,
+                                            CustomServiceOrchestrator_mock):
+    CustomServiceOrchestrator_mock.return_value = None
     v1_command = {
       'commandParams': {
         'schema_version': '1.0'
@@ -455,7 +474,8 @@ class TestActionQueue(TestCase):
     current_command = {
       # Absent 'commandParams' section
     }
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
     self.assertEqual(actionQueue.determine_command_format_version(v1_command),
                      ActionQueue.COMMAND_FORMAT_V1)
     self.assertEqual(actionQueue.determine_command_format_version(v2_command),
@@ -469,12 +489,16 @@ class TestActionQueue(TestCase):
   @patch.object(PuppetExecutor, "runCommand")
   @patch.object(CustomServiceOrchestrator, "runCommand")
   @patch.object(ActionQueue, "status_update_callback")
+  @patch.object(CustomServiceOrchestrator, "__init__")
   def test_command_execution_depending_on_command_format(self,
+                                CustomServiceOrchestrator_mock,
                                 status_update_callback_mock,
                                 custom_ex_runCommand_mock,
                                 puppet_runCommand_mock, open_mock,
                                 determine_command_format_version_mock):
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
     ret = {
       'stdout' : '',
       'stderr' : '',

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index b4439e9..1e110da 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -209,7 +209,32 @@ class TestController(unittest.TestCase):
     heartbeatWithServer = MagicMock(name="heartbeatWithServer")
     self.controller.heartbeatWithServer = heartbeatWithServer
 
-    self.controller.isRegistered = True;
+    listener1 = MagicMock()
+    listener2 = MagicMock()
+    self.controller.registration_listeners.append(listener1)
+    self.controller.registration_listeners.append(listener2)
+    self.controller.isRegistered = True
+    self.controller.registerAndHeartbeat()
+    registerWithServer.assert_called_once_with()
+    heartbeatWithServer.assert_called_once_with()
+    self.assertTrue(listener1.called)
+    self.assertTrue(listener2.called)
+
+    self.controller.registerWithServer = \
+      Controller.Controller.registerWithServer
+    self.controller.heartbeatWithServer = \
+      Controller.Controller.heartbeatWithServer
+
+
+  @patch("time.sleep")
+  def test_registerAndHeartbeat_check_registration_listener(self, sleepMock):
+    registerWithServer = MagicMock(name="registerWithServer")
+    registerWithServer.return_value = {"response":"resp"}
+    self.controller.registerWithServer = registerWithServer
+    heartbeatWithServer = MagicMock(name="heartbeatWithServer")
+    self.controller.heartbeatWithServer = heartbeatWithServer
+
+    self.controller.isRegistered = True
     self.controller.registerAndHeartbeat()
     registerWithServer.assert_called_once_with()
     heartbeatWithServer.assert_called_once_with()

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index 971048b..54c17a6 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -56,11 +56,26 @@ class TestCustomServiceOrchestrator(TestCase):
     self.config.set('python', 'custom_actions_dir', tmpdir)
 
 
+  @patch.object(FileCache, "__init__")
+  def test_add_reg_listener_to_controller(self, FileCache_mock):
+    FileCache_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = AmbariConfig().getConfig()
+    tempdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tempdir)
+    CustomServiceOrchestrator(config, dummy_controller)
+    self.assertTrue(dummy_controller.registration_listeners.append.called)
+
+
   @patch.object(manifestGenerator, 'decompressClusterHostInfo')
   @patch("hostname.public_hostname")
   @patch("os.path.isfile")
   @patch("os.unlink")
-  def test_dump_command_to_json(self, unlink_mock, isfile_mock, hostname_mock, decompress_cluster_host_info_mock):
+  @patch.object(FileCache, "__init__")
+  def test_dump_command_to_json(self, FileCache_mock, unlink_mock,
+                                isfile_mock, hostname_mock,
+                                decompress_cluster_host_info_mock):
+    FileCache_mock.return_value = None
     hostname_mock.return_value = "test.hst"
     command = {
       'commandType': 'EXECUTION_COMMAND',
@@ -86,7 +101,8 @@ class TestCustomServiceOrchestrator(TestCase):
     config = AmbariConfig().getConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
-    orchestrator = CustomServiceOrchestrator(config)
+    dummy_controller = MagicMock()
+    orchestrator = CustomServiceOrchestrator(config, dummy_controller)
     isfile_mock.return_value = True
     # Test dumping EXECUTION_COMMAND
     json_file = orchestrator.dump_command_to_json(command)
@@ -112,9 +128,12 @@ class TestCustomServiceOrchestrator(TestCase):
 
 
   @patch("os.path.exists")
-  def test_resolve_script_path(self, exists_mock):
+  @patch.object(FileCache, "__init__")
+  def test_resolve_script_path(self, FileCache_mock, exists_mock):
+    FileCache_mock.return_value = None
+    dummy_controller = MagicMock()
     config = AmbariConfig().getConfig()
-    orchestrator = CustomServiceOrchestrator(config)
+    orchestrator = CustomServiceOrchestrator(config, dummy_controller)
     # Testing existing path
     exists_mock.return_value = True
     path = orchestrator.\
@@ -136,14 +155,18 @@ class TestCustomServiceOrchestrator(TestCase):
   @patch.object(FileCache, "get_hook_base_dir")
   @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
   @patch.object(PythonExecutor, "run_file")
-  def test_runCommand(self, run_file_mock, dump_command_to_json_mock,
+  @patch.object(FileCache, "__init__")
+  def test_runCommand(self, FileCache_mock,
+                      run_file_mock, dump_command_to_json_mock,
                       get_hook_base_dir_mock, get_service_base_dir_mock,
                       resolve_hook_script_path_mock, resolve_script_path_mock):
+    FileCache_mock.return_value = None
     command = {
       'role' : 'REGION_SERVER',
       'hostLevelParams' : {
         'stack_name' : 'HDP',
         'stack_version' : '2.0.7',
+        'jdk_location' : 'some_location'
       },
       'commandParams': {
         'script_type': 'PYTHON',
@@ -159,7 +182,8 @@ class TestCustomServiceOrchestrator(TestCase):
     resolve_hook_script_path_mock.return_value = \
       ('/hooks_dir/prefix-command/scripts/hook.py',
        '/hooks_dir/prefix-command')
-    orchestrator = CustomServiceOrchestrator(self.config)
+    dummy_controller = MagicMock()
+    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
     get_hook_base_dir_mock.return_value = "/hooks/"
     # normal run case
     run_file_mock.return_value = {
@@ -206,10 +230,19 @@ class TestCustomServiceOrchestrator(TestCase):
 
   @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
   @patch.object(PythonExecutor, "run_file")
-  def test_runCommand_custom_action(self, run_file_mock, dump_command_to_json_mock):
+  @patch.object(FileCache, "__init__")
+  @patch.object(FileCache, "get_custom_actions_base_dir")
+  def test_runCommand_custom_action(self, get_custom_actions_base_dir_mock,
+                                    FileCache_mock,
+                                    run_file_mock, dump_command_to_json_mock):
+    FileCache_mock.return_value = None
+    get_custom_actions_base_dir_mock.return_value = "some path"
     _, script = tempfile.mkstemp()
     command = {
       'role' : 'any',
+      'hostLevelParams' : {
+        'jdk_location' : 'some_location'
+      },
       'commandParams': {
         'script_type': 'PYTHON',
         'script': 'some_custom_action.py',
@@ -218,8 +251,8 @@ class TestCustomServiceOrchestrator(TestCase):
       'taskId' : '3',
       'roleCommand': 'ACTIONEXECUTE'
     }
-
-    orchestrator = CustomServiceOrchestrator(self.config)
+    dummy_controller = MagicMock()
+    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
     # normal run case
     run_file_mock.return_value = {
       'stdout' : 'sss',
@@ -235,9 +268,11 @@ class TestCustomServiceOrchestrator(TestCase):
 
 
   @patch("os.path.isfile")
-  def test_resolve_hook_script_path(self, isfile_mock):
-
-    orchestrator = CustomServiceOrchestrator(self.config)
+  @patch.object(FileCache, "__init__")
+  def test_resolve_hook_script_path(self, FileCache_mock, isfile_mock):
+    FileCache_mock.return_value = None
+    dummy_controller = MagicMock()
+    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
     # Testing None param
     res1 = orchestrator.resolve_hook_script_path(None, "prefix", "command",
                                             "script_type")
@@ -256,7 +291,9 @@ class TestCustomServiceOrchestrator(TestCase):
 
 
   @patch.object(CustomServiceOrchestrator, "runCommand")
-  def test_requestComponentStatus(self, runCommand_mock):
+  @patch.object(FileCache, "__init__")
+  def test_requestComponentStatus(self, FileCache_mock, runCommand_mock):
+    FileCache_mock.return_value = None
     status_command = {
       "serviceName" : 'HDFS',
       "commandType" : "STATUS_COMMAND",
@@ -264,7 +301,8 @@ class TestCustomServiceOrchestrator(TestCase):
       "componentName" : "DATANODE",
       'configurations':{}
     }
-    orchestrator = CustomServiceOrchestrator(self.config)
+    dummy_controller = MagicMock()
+    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
     # Test alive case
     runCommand_mock.return_value = {
       "exitcode" : 0

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/test/python/ambari_agent/TestFileCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestFileCache.py b/ambari-agent/src/test/python/ambari_agent/TestFileCache.py
index 5e389d5..023d19a 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestFileCache.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestFileCache.py
@@ -28,15 +28,12 @@ import tempfile
 import time
 from threading import Thread
 
-from PythonExecutor import PythonExecutor
-from CustomServiceOrchestrator import CustomServiceOrchestrator
-from FileCache import FileCache
+from FileCache import FileCache, CachingException
 from AmbariConfig import AmbariConfig
 from mock.mock import MagicMock, patch
 import StringIO
 import sys
-from ambari_agent import AgentException
-from AgentException import AgentException
+import shutil
 
 
 class TestFileCache(TestCase):
@@ -51,56 +48,314 @@ class TestFileCache(TestCase):
     self.config.add_section('agent')
     self.config.set('agent', 'prefix', tmpdir)
     self.config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    self.config.set('agent', 'tolerate_download_failures', "true")
 
 
-  @patch("os.path.isdir")
-  def test_get_service_base_dir(self, isdir_mock):
+  def test_reset(self):
     fileCache = FileCache(self.config)
-    # Check existing dir case
-    isdir_mock.return_value = True
-    service_subpath = "HDP/2.1.1/services/ZOOKEEPER/package"
-    base = fileCache.get_service_base_dir(service_subpath)
-    self.assertEqual(base, "/var/lib/ambari-agent/cache/stacks/HDP/2.1.1/"
-                           "services/ZOOKEEPER/package")
-    # Check absent dir case
-    isdir_mock.return_value = False
-    try:
-      fileCache.get_service_base_dir(service_subpath)
-      self.fail("Should throw an exception")
-    except AgentException:
-      pass # Expected
+    fileCache.uptodate_paths.append('dummy-path')
+    fileCache.reset()
+    self.assertFalse(fileCache.uptodate_paths)
 
 
+  @patch.object(FileCache, "provide_directory")
+  def test_get_service_base_dir(self, provide_directory_mock):
+    provide_directory_mock.return_value = "dummy value"
+    fileCache = FileCache(self.config)
+    command = {
+      'commandParams' : {
+        'service_package_folder' : 'HDP/2.1.1/services/ZOOKEEPER/package'
+      }
+    }
+    res = fileCache.get_service_base_dir(command, "server_url_pref")
+    self.assertEquals(
+      pprint.pformat(provide_directory_mock.call_args_list[0][0]),
+      "('/var/lib/ambari-agent/cache',\n "
+      "'stacks/HDP/2.1.1/services/ZOOKEEPER/package',\n"
+      " 'server_url_pref')")
+    self.assertEquals(res, "dummy value")
 
 
-  @patch("os.path.isdir")
-  def test_get_hook_base_dir(self, isdir_mock):
+  @patch.object(FileCache, "provide_directory")
+  def test_get_hook_base_dir(self, provide_directory_mock):
     fileCache = FileCache(self.config)
     # Check missing parameter
     command = {
       'commandParams' : {
       }
     }
-    base = fileCache.get_hook_base_dir(command)
+    base = fileCache.get_hook_base_dir(command, "server_url_pref")
     self.assertEqual(base, None)
+    self.assertFalse(provide_directory_mock.called)
 
     # Check existing dir case
-    isdir_mock.return_value = True
     command = {
       'commandParams' : {
         'hooks_folder' : 'HDP/2.1.1/hooks'
       }
     }
-    base = fileCache.get_hook_base_dir(command)
-    self.assertEqual(base, "/var/lib/ambari-agent/cache/stacks/HDP/2.1.1/hooks")
+    provide_directory_mock.return_value = "dummy value"
+    fileCache = FileCache(self.config)
+    res = fileCache.get_hook_base_dir(command, "server_url_pref")
+    self.assertEquals(
+      pprint.pformat(provide_directory_mock.call_args_list[0][0]),
+      "('/var/lib/ambari-agent/cache', "
+      "'stacks/HDP/2.1.1/hooks', "
+      "'server_url_pref')")
+    self.assertEquals(res, "dummy value")
+
+
+  @patch.object(FileCache, "provide_directory")
+  def test_get_custom_actions_base_dir(self, provide_directory_mock):
+    provide_directory_mock.return_value = "dummy value"
+    fileCache = FileCache(self.config)
+    res = fileCache.get_custom_actions_base_dir("server_url_pref")
+    self.assertEquals(
+      pprint.pformat(provide_directory_mock.call_args_list[0][0]),
+      "('/var/lib/ambari-agent/cache', 'custom_actions', 'server_url_pref')")
+    self.assertEquals(res, "dummy value")
+
+
+  @patch.object(FileCache, "build_download_url")
+  @patch.object(FileCache, "fetch_url")
+  @patch.object(FileCache, "read_hash_sum")
+  @patch.object(FileCache, "invalidate_directory")
+  @patch.object(FileCache, "unpack_archive")
+  @patch.object(FileCache, "write_hash_sum")
+  def test_provide_directory(self, write_hash_sum_mock, unpack_archive_mock,
+                             invalidate_directory_mock,
+                             read_hash_sum_mock, fetch_url_mock,
+                             build_download_url_mock):
+    build_download_url_mock.return_value = "http://dummy-url/"
+    HASH1 = "hash1"
+    membuffer = MagicMock()
+    membuffer.getvalue.return_value.strip.return_value = HASH1
+    fileCache = FileCache(self.config)
+
+    # Test uptodate dirs after start
+    self.assertFalse(fileCache.uptodate_paths)
+
+    # Test initial downloading (when dir does not exist)
+    fetch_url_mock.return_value = membuffer
+    read_hash_sum_mock.return_value = "hash2"
+    res = fileCache.provide_directory("cache_path", "subdirectory",
+                                      "server_url_prefix")
+    self.assertTrue(invalidate_directory_mock.called)
+    self.assertTrue(write_hash_sum_mock.called)
+    self.assertEquals(fetch_url_mock.call_count, 2)
+    self.assertEquals(pprint.pformat(fileCache.uptodate_paths),
+                     "['cache_path/subdirectory']")
+    self.assertEquals(res, 'cache_path/subdirectory')
+
+    fetch_url_mock.reset_mock()
+    write_hash_sum_mock.reset_mock()
+    invalidate_directory_mock.reset_mock()
+    unpack_archive_mock.reset_mock()
+
+    # Test that the cache is not invalidated when the local hash matches
+    fetch_url_mock.return_value = membuffer
+    read_hash_sum_mock.return_value = HASH1
+    fileCache.reset()
+
+    res = fileCache.provide_directory("cache_path", "subdirectory",
+                                      "server_url_prefix")
+    self.assertFalse(invalidate_directory_mock.called)
+    self.assertFalse(write_hash_sum_mock.called)
+    self.assertEquals(fetch_url_mock.call_count, 1)
+    self.assertEquals(pprint.pformat(fileCache.uptodate_paths),
+                      "['cache_path/subdirectory']")
+    self.assertEquals(res, 'cache_path/subdirectory')
+
+    fetch_url_mock.reset_mock()
+    write_hash_sum_mock.reset_mock()
+    invalidate_directory_mock.reset_mock()
+    unpack_archive_mock.reset_mock()
+
+    # Test execution path when path is up-to date (already checked)
+    res = fileCache.provide_directory("cache_path", "subdirectory",
+                                      "server_url_prefix")
+    self.assertFalse(invalidate_directory_mock.called)
+    self.assertFalse(write_hash_sum_mock.called)
+    self.assertEquals(fetch_url_mock.call_count, 0)
+    self.assertEquals(pprint.pformat(fileCache.uptodate_paths),
+                      "['cache_path/subdirectory']")
+    self.assertEquals(res, 'cache_path/subdirectory')
+
+    # Check exception handling when tolerance is disabled
+    self.config.set('agent', 'tolerate_download_failures', "false")
+    fetch_url_mock.side_effect = self.caching_exc_side_effect
+    fileCache = FileCache(self.config)
+    try:
+      fileCache.provide_directory("cache_path", "subdirectory",
+                                  "server_url_prefix")
+      self.fail('CachingException not thrown')
+    except CachingException:
+      pass # Expected
+    except Exception, e:
+      self.fail('Unexpected exception thrown:' + str(e))
+
+    # Check that unexpected exceptions are still propagated when
+    # tolerance is enabled
+    self.config.set('agent', 'tolerate_download_failures', "true")
+    fetch_url_mock.side_effect = self.exc_side_effect
+    fileCache = FileCache(self.config)
+    try:
+      fileCache.provide_directory("cache_path", "subdirectory",
+                                  "server_url_prefix")
+      self.fail('Exception not thrown')
+    except Exception:
+      pass # Expected
 
-    # Check absent dir case
+
+    # Check exception handling when tolerance is enabled
+    self.config.set('agent', 'tolerate_download_failures', "true")
+    fetch_url_mock.side_effect = self.caching_exc_side_effect
+    fileCache = FileCache(self.config)
+    res = fileCache.provide_directory("cache_path", "subdirectory",
+                                  "server_url_prefix")
+    self.assertEquals(res, 'cache_path/subdirectory')
+
+
+  def test_build_download_url(self):
+    fileCache = FileCache(self.config)
+    url = fileCache.build_download_url('http://localhost:8080/resources/',
+                                       'stacks/HDP/2.1.1/hooks', 'archive.zip')
+    self.assertEqual(url,
+        'http://localhost:8080/resources//stacks/HDP/2.1.1/hooks/archive.zip')
+
+
+  @patch("urllib2.urlopen")
+  def test_fetch_url(self, urlopen_mock):
+    fileCache = FileCache(self.config)
+    remote_url = "http://dummy-url/"
+    # Test normal download
+    test_str = 'abc' * 100000 # Very long string
+    test_string_io = StringIO.StringIO(test_str)
+    test_buffer = MagicMock()
+    test_buffer.read.side_effect = test_string_io.read
+    urlopen_mock.return_value = test_buffer
+
+    memory_buffer = fileCache.fetch_url(remote_url)
+
+    self.assertEquals(memory_buffer.getvalue(), test_str)
+    self.assertEqual(test_buffer.read.call_count, 20) # depends on buffer size
+    # Test exception handling
+    test_buffer.read.side_effect = self.exc_side_effect
+    try:
+      fileCache.fetch_url(remote_url)
+      self.fail('CachingException not thrown')
+    except CachingException:
+      pass # Expected
+    except Exception, e:
+      self.fail('Unexpected exception thrown:' + str(e))
+
+
+  def test_read_write_hash_sum(self):
+    tmpdir = tempfile.mkdtemp()
+    dummyhash = "DUMMY_HASH"
+    fileCache = FileCache(self.config)
+    fileCache.write_hash_sum(tmpdir, dummyhash)
+    newhash = fileCache.read_hash_sum(tmpdir)
+    self.assertEquals(newhash, dummyhash)
+    shutil.rmtree(tmpdir)
+    # Test read of not existing file
+    newhash = fileCache.read_hash_sum(tmpdir)
+    self.assertEquals(newhash, None)
+    # Test write to not existing file
+    with patch("__builtin__.open") as open_mock:
+      open_mock.side_effect = self.exc_side_effect
+      try:
+        fileCache.write_hash_sum(tmpdir, dummyhash)
+        self.fail('CachingException not thrown')
+      except CachingException:
+        pass # Expected
+      except Exception, e:
+        self.fail('Unexpected exception thrown:' + str(e))
+
+
+  @patch("os.path.isfile")
+  @patch("os.path.isdir")
+  @patch("os.unlink")
+  @patch("shutil.rmtree")
+  @patch("os.makedirs")
+  def test_invalidate_directory(self, makedirs_mock, rmtree_mock,
+                                unlink_mock, isdir_mock, isfile_mock):
+    fileCache = FileCache(self.config)
+    # Test execution flow if path points to file
+    isfile_mock.return_value = True
     isdir_mock.return_value = False
+
+    fileCache.invalidate_directory("dummy-dir")
+
+    self.assertTrue(unlink_mock.called)
+    self.assertFalse(rmtree_mock.called)
+    self.assertTrue(makedirs_mock.called)
+
+    unlink_mock.reset_mock()
+    rmtree_mock.reset_mock()
+    makedirs_mock.reset_mock()
+
+    # Test execution flow if path points to dir
+    isfile_mock.return_value = False
+    isdir_mock.return_value = True
+
+    fileCache.invalidate_directory("dummy-dir")
+
+    self.assertFalse(unlink_mock.called)
+    self.assertTrue(rmtree_mock.called)
+    self.assertTrue(makedirs_mock.called)
+
+    unlink_mock.reset_mock()
+    rmtree_mock.reset_mock()
+    makedirs_mock.reset_mock()
+
+    # Test exception handling
+    makedirs_mock.side_effect = self.exc_side_effect
     try:
-      fileCache.get_hook_base_dir(command)
-      self.fail("Should throw an exception")
-    except AgentException:
+      fileCache.invalidate_directory("dummy-dir")
+      self.fail('CachingException not thrown')
+    except CachingException:
       pass # Expected
+    except Exception, e:
+      self.fail('Unexpected exception thrown:' + str(e))
+
+
+  def test_unpack_archive(self):
+    tmpdir = tempfile.mkdtemp()
+    dummy_archive = os.path.join("ambari_agent", "dummy_files",
+                                 "dummy_archive.zip")
+    # Test normal flow
+    with open(dummy_archive, "r") as f:
+      data = f.read(os.path.getsize(dummy_archive))
+      membuf = StringIO.StringIO(data)
+
+    fileCache = FileCache(self.config)
+    fileCache.unpack_archive(membuf, tmpdir)
+    # Count the total size and number of unpacked files and directories:
+    total_size = 0
+    total_files = 0
+    total_dirs = 0
+    for dirpath, dirnames, filenames in os.walk(tmpdir):
+      total_dirs += 1
+      for f in filenames:
+        fp = os.path.join(dirpath, f)
+        total_size += os.path.getsize(fp)
+        total_files += 1
+    self.assertEquals(total_size, 51258L)
+    self.assertEquals(total_files, 28)
+    self.assertEquals(total_dirs, 8)
+    shutil.rmtree(tmpdir)
+
+    # Test exception handling
+    with patch("os.path.isdir") as isdir_mock:
+      isdir_mock.side_effect = self.exc_side_effect
+      try:
+        fileCache.unpack_archive(membuf, tmpdir)
+        self.fail('CachingException not thrown')
+      except CachingException:
+        pass # Expected
+      except Exception, e:
+        self.fail('Unexpected exception thrown:' + str(e))
 
 
   def tearDown(self):
@@ -108,3 +363,8 @@ class TestFileCache(TestCase):
     sys.stdout = sys.__stdout__
 
 
+  def exc_side_effect(self, *a):
+    raise Exception("horrible_exc")
+
+  def caching_exc_side_effect(self, *a):
+    raise CachingException("horrible_caching_exc")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
index 906244d..c6a834d 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
@@ -47,7 +47,12 @@ class TestHeartbeat(TestCase):
 
 
   def test_build(self):
-    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
+    config = AmbariConfig.AmbariConfig().getConfig()
+    config.set('agent', 'prefix', 'tmp')
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
     heartbeat = Heartbeat(actionQueue)
     result = heartbeat.build(100)
     print "Heartbeat: " + str(result)
@@ -80,7 +85,12 @@ class TestHeartbeat(TestCase):
                    'exitCode': 777}],
       'componentStatus': [{'status': 'HEALTHY', 'componentName': 'NAMENODE'}]
     }
-    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
+    config = AmbariConfig.AmbariConfig().getConfig()
+    config.set('agent', 'prefix', 'tmp')
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
     heartbeat = Heartbeat(actionQueue)
     hb = heartbeat.build(id = 10, state_interval=1, componentsMapped=True)
     self.assertEqual(register_mock.call_args_list[0][0][1], True)
@@ -92,7 +102,12 @@ class TestHeartbeat(TestCase):
 
   @patch.object(ActionQueue, "result")
   def test_build_long_result(self, result_mock):
-    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
+    config = AmbariConfig.AmbariConfig().getConfig()
+    config.set('agent', 'prefix', 'tmp')
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
     result_mock.return_value = {
       'reports': [{'status': 'IN_PROGRESS',
             'stderr': 'Read from /tmp/errors-3.txt',
@@ -178,7 +193,12 @@ class TestHeartbeat(TestCase):
 
   @patch.object(HostInfo, 'register')
   def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock):
-    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
+    config = AmbariConfig.AmbariConfig().getConfig()
+    config.set('agent', 'prefix', 'tmp')
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
     statusCommand = {
       "serviceName" : 'HDFS',
       "commandType" : "STATUS_COMMAND",
@@ -198,7 +218,12 @@ class TestHeartbeat(TestCase):
 
   @patch.object(HostInfo, 'register')
   def test_heartbeat_host_check_no_cmd(self, register_mock):
-    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
+    config = AmbariConfig.AmbariConfig().getConfig()
+    config.set('agent', 'prefix', 'tmp')
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
     heartbeat = Heartbeat(actionQueue)
     heartbeat.build(12, 6)
     self.assertTrue(register_mock.called)

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/test/python/ambari_agent/dummy_files/dummy_archive.zip
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/dummy_archive.zip b/ambari-agent/src/test/python/ambari_agent/dummy_files/dummy_archive.zip
new file mode 100644
index 0000000..9a77d7e
Binary files /dev/null and b/ambari-agent/src/test/python/ambari_agent/dummy_files/dummy_archive.zip differ

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml
index 1d6b427..bc83dd8 100644
--- a/ambari-server/pom.xml
+++ b/ambari-server/pom.xml
@@ -327,7 +327,6 @@
                   <location>${project.build.directory}/DBConnectionVerification.jar</location>
                 </source>
                 <source>
-                  <!-- This file is also included into agent rpm -->
                   <location>src/main/resources/role_command_order.json</location>
                 </source>
               </sources>
@@ -390,6 +389,9 @@
               <groupname>root</groupname>
               <sources>
                 <source>
+                  <location>src/main/python/ambari_server</location>
+                </source>
+                <source>
                   <location>src/main/python/bootstrap.py</location>
                 </source>
                 <source>
@@ -420,6 +422,18 @@
                 </source>
               </sources>
             </mapping>
+            <mapping>
+              <!-- custom actions root-->
+              <directory>/var/lib/ambari-server/resources/custom_actions</directory>
+              <filemode>755</filemode>
+              <username>root</username>
+              <groupname>root</groupname>
+              <sources>
+                <source>
+                  <location>src/main/resources/custom_actions</location>
+                </source>
+              </sources>
+            </mapping>
           </mappings>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index 2babd6b..a339784 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -28,6 +28,7 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.state.*;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.host.HostHeartbeatLostEvent;
@@ -49,6 +50,7 @@ public class HeartbeatMonitor implements Runnable {
   private Thread monitorThread = null;
   private final ConfigHelper configHelper;
   private final AmbariMetaInfo ambariMetaInfo;
+  private final AmbariManagementController ambariManagementController;
   private final Configuration configuration;
 
   public HeartbeatMonitor(Clusters clusters, ActionQueue aq, ActionManager am,
@@ -59,6 +61,8 @@ public class HeartbeatMonitor implements Runnable {
     this.threadWakeupInterval = threadWakeupInterval;
     this.configHelper = injector.getInstance(ConfigHelper.class);
     this.ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class);
+    this.ambariManagementController = injector.getInstance(
+            AmbariManagementController.class);
     this.configuration = injector.getInstance(Configuration.class);
   }
 
@@ -191,7 +195,6 @@ public class HeartbeatMonitor implements Runnable {
     String serviceName = sch.getServiceName();
     String componentName = sch.getServiceComponentName();
     Service service = cluster.getService(sch.getServiceName());
-    ServiceComponent sc = service.getServiceComponent(componentName);
     StackId stackId = cluster.getDesiredStackVersion();
     ServiceInfo serviceInfo = ambariMetaInfo.getServiceInfo(stackId.getStackName(),
             stackId.getStackVersion(), serviceName);
@@ -267,6 +270,7 @@ public class HeartbeatMonitor implements Runnable {
     commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
     // Fill host level params
     Map<String, String> hostLevelParams = statusCmd.getHostLevelParams();
+    hostLevelParams.put(JDK_LOCATION, ambariManagementController.getJdkResourceUrl());
     hostLevelParams.put(STACK_NAME, stackId.getStackName());
     hostLevelParams.put(STACK_VERSION, stackId.getStackVersion());
 

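Putting JDK_LOCATION into the host-level params of status commands hands the agent the server's resources URL, which is what lets it download the scripts and hooks that the server-side keeper (resourceFilesKeeper.py below) archives next to each directory. A rough sketch of such an agent-side flow, assuming .hash and archive.zip are served under the same relative paths as they are stored on the server; the function and variable names here are illustrative and are not taken from the agent's FileCache.py:

    import os
    import urllib2
    import zipfile

    def download_dir_archive(base_url, rel_path, cache_dir):
      # Fetch the remote hash first; skip the download if nothing changed.
      remote_hash = urllib2.urlopen(
          "{0}/{1}/.hash".format(base_url, rel_path)).read().strip()
      local_dir = os.path.join(cache_dir, rel_path)
      hash_file = os.path.join(local_dir, ".hash")
      local_hash = None
      if os.path.isfile(hash_file):
        with open(hash_file) as fh:
          local_hash = fh.read().strip()
      if remote_hash == local_hash:
        return local_dir
      if not os.path.exists(local_dir):
        os.makedirs(local_dir)
      archive = os.path.join(local_dir, "archive.zip")
      with open(archive, "wb") as fh:
        fh.write(urllib2.urlopen(
            "{0}/{1}/archive.zip".format(base_url, rel_path)).read())
      zipfile.ZipFile(archive).extractall(local_dir)
      with open(hash_file, "w") as fh:
        fh.write(remote_hash)
      return local_dir
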
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
index 7f2d1fb..be800e7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -125,8 +125,8 @@ public class AmbariCustomCommandExecutionHelper {
   @Inject
   private ConfigHelper configHelper;
 
-  private Boolean isServiceCheckCommand(String
-                                          command, String service) {
+
+  private Boolean isServiceCheckCommand(String command, String service) {
     List<String> actions = actionMetadata.getActions(service);
     if (actions == null || actions.size() == 0) {
       return false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index e4ebc1c..b9f62e4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -322,9 +322,7 @@ public class AmbariServer {
       resources.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
           "com.sun.jersey.api.core.PackagesResourceConfig");
       resources.setInitParameter("com.sun.jersey.config.property.packages",
-          "org.apache.ambari.server.resources.api.rest;" + "org.apache.ambari.server.api");
-      resources.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",
-          "true");
+          "org.apache.ambari.server.resources.api.rest;");
       root.addServlet(resources, "/resources/*");
       resources.setInitOrder(6);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/main/python/ambari-server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari-server.py b/ambari-server/src/main/python/ambari-server.py
index 61233d3..e3439e0 100755
--- a/ambari-server/src/main/python/ambari-server.py
+++ b/ambari-server/src/main/python/ambari-server.py
@@ -39,6 +39,7 @@ import datetime
 import tempfile
 import random
 import pwd
+from ambari_server.resourceFilesKeeper import ResourceFilesKeeper, KeeperException
 
 # debug settings
 VERBOSE = False
@@ -593,6 +594,8 @@ NR_ADJUST_OWNERSHIP_LIST =[
   ( "/var/lib/ambari-server/keys/db", "700", "{0}", False ),
   ( "/var/lib/ambari-server/keys/db/newcerts", "700", "{0}", False ),
   ( "/var/lib/ambari-server/keys/.ssh", "700", "{0}", False ),
+  ( "/var/lib/ambari-server/resources/stacks/", "755", "{0}", True ),
+  ( "/var/lib/ambari-server/resources/custom_actions/", "755", "{0}", True ),
   ( "/etc/ambari-server/conf", "644", "{0}", True ),
   ( "/etc/ambari-server/conf", "755", "{0}", False ),
   ( "/etc/ambari-server/conf/password.dat", "640", "{0}", False ),
@@ -2550,6 +2553,20 @@ def start(args):
       print "Please do not forget to start PostgreSQL server."
 
   properties = get_ambari_properties()
+  stack_location = get_stack_location(properties)
+  # Hack: we derive the resources dir as the parent dir of stack_location
+  resources_location = os.path.dirname(stack_location)
+  resource_files_keeper = ResourceFilesKeeper(resources_location,
+                                              verbose=VERBOSE)
+
+  try:
+    print "Organizing resource files at {0}...".format(resources_location)
+    resource_files_keeper.perform_housekeeping()
+  except KeeperException, ex:
+    msg = "Can not organize resource files at {0}: {1}".format(
+                                                resources_location, str(ex))
+    raise FatalException(-1, msg)
+
   isSecure = get_is_secure(properties)
   (isPersisted, masterKeyFile) = get_is_persisted(properties)
   environ = os.environ.copy()
@@ -2804,16 +2821,20 @@ def upgrade_local_repo_db(args, dbkey, dbvalue):
     return retcode
   pass
 
+
+def get_stack_location(properties):
+  stack_location = properties[STACK_LOCATION_KEY]
+  if stack_location is None:
+    stack_location = STACK_LOCATION_DEFAULT
+  return stack_location
+
 def upgrade_local_repo(args):
   properties = get_ambari_properties()
   if properties == -1:
     print_error_msg ("Error getting ambari properties")
     return -1
 
-  stack_location = properties[STACK_LOCATION_KEY]
-  if stack_location is None:
-    stack_location = STACK_LOCATION_DEFAULT
-
+  stack_location = get_stack_location(properties)
   stack_root_local = os.path.join(stack_location, "HDPLocal")
   if not os.path.exists(stack_root_local):
     print_info_msg("HDPLocal stack directory does not exist, skipping")

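To make the "parent dir" hack concrete: with the default stack location (the path that also appears in NR_ADJUST_OWNERSHIP_LIST above, assuming no trailing slash), the resources dir resolves one level up, which is also where custom_actions lives:

    import os

    stack_location = "/var/lib/ambari-server/resources/stacks"
    resources_location = os.path.dirname(stack_location)
    # resources_location == "/var/lib/ambari-server/resources"
    # the keeper then walks resources/stacks/*/* and resources/custom_actions
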
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/main/python/ambari_server/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/__init__.py b/ambari-server/src/main/python/ambari_server/__init__.py
new file mode 100644
index 0000000..16818c9
--- /dev/null
+++ b/ambari-server/src/main/python/ambari_server/__init__.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Server
+
+"""
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/main/python/ambari_server/resourceFilesKeeper.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/resourceFilesKeeper.py b/ambari-server/src/main/python/ambari_server/resourceFilesKeeper.py
new file mode 100644
index 0000000..92cab3d
--- /dev/null
+++ b/ambari-server/src/main/python/ambari_server/resourceFilesKeeper.py
@@ -0,0 +1,258 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import hashlib
+
+import os, sys
+import zipfile
+import glob
+import pprint
+from xml.dom import minidom
+
+
+class KeeperException(Exception):
+  pass
+
+class ResourceFilesKeeper():
+  """
+  This class encapsulates the utility methods for resource file maintenance.
+  """
+
+  HOOKS_DIR="hooks"
+  PACKAGE_DIR="package"
+  STACKS_DIR="stacks"
+  CUSTOM_ACTIONS_DIR="custom_actions"
+
+  # For these directories archives are created
+  ARCHIVABLE_DIRS = [HOOKS_DIR, PACKAGE_DIR]
+
+  HASH_SUM_FILE=".hash"
+  ARCHIVE_NAME="archive.zip"
+
+  PYC_EXT=".pyc"
+  METAINFO_XML = "metainfo.xml"
+
+  BUFFER = 1024 * 32
+
+  # Change this to True to see debug output on stderr
+  DEBUG=False
+
+  def __init__(self, resources_dir, verbose=False):
+    self.resources_dir = resources_dir
+    self.verbose = verbose
+
+
+  def perform_housekeeping(self):
+    """
+    Performs housekeeping operations on resource files
+    """
+    self.update_directory_archieves()
+    # more housekeeping operations may be added here later
+
+
+  def update_directory_archieves(self):
+    """
+    Updates archives for the hooks/package dirs of active stacks and for custom_actions (see AMBARI-4481)
+    """
+    stacks_root = os.path.join(self.resources_dir, self.STACKS_DIR)
+    self.dbg_out("Updating archives for stack dirs at {0}...".format(stacks_root))
+    active_stacks = self.list_active_stacks(stacks_root)
+    self.dbg_out("Active stacks: {0}".format(pprint.pformat(active_stacks)))
+    # Iterate over active stack directories
+    for stack_dir in active_stacks:
+      for root, dirs, _ in os.walk(stack_dir):
+        for d in dirs:
+          if d in self.ARCHIVABLE_DIRS:
+            full_path = os.path.abspath(os.path.join(root, d))
+            self.update_directory_archive(full_path)
+
+
+    custom_actions_root = os.path.join(self.resources_dir,
+                                       self.CUSTOM_ACTIONS_DIR)
+    self.dbg_out("Updating archive for custom_actions dir at {0}...".format(
+                                       custom_actions_root))
+    self.update_directory_archive(custom_actions_root)
+
+
+
+  def list_active_stacks(self, stacks_root):
+    """
+    Builds a list of stack directories that are active (enabled)
+    """
+    active_stacks = [] # Absolute paths of active stack version directories
+    glob_pattern = "{0}/*/*".format(stacks_root)
+    try:
+      stack_dirs = glob.glob(glob_pattern)
+      for directory in stack_dirs:
+        metainfo_file = os.path.join(directory, self.METAINFO_XML)
+        if os.path.exists(metainfo_file) and self.is_active_stack(metainfo_file):
+          active_stacks.append(directory)
+      return active_stacks
+    except Exception, err:
+      raise KeeperException("Can not list active stacks: {0}".format(str(err)))
+
+
+  def update_directory_archive(self, directory):
+    """
+    If the hash sum for the directory is missing or differs from the saved
+    value, recreates the directory archive and saves the new hash sum
+    """
+    cur_hash = self.count_hash_sum(directory)
+    saved_hash = self.read_hash_sum(directory)
+    if cur_hash != saved_hash:
+      self.zip_directory(directory)
+      self.write_hash_sum(directory, cur_hash)
+
+
+  def count_hash_sum(self, directory):
+    """
+    Recursively computes a hash over all files in the directory and its
+    subdirectories. Files are hashed in alphabetical order of their full
+    paths. Previously created directory archives, hash files and compiled
+    .pyc files are ignored
+    """
+    try:
+      sha1 = hashlib.sha1()
+      file_list = []
+      for root, dirs, files in os.walk(directory):
+        for f in files:
+          if not self.is_ignored(f):
+            full_path = os.path.abspath(os.path.join(root, f))
+            file_list.append(full_path)
+      file_list.sort()
+      for path in file_list:
+        self.dbg_out("Counting hash of {0}".format(path))
+        with open(path, 'rb') as fh:
+          while True:
+            data = fh.read(self.BUFFER)
+            if not data:
+              break
+            sha1.update(data)
+      return sha1.hexdigest()
+    except Exception, err:
+      raise KeeperException("Can not calculate directory "
+                            "hash: {0}".format(str(err)))
+
+
+  def read_hash_sum(self, directory):
+    """
+    Tries to read a hash sum from previously generated file. Returns string
+    containing hash or None
+    """
+    hash_file = os.path.join(directory, self.HASH_SUM_FILE)
+    if os.path.isfile(hash_file):
+      try:
+        with open(hash_file) as fh:
+          return fh.readline().strip()
+      except Exception, err:
+        raise KeeperException("Can not read file {0} : {1}".format(hash_file,
+                                                                   str(err)))
+    else:
+      return None
+
+
+  def write_hash_sum(self, directory, new_hash):
+    """
+    Saves the given hash sum to the hash file inside the directory,
+    overwriting any previously stored value
+    """
+    hash_file = os.path.join(directory, self.HASH_SUM_FILE)
+    try:
+      with open(hash_file, "w") as fh:
+        fh.write(new_hash)
+    except Exception, err:
+      raise KeeperException("Can not write to file {0} : {1}".format(hash_file,
+                                                                   str(err)))
+
+
+  def zip_directory(self, directory):
+    """
+    Packs the entire directory into a zip file. The hash file, any previous
+    archive and compiled .pyc files are excluded from the archive
+    """
+    self.dbg_out("creating archive for directory {0}".format(directory))
+    try:
+      zf = zipfile.ZipFile(os.path.join(directory, self.ARCHIVE_NAME), "w")
+      abs_src = os.path.abspath(directory)
+      for root, dirs, files in os.walk(directory):
+        for filename in files:
+          # Avoid zipping previous archive and hash file and binary pyc files
+          if not self.is_ignored(filename):
+            absname = os.path.abspath(os.path.join(root, filename))
+            arcname = absname[len(abs_src) + 1:]
+            self.dbg_out('zipping %s as %s' % (os.path.join(root, filename),
+                                        arcname))
+            zf.write(absname, arcname)
+      zf.close()
+    except Exception, err:
+      raise KeeperException("Can not create zip archive of "
+                            "directory {0} : {1}".format(directory, str(err)))
+
+
+  def is_ignored(self, filename):
+    """
+    Returns True if the file is ignored when calculating the hash or archiving
+    """
+    return filename in [self.HASH_SUM_FILE, self.ARCHIVE_NAME] or \
+           filename.endswith(self.PYC_EXT)
+
+
+  def dbg_out(self, text):
+    if self.DEBUG:
+      sys.stderr.write("{0}\n".format(text))
+    if not self.DEBUG and self.verbose:
+      print text
+
+
+  def is_active_stack(self, xmlfile):
+    try:
+      xmldoc = minidom.parse(xmlfile)
+      value = self.xpath_like_bycicle(xmldoc, ['metainfo', 'versions', 'active'])
+      return value.lower().strip() == 'true'
+    except Exception, err:
+      raise KeeperException("Can not parse XML file {0} : {1}".format(
+                            xmlfile, str(err)))
+
+
+  def xpath_like_bycicle(self, xml_doc, path):
+    # The default Python 2.6 distribution has no good XPath support,
+    # so we walk the DOM ourselves here (hence the "bycicle")
+    cur_elem = xml_doc
+    for name in path:
+      elem = self.find_xml_element_by_name(cur_elem._get_childNodes(), name)
+      if elem:
+        cur_elem = elem
+      else:
+        return None
+    # Select text in tags
+    value = cur_elem._get_childNodes()[0].nodeValue
+    if value:
+      return value.lower().strip()
+    else:
+      return None
+
+
+  def find_xml_element_by_name(self, elements, element_name):
+    for xml_element in elements:
+      if xml_element.nodeType == xml_element.ELEMENT_NODE and \
+                      xml_element.nodeName == element_name:
+        return xml_element
+    return None
+
+

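The start() hook above shows the intended entry point; as a standalone sketch, the same flow and the artifacts the keeper leaves behind (archive.zip and .hash in every archived directory) look like this:

    from ambari_server.resourceFilesKeeper import ResourceFilesKeeper, KeeperException

    resources_dir = "/var/lib/ambari-server/resources"  # parent of .../stacks
    keeper = ResourceFilesKeeper(resources_dir, verbose=True)
    try:
      # Re-creates <dir>/archive.zip and <dir>/.hash under every active
      # stack's hooks/ and package/ dirs and under custom_actions/ whenever
      # their contents change.
      keeper.perform_housekeeping()
    except KeeperException, ex:
      print "Housekeeping failed: {0}".format(str(ex))
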
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/test/python/TestAmbariServer.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/TestAmbariServer.py b/ambari-server/src/test/python/TestAmbariServer.py
index 43494a7..3075391 100644
--- a/ambari-server/src/test/python/TestAmbariServer.py
+++ b/ambari-server/src/test/python/TestAmbariServer.py
@@ -29,6 +29,7 @@ import stat
 import datetime
 import operator
 from pwd import getpwnam
+from ambari_server.resourceFilesKeeper import ResourceFilesKeeper, KeeperException
 
 # We have to use this import HACK because the filename contains a dash
 ambari_server = __import__('ambari-server')
@@ -2455,7 +2456,9 @@ MIIFHjCCAwYCCQDpHKOBI+Lt0zANBgkqhkiG9w0BAQUFADBRMQswCQYDVQQGEwJV
   @patch.object(ambari_server, "find_jdbc_driver")
   @patch("getpass.getuser")
   @patch("os.chdir")
-  def test_start(self, chdir_mock, getuser_mock, find_jdbc_driver_mock, is_root_mock, read_ambari_user_mock,
+  @patch.object(ResourceFilesKeeper, "perform_housekeeping")
+  def test_start(self, perform_housekeeping_mock, chdir_mock, getuser_mock,
+                 find_jdbc_driver_mock, is_root_mock, read_ambari_user_mock,
                  parse_properties_file_mock, check_postgre_up_mock,
                  print_error_msg_mock, find_jdk_mock, search_file_mock,
                  print_info_msg_mock, popenMock, openMock, pexistsMock,
@@ -2556,6 +2559,18 @@ MIIFHjCCAwYCCQDpHKOBI+Lt0zANBgkqhkiG9w0BAQUFADBRMQswCQYDVQQGEwJV
       # Ignored
       pass
 
+    # Test exception handling on resource files housekeeping
+    perform_housekeeping_mock.reset_mock()
+    perform_housekeeping_mock.side_effect = KeeperException("some_reason")
+    try:
+      ambari_server.start(args)
+      self.fail("Should fail with exception")
+    except FatalException as e:
+      self.assertTrue('some_reason' in e.reason)
+    self.assertTrue(perform_housekeeping_mock.called)
+    perform_housekeeping_mock.side_effect = lambda *v, **kv : None
+    perform_housekeeping_mock.reset_mock()
+
     self.assertFalse('Unable to start PostgreSQL server' in e.reason)
     self.assertFalse(check_postgre_up_mock.called)
 
@@ -2585,6 +2600,8 @@ MIIFHjCCAwYCCQDpHKOBI+Lt0zANBgkqhkiG9w0BAQUFADBRMQswCQYDVQQGEwJV
     self.assertTrue(popenMock.called)
     popen_arg = popenMock.call_args[0][0]
     self.assertTrue(popen_arg[0] == "/bin/sh")
+    self.assertTrue(perform_housekeeping_mock.called)
+    perform_housekeeping_mock.reset_mock()
     popenMock.reset_mock()
 
     parse_properties_file_mock.reset_mock()
@@ -2596,6 +2613,7 @@ MIIFHjCCAwYCCQDpHKOBI+Lt0zANBgkqhkiG9w0BAQUFADBRMQswCQYDVQQGEwJV
     self.assertTrue(popenMock.called)
     popen_arg = popenMock.call_args[0][0]
     self.assertTrue(popen_arg[0] == "/bin/su")
+    self.assertTrue(perform_housekeeping_mock.called)
     check_postgre_up_mock.reset_mock()
 
     popenMock.reset_mock()