You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dm...@apache.org on 2014/02/18 22:06:33 UTC
[2/2] git commit: AMBARI-4481. Add to the agent ability to download
service scripts and hooks (dlysnichenko)
AMBARI-4481. Add to the agent ability to download service scripts and hooks (dlysnichenko)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/02f9c453
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/02f9c453
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/02f9c453
Branch: refs/heads/trunk
Commit: 02f9c45311ee6f6e7d87fe07b0059ad43a265782
Parents: f2c18bc
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Fri Jan 31 15:33:57 2014 +0200
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Tue Feb 18 23:03:50 2014 +0200
----------------------------------------------------------------------
ambari-agent/conf/unix/ambari-agent.ini | 4 +-
ambari-agent/pom.xml | 3 +-
.../src/main/python/ambari_agent/ActionQueue.py | 4 +-
.../src/main/python/ambari_agent/Controller.py | 15 +-
.../ambari_agent/CustomServiceOrchestrator.py | 18 +-
.../src/main/python/ambari_agent/FileCache.py | 211 ++++++++++--
.../test/python/ambari_agent/TestActionQueue.py | 44 ++-
.../test/python/ambari_agent/TestController.py | 27 +-
.../TestCustomServiceOrchestrator.py | 66 +++-
.../test/python/ambari_agent/TestFileCache.py | 320 +++++++++++++++++--
.../test/python/ambari_agent/TestHeartbeat.py | 35 +-
.../ambari_agent/dummy_files/dummy_archive.zip | Bin 0 -> 29558 bytes
ambari-server/pom.xml | 16 +-
.../ambari/server/agent/HeartbeatMonitor.java | 6 +-
.../AmbariCustomCommandExecutionHelper.java | 4 +-
.../ambari/server/controller/AmbariServer.java | 4 +-
ambari-server/src/main/python/ambari-server.py | 29 +-
.../src/main/python/ambari_server/__init__.py | 21 ++
.../python/ambari_server/resourceFilesKeeper.py | 258 +++++++++++++++
.../src/test/python/TestAmbariServer.py | 20 +-
.../src/test/python/TestResourceFilesKeeper.py | 319 ++++++++++++++++++
.../active_stack/metainfo.xml | 23 ++
.../HIVE/configuration/hive-site.xml | 267 ++++++++++++++++
.../dummy_stack/HIVE/metainfo.xml | 47 +++
.../dummy_stack/HIVE/package/.hash | 1 +
.../HIVE/package/files/addMysqlUser.sh | 41 +++
.../dummy_stack/HIVE/package/files/hcatSmoke.sh | 35 ++
.../dummy_stack/HIVE/package/files/hiveSmoke.sh | 23 ++
.../HIVE/package/files/hiveserver2.sql | 23 ++
.../HIVE/package/files/hiveserver2Smoke.sh | 31 ++
.../dummy_stack/HIVE/package/files/pigSmoke.sh | 18 ++
.../HIVE/package/files/startHiveserver2.sh | 22 ++
.../HIVE/package/files/startMetastore.sh | 22 ++
.../HIVE/package/scripts/__init__.py | 19 ++
.../dummy_stack/HIVE/package/scripts/hcat.py | 47 +++
.../HIVE/package/scripts/hcat_client.py | 43 +++
.../HIVE/package/scripts/hcat_service_check.py | 63 ++++
.../dummy_stack/HIVE/package/scripts/hive.py | 122 +++++++
.../HIVE/package/scripts/hive_client.py | 41 +++
.../HIVE/package/scripts/hive_metastore.py | 63 ++++
.../HIVE/package/scripts/hive_server.py | 63 ++++
.../HIVE/package/scripts/hive_service.py | 56 ++++
.../HIVE/package/scripts/mysql_server.py | 77 +++++
.../HIVE/package/scripts/mysql_service.py | 44 +++
.../dummy_stack/HIVE/package/scripts/params.py | 123 +++++++
.../HIVE/package/scripts/service_check.py | 56 ++++
.../HIVE/package/scripts/status_params.py | 30 ++
.../HIVE/package/templates/hcat-env.sh.j2 | 25 ++
.../HIVE/package/templates/hive-env.sh.j2 | 55 ++++
.../inactive_stack/metainfo.xml | 23 ++
50 files changed, 2807 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 11d6f1a..27eb6e1 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -25,6 +25,7 @@ data_cleanup_interval=86400
data_cleanup_max_age=2592000
ping_port=8670
cache_dir=/var/lib/ambari-agent/cache
+tolerate_download_failures=true
[puppet]
puppetmodules=/var/lib/ambari-agent/puppet
@@ -32,9 +33,6 @@ ruby_home=/usr/lib/ambari-agent/lib/ruby-1.8.7-p370
puppet_home=/usr/lib/ambari-agent/lib/puppet-2.7.9
facter_home=/usr/lib/ambari-agent/lib/facter-1.6.10
-[python]
-custom_actions_dir = /var/lib/ambari-agent/resources/custom_actions
-
[command]
maxretries=2
sleepBetweenRetries=1
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml
index d037d46..12a9910 100644
--- a/ambari-agent/pom.xml
+++ b/ambari-agent/pom.xml
@@ -336,7 +336,6 @@
</sources>
</mapping>
<mapping>
- <!-- TODO: Remove when we introduce metadata downloading by agent-->
<directory>/var/lib/ambari-agent/cache/stacks</directory>
<sources>
<source>
@@ -346,7 +345,7 @@
</mapping>
<mapping>
<!-- custom actions root-->
- <directory>/var/lib/ambari-agent/resources/custom_actions</directory>
+ <directory>/var/lib/ambari-agent/cache/custom_actions</directory>
<filemode>755</filemode>
<username>root</username>
<groupname>root</groupname>
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 942cc75..731ac54 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -71,7 +71,9 @@ class ActionQueue(threading.Thread):
self.sh = shellRunner()
self._stop = threading.Event()
self.tmpdir = config.get('agent', 'prefix')
- self.customServiceOrchestrator = CustomServiceOrchestrator(config)
+ self.customServiceOrchestrator = CustomServiceOrchestrator(config,
+ controller)
+
def stop(self):
self._stop.set()
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index b842b4d..26985ef 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -54,7 +54,8 @@ class Controller(threading.Thread):
self.credential = None
self.config = config
self.hostname = hostname.hostname()
- server_secured_url = 'https://' + config.get('server', 'hostname') + ':' + config.get('server', 'secured_url_port')
+ server_secured_url = 'https://' + config.get('server', 'hostname') + \
+ ':' + config.get('server', 'secured_url_port')
self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
self.netutil = NetUtil()
@@ -67,14 +68,15 @@ class Controller(threading.Thread):
# Event is used for synchronizing heartbeat iterations (to make possible
# manual wait() interruption between heartbeats )
self.heartbeat_wait_event = threading.Event()
+ # List of callbacks that are called at agent registration
+ self.registration_listeners = []
+
def __del__(self):
logger.info("Server connection disconnected.")
pass
def registerWithServer(self):
- retry=False
- firstTime=True
id = -1
ret = {}
@@ -257,8 +259,11 @@ class Controller(threading.Thread):
message = registerResponse['response']
logger.info("Response from server = " + message)
if self.isRegistered:
- time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
- self.heartbeatWithServer()
+ # Process callbacks
+ for callback in self.registration_listeners:
+ callback()
+ time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
+ self.heartbeatWithServer()
def restartAgent(self):
os._exit(AGENT_AUTO_RESTART_EXIT_CODE)
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 95ad2cd..7436d26 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -48,7 +48,7 @@ class CustomServiceOrchestrator():
PRE_HOOK_PREFIX="before"
POST_HOOK_PREFIX="after"
- def __init__(self, config):
+ def __init__(self, config, controller):
self.config = config
self.tmp_dir = config.get('agent', 'prefix')
self.file_cache = FileCache(config)
@@ -57,6 +57,8 @@ class CustomServiceOrchestrator():
'status_command_stdout.txt')
self.status_commands_stderr = os.path.join(self.tmp_dir,
'status_command_stderr.txt')
+ # cache reset will be called on every agent registration
+ controller.registration_listeners.append(self.file_cache.reset)
# Clean up old status command files if any
try:
os.unlink(self.status_commands_stdout)
@@ -72,15 +74,10 @@ class CustomServiceOrchestrator():
command json, is ignored.
"""
try:
- try:
- component_name = command['role']
- except KeyError:
- # For status commands and (maybe) custom actions component name
- # is stored at another location
- component_name = command['componentName']
script_type = command['commandParams']['script_type']
script = command['commandParams']['script']
timeout = int(command['commandParams']['command_timeout'])
+ server_url_prefix = command['hostLevelParams']['jdk_location']
task_id = "status"
try:
task_id = command['taskId']
@@ -92,15 +89,14 @@ class CustomServiceOrchestrator():
command_name = forsed_command_name
if command_name == self.CUSTOM_ACTION_COMMAND:
- base_dir = self.config.get('python', 'custom_actions_dir')
+ base_dir = self.file_cache.get_custom_actions_base_dir(server_url_prefix)
script_tuple = (os.path.join(base_dir, script) , base_dir)
hook_dir = None
else:
if command_name == self.CUSTOM_COMMAND_COMMAND:
command_name = command['hostLevelParams']['custom_command']
- hook_dir = self.file_cache.get_hook_base_dir(command)
- service_subpath = command['commandParams']['service_package_folder']
- base_dir = self.file_cache.get_service_base_dir(service_subpath)
+ hook_dir = self.file_cache.get_hook_base_dir(command, server_url_prefix)
+ base_dir = self.file_cache.get_service_base_dir(command, server_url_prefix)
script_path = self.resolve_script_path(base_dir, script, script_type)
script_tuple = (script_path, base_dir)
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/main/python/ambari_agent/FileCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/FileCache.py b/ambari-agent/src/main/python/ambari_agent/FileCache.py
index 01d2e52..6b307f1 100644
--- a/ambari-agent/src/main/python/ambari_agent/FileCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/FileCache.py
@@ -17,18 +17,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
-
+import StringIO
import logging
-import Queue
-import threading
-import pprint
import os
-import json
-from AgentException import AgentException
+import shutil
+import zipfile
+import urllib2
logger = logging.getLogger()
+class CachingException(Exception):
+ pass
+
class FileCache():
"""
Provides caching and lookup for service metadata files.
@@ -36,28 +37,40 @@ class FileCache():
downloads relevant files from the server.
"""
+ STACKS_CACHE_DIRECTORY="stacks"
+ CUSTOM_ACTIONS_CACHE_DIRECTORY="custom_actions"
+ HASH_SUM_FILE=".hash"
+ ARCHIVE_NAME="archive.zip"
+
+ BLOCK_SIZE=1024*16
+ SOCKET_TIMEOUT=10
+
def __init__(self, config):
self.service_component_pool = {}
self.config = config
self.cache_dir = config.get('agent', 'cache_dir')
+ # Defines whether command should fail when downloading scripts
+ # from the server is not possible or agent should rollback to local copy
+ self.tolerate_download_failures = \
+ config.get('agent','tolerate_download_failures').lower() == 'true'
+ self.reset()
+
+
+ def reset(self):
+ self.uptodate_paths = [] # Paths that already have been recently checked
- def get_service_base_dir(self, service_subpath):
+ def get_service_base_dir(self, command, server_url_prefix):
"""
Returns a base directory for service
"""
- service_base_dir = os.path.join(self.cache_dir, "stacks", service_subpath)
- if not os.path.isdir(service_base_dir):
- # TODO: Metadata downloading will be implemented at Phase 2
- # As of now, all stack definitions are packaged and distributed with
- # agent rpm
- message = "Service base dir not found at expected location {0}".\
- format(service_base_dir)
- raise AgentException(message)
- return service_base_dir
+ service_subpath = command['commandParams']['service_package_folder']
+ subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, service_subpath)
+ return self.provide_directory(self.cache_dir, subpath,
+ server_url_prefix)
- def get_hook_base_dir(self, command):
+ def get_hook_base_dir(self, command, server_url_prefix):
"""
Returns a base directory for hooks
"""
@@ -65,13 +78,159 @@ class FileCache():
hooks_subpath = command['commandParams']['hooks_folder']
except KeyError:
return None
- hook_base_path = os.path.join(self.cache_dir, "stacks", hooks_subpath)
- if not os.path.isdir(hook_base_path):
- # TODO: Metadata downloading will be implemented at Phase 2
- # As of now, all stack definitions are packaged and distributed with
- # agent rpm
- message = "Hook scripts dir for not found at " \
- "expected location {0}".format(hook_base_path)
- raise AgentException(message)
- return hook_base_path
+ subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, hooks_subpath)
+ return self.provide_directory(self.cache_dir, subpath,
+ server_url_prefix)
+
+
+ def get_custom_actions_base_dir(self, server_url_prefix):
+ """
+ Returns a base directory for custom action scripts
+ """
+ return self.provide_directory(self.cache_dir,
+ self.CUSTOM_ACTIONS_CACHE_DIRECTORY,
+ server_url_prefix)
+
+ def provide_directory(self, cache_path, subdirectory, server_url_prefix):
+ """
+ Ensures that directory at cache is up-to-date. Throws a CachingException
+ if any problems occur
+ Parameters;
+ cache_path: full path to cache directory
+ subdirectory: subpath inside cache
+ server_url_prefix: url of "resources" folder at the server
+ """
+ full_path = os.path.join(cache_path, subdirectory)
+ logger.debug("Trying to provide directory {0}".format(subdirectory))
+ try:
+ if full_path not in self.uptodate_paths:
+ logger.debug("Checking if update is available for "
+ "directory {0}".format(full_path))
+ # Need to check for updates at server
+ remote_url = self.build_download_url(server_url_prefix,
+ subdirectory, self.HASH_SUM_FILE)
+ memory_buffer = self.fetch_url(remote_url)
+ remote_hash = memory_buffer.getvalue().strip()
+ local_hash = self.read_hash_sum(full_path)
+ if not local_hash or local_hash != remote_hash:
+ logger.debug("Updating directory {0}".format(full_path))
+ download_url = self.build_download_url(server_url_prefix,
+ subdirectory, self.ARCHIVE_NAME)
+ membuffer = self.fetch_url(download_url)
+ self.invalidate_directory(full_path)
+ self.unpack_archive(membuffer, full_path)
+ self.write_hash_sum(full_path, remote_hash)
+ # Finally consider cache directory up-to-date
+ self.uptodate_paths.append(full_path)
+ except CachingException, e:
+ if self.tolerate_download_failures:
+ # ignore
+ logger.warn("Error occured during cache update. "
+ "Error tolerate setting is set to true, so"
+ " ignoring this error and continuing with current cache. "
+ "Error details: {0}".format(str(e)))
+ else:
+ raise # we are not tolerant to exceptions, command execution will fail
+ return full_path
+
+
+ def build_download_url(self, server_url_prefix,
+ directory, filename):
+ """
+ Builds up a proper download url for file. Used for downloading files
+ from the server.
+ directory - relative path
+ filename - file inside directory we are trying to fetch
+ """
+ return "{0}/{1}/{2}".format(server_url_prefix,
+ directory, filename)
+
+
+ def fetch_url(self, url):
+ """
+ Fetches content on url to in-memory buffer and returns the resulting buffer.
+ May throw exceptions because of various reasons
+ """
+ logger.debug("Trying to download {0}".format(url))
+ try:
+ memory_buffer = StringIO.StringIO()
+ u = urllib2.urlopen(url, timeout=self.SOCKET_TIMEOUT)
+ logger.debug("Connected with {0} with code {1}".format(u.geturl(),
+ u.getcode()))
+ buff = u.read(self.BLOCK_SIZE)
+ while buff:
+ memory_buffer.write(buff)
+ buff = u.read(self.BLOCK_SIZE)
+ if not buff:
+ break
+ return memory_buffer
+ except Exception, err:
+ raise CachingException("Can not download file from"
+ " url {0} : {1}".format(url, str(err)))
+
+
+ def read_hash_sum(self, directory):
+ """
+ Tries to read a hash sum from previously generated file. Returns string
+ containing hash or None
+ """
+ hash_file = os.path.join(directory, self.HASH_SUM_FILE)
+ try:
+ with open(hash_file) as fh:
+ return fh.readline().strip()
+ except:
+ return None # We don't care
+
+
+ def write_hash_sum(self, directory, new_hash):
+ """
+ Tries to read a hash sum from previously generated file. Returns string
+ containing hash or None
+ """
+ hash_file = os.path.join(directory, self.HASH_SUM_FILE)
+ try:
+ with open(hash_file, "w") as fh:
+ fh.write(new_hash)
+ except Exception, err:
+ raise CachingException("Can not write to file {0} : {1}".format(hash_file,
+ str(err)))
+
+
+ def invalidate_directory(self, directory):
+ """
+ Recursively removes directory content (if any). Also, creates
+ directory and any parent directories if needed. May throw exceptions
+ on permission problems
+ """
+ logger.debug("Invalidating directory {0}".format(directory))
+ try:
+ if os.path.isfile(directory): # It would be a strange situation
+ os.unlink(directory)
+ elif os.path.isdir(directory):
+ shutil.rmtree(directory)
+ # create directory itself and any parent directories
+ os.makedirs(directory)
+ except Exception, err:
+ raise CachingException("Can not invalidate cache directory {0}: {1}",
+ directory, str(err))
+
+
+ def unpack_archive(self, mem_buffer, target_directory):
+ """
+ Unpacks contents of in-memory buffer to file system.
+ In-memory buffer is expected to contain a valid zip archive
+ """
+ try:
+ zfile = zipfile.ZipFile(mem_buffer)
+ for name in zfile.namelist():
+ (dirname, filename) = os.path.split(name)
+ concrete_dir=os.path.abspath(os.path.join(target_directory, dirname))
+ if not os.path.isdir(concrete_dir):
+ os.makedirs(concrete_dir)
+ logger.debug("Unpacking file {0} to {1}".format(name, concrete_dir))
+ zfile.extract(name, target_directory)
+ except Exception, err:
+ raise CachingException("Can not unpack zip file to "
+ "directory {0} : {1}".format(
+ target_directory, str(err)))
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 1918641..abc6edc 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -143,8 +143,13 @@ class TestActionQueue(TestCase):
@patch.object(ActionQueue, "process_command")
@patch.object(Queue, "get")
- def test_ActionQueueStartStop(self, get_mock, process_command_mock):
- actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
+ get_mock, process_command_mock):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ config = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
actionQueue.start()
time.sleep(0.1)
actionQueue.stop()
@@ -158,7 +163,8 @@ class TestActionQueue(TestCase):
@patch.object(ActionQueue, "execute_status_command")
def test_process_command(self, execute_status_command_mock,
execute_command_mock, print_exc_mock):
- actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
execution_command = {
'commandType' : ActionQueue.EXECUTION_COMMAND,
}
@@ -228,7 +234,10 @@ class TestActionQueue(TestCase):
config = AmbariConfig().getConfig()
tempdir = tempfile.gettempdir()
config.set('agent', 'prefix', tempdir)
- actionQueue = ActionQueue(config, 'dummy_controller')
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
unfreeze_flag = threading.Event()
puppet_execution_result_dict = {
'stdout': 'out',
@@ -388,7 +397,10 @@ class TestActionQueue(TestCase):
config = AmbariConfig().getConfig()
tempdir = tempfile.gettempdir()
config.set('agent', 'prefix', tempdir)
- actionQueue = ActionQueue(config, 'dummy_controller')
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
actionQueue.execute_command(self.datanode_restart_command)
report = actionQueue.result()
expected = {'actionId': '1-1',
@@ -414,11 +426,15 @@ class TestActionQueue(TestCase):
@patch.object(CustomServiceOrchestrator, "requestComponentStatus")
@patch.object(ActionQueue, "execute_command")
@patch.object(LiveStatus, "build")
- def test_execute_status_command(self, build_mock, execute_command_mock,
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_execute_status_command(self, CustomServiceOrchestrator_mock,
+ build_mock, execute_command_mock,
requestComponentStatus_mock, read_stack_version_mock,
determine_command_format_version_mock,
status_update_callback):
- actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
build_mock.return_value = "dummy report"
# Check execution ov V1 status command
@@ -441,7 +457,10 @@ class TestActionQueue(TestCase):
self.assertTrue(requestComponentStatus_mock.called)
- def test_determine_command_format_version(self):
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_determine_command_format_version(self,
+ CustomServiceOrchestrator_mock):
+ CustomServiceOrchestrator_mock.return_value = None
v1_command = {
'commandParams': {
'schema_version': '1.0'
@@ -455,7 +474,8 @@ class TestActionQueue(TestCase):
current_command = {
# Absent 'commandParams' section
}
- actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
self.assertEqual(actionQueue.determine_command_format_version(v1_command),
ActionQueue.COMMAND_FORMAT_V1)
self.assertEqual(actionQueue.determine_command_format_version(v2_command),
@@ -469,12 +489,16 @@ class TestActionQueue(TestCase):
@patch.object(PuppetExecutor, "runCommand")
@patch.object(CustomServiceOrchestrator, "runCommand")
@patch.object(ActionQueue, "status_update_callback")
+ @patch.object(CustomServiceOrchestrator, "__init__")
def test_command_execution_depending_on_command_format(self,
+ CustomServiceOrchestrator_mock,
status_update_callback_mock,
custom_ex_runCommand_mock,
puppet_runCommand_mock, open_mock,
determine_command_format_version_mock):
- actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
ret = {
'stdout' : '',
'stderr' : '',
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index b4439e9..1e110da 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -209,7 +209,32 @@ class TestController(unittest.TestCase):
heartbeatWithServer = MagicMock(name="heartbeatWithServer")
self.controller.heartbeatWithServer = heartbeatWithServer
- self.controller.isRegistered = True;
+ listener1 = MagicMock()
+ listener2 = MagicMock()
+ self.controller.registration_listeners.append(listener1)
+ self.controller.registration_listeners.append(listener2)
+ self.controller.isRegistered = True
+ self.controller.registerAndHeartbeat()
+ registerWithServer.assert_called_once_with()
+ heartbeatWithServer.assert_called_once_with()
+ self.assertTrue(listener1.called)
+ self.assertTrue(listener2.called)
+
+ self.controller.registerWithServer = \
+ Controller.Controller.registerWithServer
+ self.controller.heartbeatWithServer = \
+ Controller.Controller.registerWithServer
+
+
+ @patch("time.sleep")
+ def test_registerAndHeartbeat_check_registration_listener(self, sleepMock):
+ registerWithServer = MagicMock(name="registerWithServer")
+ registerWithServer.return_value = {"response":"resp"}
+ self.controller.registerWithServer = registerWithServer
+ heartbeatWithServer = MagicMock(name="heartbeatWithServer")
+ self.controller.heartbeatWithServer = heartbeatWithServer
+
+ self.controller.isRegistered = True
self.controller.registerAndHeartbeat()
registerWithServer.assert_called_once_with()
heartbeatWithServer.assert_called_once_with()
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index 971048b..54c17a6 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -56,11 +56,26 @@ class TestCustomServiceOrchestrator(TestCase):
self.config.set('python', 'custom_actions_dir', tmpdir)
+ @patch.object(FileCache, "__init__")
+ def test_add_reg_listener_to_controller(self, FileCache_mock):
+ FileCache_mock.return_value = None
+ dummy_controller = MagicMock()
+ config = AmbariConfig().getConfig()
+ tempdir = tempfile.gettempdir()
+ config.set('agent', 'prefix', tempdir)
+ CustomServiceOrchestrator(config, dummy_controller)
+ self.assertTrue(dummy_controller.registration_listeners.append.called)
+
+
@patch.object(manifestGenerator, 'decompressClusterHostInfo')
@patch("hostname.public_hostname")
@patch("os.path.isfile")
@patch("os.unlink")
- def test_dump_command_to_json(self, unlink_mock, isfile_mock, hostname_mock, decompress_cluster_host_info_mock):
+ @patch.object(FileCache, "__init__")
+ def test_dump_command_to_json(self, FileCache_mock, unlink_mock,
+ isfile_mock, hostname_mock,
+ decompress_cluster_host_info_mock):
+ FileCache_mock.return_value = None
hostname_mock.return_value = "test.hst"
command = {
'commandType': 'EXECUTION_COMMAND',
@@ -86,7 +101,8 @@ class TestCustomServiceOrchestrator(TestCase):
config = AmbariConfig().getConfig()
tempdir = tempfile.gettempdir()
config.set('agent', 'prefix', tempdir)
- orchestrator = CustomServiceOrchestrator(config)
+ dummy_controller = MagicMock()
+ orchestrator = CustomServiceOrchestrator(config, dummy_controller)
isfile_mock.return_value = True
# Test dumping EXECUTION_COMMAND
json_file = orchestrator.dump_command_to_json(command)
@@ -112,9 +128,12 @@ class TestCustomServiceOrchestrator(TestCase):
@patch("os.path.exists")
- def test_resolve_script_path(self, exists_mock):
+ @patch.object(FileCache, "__init__")
+ def test_resolve_script_path(self, FileCache_mock, exists_mock):
+ FileCache_mock.return_value = None
+ dummy_controller = MagicMock()
config = AmbariConfig().getConfig()
- orchestrator = CustomServiceOrchestrator(config)
+ orchestrator = CustomServiceOrchestrator(config, dummy_controller)
# Testing existing path
exists_mock.return_value = True
path = orchestrator.\
@@ -136,14 +155,18 @@ class TestCustomServiceOrchestrator(TestCase):
@patch.object(FileCache, "get_hook_base_dir")
@patch.object(CustomServiceOrchestrator, "dump_command_to_json")
@patch.object(PythonExecutor, "run_file")
- def test_runCommand(self, run_file_mock, dump_command_to_json_mock,
+ @patch.object(FileCache, "__init__")
+ def test_runCommand(self, FileCache_mock,
+ run_file_mock, dump_command_to_json_mock,
get_hook_base_dir_mock, get_service_base_dir_mock,
resolve_hook_script_path_mock, resolve_script_path_mock):
+ FileCache_mock.return_value = None
command = {
'role' : 'REGION_SERVER',
'hostLevelParams' : {
'stack_name' : 'HDP',
'stack_version' : '2.0.7',
+ 'jdk_location' : 'some_location'
},
'commandParams': {
'script_type': 'PYTHON',
@@ -159,7 +182,8 @@ class TestCustomServiceOrchestrator(TestCase):
resolve_hook_script_path_mock.return_value = \
('/hooks_dir/prefix-command/scripts/hook.py',
'/hooks_dir/prefix-command')
- orchestrator = CustomServiceOrchestrator(self.config)
+ dummy_controller = MagicMock()
+ orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
get_hook_base_dir_mock.return_value = "/hooks/"
# normal run case
run_file_mock.return_value = {
@@ -206,10 +230,19 @@ class TestCustomServiceOrchestrator(TestCase):
@patch.object(CustomServiceOrchestrator, "dump_command_to_json")
@patch.object(PythonExecutor, "run_file")
- def test_runCommand_custom_action(self, run_file_mock, dump_command_to_json_mock):
+ @patch.object(FileCache, "__init__")
+ @patch.object(FileCache, "get_custom_actions_base_dir")
+ def test_runCommand_custom_action(self, get_custom_actions_base_dir_mock,
+ FileCache_mock,
+ run_file_mock, dump_command_to_json_mock):
+ FileCache_mock.return_value = None
+ get_custom_actions_base_dir_mock.return_value = "some path"
_, script = tempfile.mkstemp()
command = {
'role' : 'any',
+ 'hostLevelParams' : {
+ 'jdk_location' : 'some_location'
+ },
'commandParams': {
'script_type': 'PYTHON',
'script': 'some_custom_action.py',
@@ -218,8 +251,8 @@ class TestCustomServiceOrchestrator(TestCase):
'taskId' : '3',
'roleCommand': 'ACTIONEXECUTE'
}
-
- orchestrator = CustomServiceOrchestrator(self.config)
+ dummy_controller = MagicMock()
+ orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
# normal run case
run_file_mock.return_value = {
'stdout' : 'sss',
@@ -235,9 +268,11 @@ class TestCustomServiceOrchestrator(TestCase):
@patch("os.path.isfile")
- def test_resolve_hook_script_path(self, isfile_mock):
-
- orchestrator = CustomServiceOrchestrator(self.config)
+ @patch.object(FileCache, "__init__")
+ def test_resolve_hook_script_path(self, FileCache_mock, isfile_mock):
+ FileCache_mock.return_value = None
+ dummy_controller = MagicMock()
+ orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
# Testing None param
res1 = orchestrator.resolve_hook_script_path(None, "prefix", "command",
"script_type")
@@ -256,7 +291,9 @@ class TestCustomServiceOrchestrator(TestCase):
@patch.object(CustomServiceOrchestrator, "runCommand")
- def test_requestComponentStatus(self, runCommand_mock):
+ @patch.object(FileCache, "__init__")
+ def test_requestComponentStatus(self, FileCache_mock, runCommand_mock):
+ FileCache_mock.return_value = None
status_command = {
"serviceName" : 'HDFS',
"commandType" : "STATUS_COMMAND",
@@ -264,7 +301,8 @@ class TestCustomServiceOrchestrator(TestCase):
"componentName" : "DATANODE",
'configurations':{}
}
- orchestrator = CustomServiceOrchestrator(self.config)
+ dummy_controller = MagicMock()
+ orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
# Test alive case
runCommand_mock.return_value = {
"exitcode" : 0
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/test/python/ambari_agent/TestFileCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestFileCache.py b/ambari-agent/src/test/python/ambari_agent/TestFileCache.py
index 5e389d5..023d19a 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestFileCache.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestFileCache.py
@@ -28,15 +28,12 @@ import tempfile
import time
from threading import Thread
-from PythonExecutor import PythonExecutor
-from CustomServiceOrchestrator import CustomServiceOrchestrator
-from FileCache import FileCache
+from FileCache import FileCache, CachingException
from AmbariConfig import AmbariConfig
from mock.mock import MagicMock, patch
import StringIO
import sys
-from ambari_agent import AgentException
-from AgentException import AgentException
+import shutil
class TestFileCache(TestCase):
@@ -51,56 +48,314 @@ class TestFileCache(TestCase):
self.config.add_section('agent')
self.config.set('agent', 'prefix', tmpdir)
self.config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ self.config.set('agent', 'tolerate_download_failures', "true")
- @patch("os.path.isdir")
- def test_get_service_base_dir(self, isdir_mock):
+ def test_reset(self):
fileCache = FileCache(self.config)
- # Check existing dir case
- isdir_mock.return_value = True
- service_subpath = "HDP/2.1.1/services/ZOOKEEPER/package"
- base = fileCache.get_service_base_dir(service_subpath)
- self.assertEqual(base, "/var/lib/ambari-agent/cache/stacks/HDP/2.1.1/"
- "services/ZOOKEEPER/package")
- # Check absent dir case
- isdir_mock.return_value = False
- try:
- fileCache.get_service_base_dir(service_subpath)
- self.fail("Should throw an exception")
- except AgentException:
- pass # Expected
+ fileCache.uptodate_paths.append('dummy-path')
+ fileCache.reset()
+ self.assertFalse(fileCache.uptodate_paths)
+ @patch.object(FileCache, "provide_directory")
+ def test_get_service_base_dir(self, provide_directory_mock):
+ provide_directory_mock.return_value = "dummy value"
+ fileCache = FileCache(self.config)
+ command = {
+ 'commandParams' : {
+ 'service_package_folder' : 'HDP/2.1.1/services/ZOOKEEPER/package'
+ }
+ }
+ res = fileCache.get_service_base_dir(command, "server_url_pref")
+ self.assertEquals(
+ pprint.pformat(provide_directory_mock.call_args_list[0][0]),
+ "('/var/lib/ambari-agent/cache',\n "
+ "'stacks/HDP/2.1.1/services/ZOOKEEPER/package',\n"
+ " 'server_url_pref')")
+ self.assertEquals(res, "dummy value")
- @patch("os.path.isdir")
- def test_get_hook_base_dir(self, isdir_mock):
+ @patch.object(FileCache, "provide_directory")
+ def test_get_hook_base_dir(self, provide_directory_mock):
fileCache = FileCache(self.config)
# Check missing parameter
command = {
'commandParams' : {
}
}
- base = fileCache.get_hook_base_dir(command)
+ base = fileCache.get_hook_base_dir(command, "server_url_pref")
self.assertEqual(base, None)
+ self.assertFalse(provide_directory_mock.called)
# Check existing dir case
- isdir_mock.return_value = True
command = {
'commandParams' : {
'hooks_folder' : 'HDP/2.1.1/hooks'
}
}
- base = fileCache.get_hook_base_dir(command)
- self.assertEqual(base, "/var/lib/ambari-agent/cache/stacks/HDP/2.1.1/hooks")
+ provide_directory_mock.return_value = "dummy value"
+ fileCache = FileCache(self.config)
+ res = fileCache.get_hook_base_dir(command, "server_url_pref")
+ self.assertEquals(
+ pprint.pformat(provide_directory_mock.call_args_list[0][0]),
+ "('/var/lib/ambari-agent/cache', "
+ "'stacks/HDP/2.1.1/hooks', "
+ "'server_url_pref')")
+ self.assertEquals(res, "dummy value")
+
+
+ @patch.object(FileCache, "provide_directory")
+ def test_get_custom_actions_base_dir(self, provide_directory_mock):
+ provide_directory_mock.return_value = "dummy value"
+ fileCache = FileCache(self.config)
+ res = fileCache.get_custom_actions_base_dir("server_url_pref")
+ self.assertEquals(
+ pprint.pformat(provide_directory_mock.call_args_list[0][0]),
+ "('/var/lib/ambari-agent/cache', 'custom_actions', 'server_url_pref')")
+ self.assertEquals(res, "dummy value")
+
+
+ @patch.object(FileCache, "build_download_url")
+ @patch.object(FileCache, "fetch_url")
+ @patch.object(FileCache, "read_hash_sum")
+ @patch.object(FileCache, "invalidate_directory")
+ @patch.object(FileCache, "unpack_archive")
+ @patch.object(FileCache, "write_hash_sum")
+ def test_provide_directory(self, write_hash_sum_mock, unpack_archive_mock,
+ invalidate_directory_mock,
+ read_hash_sum_mock, fetch_url_mock,
+ build_download_url_mock):
+ build_download_url_mock.return_value = "http://dummy-url/"
+ HASH1 = "hash1"
+ membuffer = MagicMock()
+ membuffer.getvalue.return_value.strip.return_value = HASH1
+ fileCache = FileCache(self.config)
+
+ # Test uptodate dirs after start
+ self.assertFalse(fileCache.uptodate_paths)
+
+ # Test initial downloading (when dir does not exist)
+ fetch_url_mock.return_value = membuffer
+ read_hash_sum_mock.return_value = "hash2"
+ res = fileCache.provide_directory("cache_path", "subdirectory",
+ "server_url_prefix")
+ self.assertTrue(invalidate_directory_mock.called)
+ self.assertTrue(write_hash_sum_mock.called)
+ self.assertEquals(fetch_url_mock.call_count, 2)
+ self.assertEquals(pprint.pformat(fileCache.uptodate_paths),
+ "['cache_path/subdirectory']")
+ self.assertEquals(res, 'cache_path/subdirectory')
+
+ fetch_url_mock.reset_mock()
+ write_hash_sum_mock.reset_mock()
+ invalidate_directory_mock.reset_mock()
+ unpack_archive_mock.reset_mock()
+
+ # Test cache invalidation when local hash does not differ
+ fetch_url_mock.return_value = membuffer
+ read_hash_sum_mock.return_value = HASH1
+ fileCache.reset()
+
+ res = fileCache.provide_directory("cache_path", "subdirectory",
+ "server_url_prefix")
+ self.assertFalse(invalidate_directory_mock.called)
+ self.assertFalse(write_hash_sum_mock.called)
+ self.assertEquals(fetch_url_mock.call_count, 1)
+ self.assertEquals(pprint.pformat(fileCache.uptodate_paths),
+ "['cache_path/subdirectory']")
+ self.assertEquals(res, 'cache_path/subdirectory')
+
+ fetch_url_mock.reset_mock()
+ write_hash_sum_mock.reset_mock()
+ invalidate_directory_mock.reset_mock()
+ unpack_archive_mock.reset_mock()
+
+ # Test execution path when path is up-to date (already checked)
+ res = fileCache.provide_directory("cache_path", "subdirectory",
+ "server_url_prefix")
+ self.assertFalse(invalidate_directory_mock.called)
+ self.assertFalse(write_hash_sum_mock.called)
+ self.assertEquals(fetch_url_mock.call_count, 0)
+ self.assertEquals(pprint.pformat(fileCache.uptodate_paths),
+ "['cache_path/subdirectory']")
+ self.assertEquals(res, 'cache_path/subdirectory')
+
+ # Check exception handling when tolerance is disabled
+ self.config.set('agent', 'tolerate_download_failures', "false")
+ fetch_url_mock.side_effect = self.caching_exc_side_effect
+ fileCache = FileCache(self.config)
+ try:
+ fileCache.provide_directory("cache_path", "subdirectory",
+ "server_url_prefix")
+ self.fail('CachingException not thrown')
+ except CachingException:
+ pass # Expected
+ except Exception, e:
+ self.fail('Unexpected exception thrown:' + str(e))
+
+ # Check that unexpected exceptions are still propagated when
+ # tolerance is enabled
+ self.config.set('agent', 'tolerate_download_failures', "false")
+ fetch_url_mock.side_effect = self.exc_side_effect
+ fileCache = FileCache(self.config)
+ try:
+ fileCache.provide_directory("cache_path", "subdirectory",
+ "server_url_prefix")
+ self.fail('Exception not thrown')
+ except Exception:
+ pass # Expected
- # Check absent dir case
+
+ # Check exception handling when tolerance is enabled
+ self.config.set('agent', 'tolerate_download_failures', "true")
+ fetch_url_mock.side_effect = self.caching_exc_side_effect
+ fileCache = FileCache(self.config)
+ res = fileCache.provide_directory("cache_path", "subdirectory",
+ "server_url_prefix")
+ self.assertEquals(res, 'cache_path/subdirectory')
+
+
+ def test_build_download_url(self):
+ fileCache = FileCache(self.config)
+ url = fileCache.build_download_url('http://localhost:8080/resources/',
+ 'stacks/HDP/2.1.1/hooks', 'archive.zip')
+ self.assertEqual(url,
+ 'http://localhost:8080/resources//stacks/HDP/2.1.1/hooks/archive.zip')
+
+
+ @patch("urllib2.urlopen")
+ def test_fetch_url(self, urlopen_mock):
+ fileCache = FileCache(self.config)
+ remote_url = "http://dummy-url/"
+ # Test normal download
+ test_str = 'abc' * 100000 # Very long string
+ test_string_io = StringIO.StringIO(test_str)
+ test_buffer = MagicMock()
+ test_buffer.read.side_effect = test_string_io.read
+ urlopen_mock.return_value = test_buffer
+
+ memory_buffer = fileCache.fetch_url(remote_url)
+
+ self.assertEquals(memory_buffer.getvalue(), test_str)
+ self.assertEqual(test_buffer.read.call_count, 20) # depends on buffer size
+ # Test exception handling
+ test_buffer.read.side_effect = self.exc_side_effect
+ try:
+ fileCache.fetch_url(remote_url)
+ self.fail('CachingException not thrown')
+ except CachingException:
+ pass # Expected
+ except Exception, e:
+ self.fail('Unexpected exception thrown:' + str(e))
+
+
+ def test_read_write_hash_sum(self):
+ tmpdir = tempfile.mkdtemp()
+ dummyhash = "DUMMY_HASH"
+ fileCache = FileCache(self.config)
+ fileCache.write_hash_sum(tmpdir, dummyhash)
+ newhash = fileCache.read_hash_sum(tmpdir)
+ self.assertEquals(newhash, dummyhash)
+ shutil.rmtree(tmpdir)
+ # Test read of not existing file
+ newhash = fileCache.read_hash_sum(tmpdir)
+ self.assertEquals(newhash, None)
+ # Test write to not existing file
+ with patch("__builtin__.open") as open_mock:
+ open_mock.side_effect = self.exc_side_effect
+ try:
+ fileCache.write_hash_sum(tmpdir, dummyhash)
+ self.fail('CachingException not thrown')
+ except CachingException:
+ pass # Expected
+ except Exception, e:
+ self.fail('Unexpected exception thrown:' + str(e))
+
+
+ @patch("os.path.isfile")
+ @patch("os.path.isdir")
+ @patch("os.unlink")
+ @patch("shutil.rmtree")
+ @patch("os.makedirs")
+ def test_invalidate_directory(self, makedirs_mock, rmtree_mock,
+ unlink_mock, isdir_mock, isfile_mock):
+ fileCache = FileCache(self.config)
+ # Test execution flow if path points to file
+ isfile_mock.return_value = True
isdir_mock.return_value = False
+
+ fileCache.invalidate_directory("dummy-dir")
+
+ self.assertTrue(unlink_mock.called)
+ self.assertFalse(rmtree_mock.called)
+ self.assertTrue(makedirs_mock.called)
+
+ unlink_mock.reset_mock()
+ rmtree_mock.reset_mock()
+ makedirs_mock.reset_mock()
+
+ # Test execution flow if path points to dir
+ isfile_mock.return_value = False
+ isdir_mock.return_value = True
+
+ fileCache.invalidate_directory("dummy-dir")
+
+ self.assertFalse(unlink_mock.called)
+ self.assertTrue(rmtree_mock.called)
+ self.assertTrue(makedirs_mock.called)
+
+ unlink_mock.reset_mock()
+ rmtree_mock.reset_mock()
+ makedirs_mock.reset_mock()
+
+ # Test exception handling
+ makedirs_mock.side_effect = self.exc_side_effect
try:
- fileCache.get_hook_base_dir(command)
- self.fail("Should throw an exception")
- except AgentException:
+ fileCache.invalidate_directory("dummy-dir")
+ self.fail('CachingException not thrown')
+ except CachingException:
pass # Expected
+ except Exception, e:
+ self.fail('Unexpected exception thrown:' + str(e))
+
+
+ def test_unpack_archive(self):
+ tmpdir = tempfile.mkdtemp()
+ dummy_archive = os.path.join("ambari_agent", "dummy_files",
+ "dummy_archive.zip")
+ # Test normal flow
+ with open(dummy_archive, "r") as f:
+ data = f.read(os.path.getsize(dummy_archive))
+ membuf = StringIO.StringIO(data)
+
+ fileCache = FileCache(self.config)
+ fileCache.unpack_archive(membuf, tmpdir)
+ # Count summary size of unpacked files:
+ total_size = 0
+ total_files = 0
+ total_dirs = 0
+ for dirpath, dirnames, filenames in os.walk(tmpdir):
+ total_dirs += 1
+ for f in filenames:
+ fp = os.path.join(dirpath, f)
+ total_size += os.path.getsize(fp)
+ total_files += 1
+ self.assertEquals(total_size, 51258L)
+ self.assertEquals(total_files, 28)
+ self.assertEquals(total_dirs, 8)
+ shutil.rmtree(tmpdir)
+
+ # Test exception handling
+ with patch("os.path.isdir") as isdir_mock:
+ isdir_mock.side_effect = self.exc_side_effect
+ try:
+ fileCache.unpack_archive(membuf, tmpdir)
+ self.fail('CachingException not thrown')
+ except CachingException:
+ pass # Expected
+ except Exception, e:
+ self.fail('Unexpected exception thrown:' + str(e))
def tearDown(self):
@@ -108,3 +363,8 @@ class TestFileCache(TestCase):
sys.stdout = sys.__stdout__
+ def exc_side_effect(self, *a):
+ raise Exception("horrible_exc")
+
+ def caching_exc_side_effect(self, *a):
+ raise CachingException("horrible_caching_exc")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
index 906244d..c6a834d 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
@@ -47,7 +47,12 @@ class TestHeartbeat(TestCase):
def test_build(self):
- actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
+ config = AmbariConfig.AmbariConfig().getConfig()
+ config.set('agent', 'prefix', 'tmp')
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
heartbeat = Heartbeat(actionQueue)
result = heartbeat.build(100)
print "Heartbeat: " + str(result)
@@ -80,7 +85,12 @@ class TestHeartbeat(TestCase):
'exitCode': 777}],
'componentStatus': [{'status': 'HEALTHY', 'componentName': 'NAMENODE'}]
}
- actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
+ config = AmbariConfig.AmbariConfig().getConfig()
+ config.set('agent', 'prefix', 'tmp')
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
heartbeat = Heartbeat(actionQueue)
hb = heartbeat.build(id = 10, state_interval=1, componentsMapped=True)
self.assertEqual(register_mock.call_args_list[0][0][1], True)
@@ -92,7 +102,12 @@ class TestHeartbeat(TestCase):
@patch.object(ActionQueue, "result")
def test_build_long_result(self, result_mock):
- actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
+ config = AmbariConfig.AmbariConfig().getConfig()
+ config.set('agent', 'prefix', 'tmp')
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
result_mock.return_value = {
'reports': [{'status': 'IN_PROGRESS',
'stderr': 'Read from /tmp/errors-3.txt',
@@ -178,7 +193,12 @@ class TestHeartbeat(TestCase):
@patch.object(HostInfo, 'register')
def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock):
- actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
+ config = AmbariConfig.AmbariConfig().getConfig()
+ config.set('agent', 'prefix', 'tmp')
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
statusCommand = {
"serviceName" : 'HDFS',
"commandType" : "STATUS_COMMAND",
@@ -198,7 +218,12 @@ class TestHeartbeat(TestCase):
@patch.object(HostInfo, 'register')
def test_heartbeat_host_check_no_cmd(self, register_mock):
- actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller')
+ config = AmbariConfig.AmbariConfig().getConfig()
+ config.set('agent', 'prefix', 'tmp')
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
heartbeat = Heartbeat(actionQueue)
heartbeat.build(12, 6)
self.assertTrue(register_mock.called)
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-agent/src/test/python/ambari_agent/dummy_files/dummy_archive.zip
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/dummy_archive.zip b/ambari-agent/src/test/python/ambari_agent/dummy_files/dummy_archive.zip
new file mode 100644
index 0000000..9a77d7e
Binary files /dev/null and b/ambari-agent/src/test/python/ambari_agent/dummy_files/dummy_archive.zip differ
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml
index 1d6b427..bc83dd8 100644
--- a/ambari-server/pom.xml
+++ b/ambari-server/pom.xml
@@ -327,7 +327,6 @@
<location>${project.build.directory}/DBConnectionVerification.jar</location>
</source>
<source>
- <!-- This file is also included into agent rpm -->
<location>src/main/resources/role_command_order.json</location>
</source>
</sources>
@@ -390,6 +389,9 @@
<groupname>root</groupname>
<sources>
<source>
+ <location>src/main/python/ambari_server</location>
+ </source>
+ <source>
<location>src/main/python/bootstrap.py</location>
</source>
<source>
@@ -420,6 +422,18 @@
</source>
</sources>
</mapping>
+ <mapping>
+ <!-- custom actions root-->
+ <directory>/var/lib/ambari-server/resources/custom_actions</directory>
+ <filemode>755</filemode>
+ <username>root</username>
+ <groupname>root</groupname>
+ <sources>
+ <source>
+ <location>src/main/resources/custom_actions</location>
+ </source>
+ </sources>
+ </mapping>
</mappings>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index 2babd6b..a339784 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -28,6 +28,7 @@ import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.state.*;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.ambari.server.state.host.HostHeartbeatLostEvent;
@@ -49,6 +50,7 @@ public class HeartbeatMonitor implements Runnable {
private Thread monitorThread = null;
private final ConfigHelper configHelper;
private final AmbariMetaInfo ambariMetaInfo;
+ private final AmbariManagementController ambariManagementController;
private final Configuration configuration;
public HeartbeatMonitor(Clusters clusters, ActionQueue aq, ActionManager am,
@@ -59,6 +61,8 @@ public class HeartbeatMonitor implements Runnable {
this.threadWakeupInterval = threadWakeupInterval;
this.configHelper = injector.getInstance(ConfigHelper.class);
this.ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class);
+ this.ambariManagementController = injector.getInstance(
+ AmbariManagementController.class);
this.configuration = injector.getInstance(Configuration.class);
}
@@ -191,7 +195,6 @@ public class HeartbeatMonitor implements Runnable {
String serviceName = sch.getServiceName();
String componentName = sch.getServiceComponentName();
Service service = cluster.getService(sch.getServiceName());
- ServiceComponent sc = service.getServiceComponent(componentName);
StackId stackId = cluster.getDesiredStackVersion();
ServiceInfo serviceInfo = ambariMetaInfo.getServiceInfo(stackId.getStackName(),
stackId.getStackVersion(), serviceName);
@@ -267,6 +270,7 @@ public class HeartbeatMonitor implements Runnable {
commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
// Fill host level params
Map<String, String> hostLevelParams = statusCmd.getHostLevelParams();
+ hostLevelParams.put(JDK_LOCATION, ambariManagementController.getJdkResourceUrl());
hostLevelParams.put(STACK_NAME, stackId.getStackName());
hostLevelParams.put(STACK_VERSION, stackId.getStackVersion());
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
index 7f2d1fb..be800e7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -125,8 +125,8 @@ public class AmbariCustomCommandExecutionHelper {
@Inject
private ConfigHelper configHelper;
- private Boolean isServiceCheckCommand(String
- command, String service) {
+
+ private Boolean isServiceCheckCommand(String command, String service) {
List<String> actions = actionMetadata.getActions(service);
if (actions == null || actions.size() == 0) {
return false;
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index e4ebc1c..b9f62e4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -322,9 +322,7 @@ public class AmbariServer {
resources.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
"com.sun.jersey.api.core.PackagesResourceConfig");
resources.setInitParameter("com.sun.jersey.config.property.packages",
- "org.apache.ambari.server.resources.api.rest;" + "org.apache.ambari.server.api");
- resources.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",
- "true");
+ "org.apache.ambari.server.resources.api.rest;");
root.addServlet(resources, "/resources/*");
resources.setInitOrder(6);
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/main/python/ambari-server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari-server.py b/ambari-server/src/main/python/ambari-server.py
index 61233d3..e3439e0 100755
--- a/ambari-server/src/main/python/ambari-server.py
+++ b/ambari-server/src/main/python/ambari-server.py
@@ -39,6 +39,7 @@ import datetime
import tempfile
import random
import pwd
+from ambari_server.resourceFilesKeeper import ResourceFilesKeeper, KeeperException
# debug settings
VERBOSE = False
@@ -593,6 +594,8 @@ NR_ADJUST_OWNERSHIP_LIST =[
( "/var/lib/ambari-server/keys/db", "700", "{0}", False ),
( "/var/lib/ambari-server/keys/db/newcerts", "700", "{0}", False ),
( "/var/lib/ambari-server/keys/.ssh", "700", "{0}", False ),
+ ( "/var/lib/ambari-server/resources/stacks/", "755", "{0}", True ),
+ ( "/var/lib/ambari-server/resources/custom_actions/", "755", "{0}", True ),
( "/etc/ambari-server/conf", "644", "{0}", True ),
( "/etc/ambari-server/conf", "755", "{0}", False ),
( "/etc/ambari-server/conf/password.dat", "640", "{0}", False ),
@@ -2550,6 +2553,20 @@ def start(args):
print "Please do not forget to start PostgreSQL server."
properties = get_ambari_properties()
+ stack_location = get_stack_location(properties)
+ # Hack: we determine resource dir as a parent dir for stack_location
+ resources_location = os.path.dirname(stack_location)
+ resource_files_keeper = ResourceFilesKeeper(resources_location)
+
+ try:
+ print "Organizing resource files at {0}...".format(resources_location,
+ verbose=VERBOSE)
+ resource_files_keeper.perform_housekeeping()
+ except KeeperException, ex:
+ msg = "Can not organize resource files at {0}: {1}".format(
+ resources_location, str(ex))
+ raise FatalException(-1, msg)
+
isSecure = get_is_secure(properties)
(isPersisted, masterKeyFile) = get_is_persisted(properties)
environ = os.environ.copy()
@@ -2804,16 +2821,20 @@ def upgrade_local_repo_db(args, dbkey, dbvalue):
return retcode
pass
+
+def get_stack_location(properties):
+ stack_location = properties[STACK_LOCATION_KEY]
+ if stack_location is None:
+ stack_location = STACK_LOCATION_DEFAULT
+ return stack_location
+
def upgrade_local_repo(args):
properties = get_ambari_properties()
if properties == -1:
print_error_msg ("Error getting ambari properties")
return -1
- stack_location = properties[STACK_LOCATION_KEY]
- if stack_location is None:
- stack_location = STACK_LOCATION_DEFAULT
-
+ stack_location = get_stack_location(properties)
stack_root_local = os.path.join(stack_location, "HDPLocal")
if not os.path.exists(stack_root_local):
print_info_msg("HDPLocal stack directory does not exist, skipping")
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/main/python/ambari_server/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/__init__.py b/ambari-server/src/main/python/ambari_server/__init__.py
new file mode 100644
index 0000000..16818c9
--- /dev/null
+++ b/ambari-server/src/main/python/ambari_server/__init__.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/main/python/ambari_server/resourceFilesKeeper.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/resourceFilesKeeper.py b/ambari-server/src/main/python/ambari_server/resourceFilesKeeper.py
new file mode 100644
index 0000000..92cab3d
--- /dev/null
+++ b/ambari-server/src/main/python/ambari_server/resourceFilesKeeper.py
@@ -0,0 +1,258 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import hashlib
+
+import os, sys
+import zipfile
+import glob
+import pprint
+from xml.dom import minidom
+
+
+class KeeperException(Exception):
+ pass
+
+class ResourceFilesKeeper():
+ """
+ This class incapsulates all utility methods for resource files maintenance.
+ """
+
+ HOOKS_DIR="hooks"
+ PACKAGE_DIR="package"
+ STACKS_DIR="stacks"
+ CUSTOM_ACTIONS_DIR="custom_actions"
+
+ # For these directories archives are created
+ ARCHIVABLE_DIRS = [HOOKS_DIR, PACKAGE_DIR]
+
+ HASH_SUM_FILE=".hash"
+ ARCHIVE_NAME="archive.zip"
+
+ PYC_EXT=".pyc"
+ METAINFO_XML = "metainfo.xml"
+
+ BUFFER = 1024 * 32
+
+ # Change that to True to see debug output at stderr
+ DEBUG=False
+
+ def __init__(self, resources_dir, verbose=False):
+ self.resources_dir = resources_dir
+ self.verbose = verbose
+
+
+ def perform_housekeeping(self):
+ """
+ Performs housekeeping operations on resource files
+ """
+ self.update_directory_archieves()
+ # probably, later we will need some additional operations
+
+
+ def update_directory_archieves(self):
+ """
+ Please see AMBARI-4481 for more details
+ """
+ stacks_root = os.path.join(self.resources_dir, self.STACKS_DIR)
+ self.dbg_out("Updating archives for stack dirs at {0}...".format(stacks_root))
+ active_stacks = self.list_active_stacks(stacks_root)
+ self.dbg_out("Active stacks: {0}".format(pprint.pformat(active_stacks)))
+ # Iterate over active stack directories
+ for stack_dir in active_stacks:
+ for root, dirs, _ in os.walk(stack_dir):
+ for d in dirs:
+ if d in self.ARCHIVABLE_DIRS:
+ full_path = os.path.abspath(os.path.join(root, d))
+ self.update_directory_archive(full_path)
+
+
+ custom_actions_root = os.path.join(self.resources_dir,
+ self.CUSTOM_ACTIONS_DIR)
+ self.dbg_out("Updating archive for custom_actions dir at {0}...".format(
+ custom_actions_root))
+ self.update_directory_archive(custom_actions_root)
+
+
+
+ def list_active_stacks(self, stacks_root):
+ """
+ Builds a list of stack directories, that are active (enabled)
+ """
+ active_stacks = [] # Format: <stack_dir, ignore(True|False)>
+ glob_pattern = "{0}/*/*".format(stacks_root)
+ try:
+ stack_dirs = glob.glob(glob_pattern)
+ for directory in stack_dirs:
+ metainfo_file = os.path.join(directory, self.METAINFO_XML)
+ if os.path.exists(metainfo_file) and self.is_active_stack(metainfo_file):
+ active_stacks.append(directory)
+ return active_stacks
+ except Exception, err:
+ raise KeeperException("Can not list active stacks: {0}".format(str(err)))
+
+
+ def update_directory_archive(self, directory):
+ """
+ If hash sum for directory is not present or differs from saved value,
+ recalculates hash sum and creates directory archive
+ """
+ cur_hash = self.count_hash_sum(directory)
+ saved_hash = self.read_hash_sum(directory)
+ if cur_hash != saved_hash:
+ self.zip_directory(directory)
+ self.write_hash_sum(directory, cur_hash)
+
+
+ def count_hash_sum(self, directory):
+ """
+ Recursively counts hash sum of all files in directory and subdirectories.
+ Files and directories are processed in alphabetical order.
+ Ignores previously created directory archives and files containing
+ previously calculated hashes. Compiled pyc files are also ignored
+ """
+ try:
+ sha1 = hashlib.sha1()
+ file_list = []
+ for root, dirs, files in os.walk(directory):
+ for f in files:
+ if not self.is_ignored(f):
+ full_path = os.path.abspath(os.path.join(root, f))
+ file_list.append(full_path)
+ file_list.sort()
+ for path in file_list:
+ self.dbg_out("Counting hash of {0}".format(path))
+ with open(path, 'rb') as fh:
+ while True:
+ data = fh.read(self.BUFFER)
+ if not data:
+ break
+ sha1.update(data)
+ return sha1.hexdigest()
+ except Exception, err:
+ raise KeeperException("Can not calculate directory "
+ "hash: {0}".format(str(err)))
+
+
+ def read_hash_sum(self, directory):
+ """
+ Tries to read a hash sum from previously generated file. Returns string
+ containing hash or None
+ """
+ hash_file = os.path.join(directory, self.HASH_SUM_FILE)
+ if os.path.isfile(hash_file):
+ try:
+ with open(hash_file) as fh:
+ return fh.readline().strip()
+ except Exception, err:
+ raise KeeperException("Can not read file {0} : {1}".format(hash_file,
+ str(err)))
+ else:
+ return None
+
+
+ def write_hash_sum(self, directory, new_hash):
+ """
+ Tries to read a hash sum from previously generated file. Returns string
+ containing hash or None
+ """
+ hash_file = os.path.join(directory, self.HASH_SUM_FILE)
+ try:
+ with open(hash_file, "w") as fh:
+ fh.write(new_hash)
+ except Exception, err:
+ raise KeeperException("Can not write to file {0} : {1}".format(hash_file,
+ str(err)))
+
+
+ def zip_directory(self, directory):
+ """
+ Packs entire directory into zip file. Hash file is also packaged
+ into archive
+ """
+ self.dbg_out("creating archive for directory {0}".format(directory))
+ try:
+ zf = zipfile.ZipFile(os.path.join(directory, self.ARCHIVE_NAME), "w")
+ abs_src = os.path.abspath(directory)
+ for root, dirs, files in os.walk(directory):
+ for filename in files:
+ # Avoid zipping previous archive and hash file and binary pyc files
+ if not self.is_ignored(filename):
+ absname = os.path.abspath(os.path.join(root, filename))
+ arcname = absname[len(abs_src) + 1:]
+ self.dbg_out('zipping %s as %s' % (os.path.join(root, filename),
+ arcname))
+ zf.write(absname, arcname)
+ zf.close()
+ except Exception, err:
+ raise KeeperException("Can not create zip archive of "
+ "directory {0} : {1}".format(directory, str(err)))
+
+
+ def is_ignored(self, filename):
+ """
+ returns True if filename is ignored when calculating hashing or archiving
+ """
+ return filename in [self.HASH_SUM_FILE, self.ARCHIVE_NAME] or \
+ filename.endswith(self.PYC_EXT)
+
+
+ def dbg_out(self, text):
+ if self.DEBUG:
+ sys.stderr.write("{0}\n".format(text))
+ if not self.DEBUG and self.verbose:
+ print text
+
+
+ def is_active_stack(self, xmlfile):
+ try:
+ xmldoc = minidom.parse(xmlfile)
+ value = self.xpath_like_bycicle(xmldoc, ['metainfo', 'versions', 'active'])
+ return value.lower().strip() == 'true'
+ except Exception, err:
+ raise KeeperException("Can not parse XML file {0} : {1}",
+ xmlfile, str(err))
+
+
+ def xpath_like_bycicle(self, xml_doc, path):
+ # Default Python 2.6 distribution have no good XPATH support,
+ # implementing own bycicle here
+ cur_elem = xml_doc
+ for name in path:
+ elem = self.find_xml_element_by_name(cur_elem._get_childNodes(), name)
+ if name:
+ cur_elem = elem
+ else:
+ return None
+ # Select text in tags
+ value = cur_elem._get_childNodes()[0].nodeValue
+ if value:
+ return value.lower().strip()
+ else:
+ return None
+
+
+ def find_xml_element_by_name(self, elements, element_name):
+ for xml_element in elements:
+ if xml_element.nodeType == xml_element.ELEMENT_NODE and \
+ xml_element.nodeName == element_name:
+ return xml_element
+ return None
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/02f9c453/ambari-server/src/test/python/TestAmbariServer.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/TestAmbariServer.py b/ambari-server/src/test/python/TestAmbariServer.py
index 43494a7..3075391 100644
--- a/ambari-server/src/test/python/TestAmbariServer.py
+++ b/ambari-server/src/test/python/TestAmbariServer.py
@@ -29,6 +29,7 @@ import stat
import datetime
import operator
from pwd import getpwnam
+from ambari_server.resourceFilesKeeper import ResourceFilesKeeper, KeeperException
# We have to use this import HACK because the filename contains a dash
ambari_server = __import__('ambari-server')
@@ -2455,7 +2456,9 @@ MIIFHjCCAwYCCQDpHKOBI+Lt0zANBgkqhkiG9w0BAQUFADBRMQswCQYDVQQGEwJV
@patch.object(ambari_server, "find_jdbc_driver")
@patch("getpass.getuser")
@patch("os.chdir")
- def test_start(self, chdir_mock, getuser_mock, find_jdbc_driver_mock, is_root_mock, read_ambari_user_mock,
+ @patch.object(ResourceFilesKeeper, "perform_housekeeping")
+ def test_start(self, perform_housekeeping_mock, chdir_mock, getuser_mock,
+ find_jdbc_driver_mock, is_root_mock, read_ambari_user_mock,
parse_properties_file_mock, check_postgre_up_mock,
print_error_msg_mock, find_jdk_mock, search_file_mock,
print_info_msg_mock, popenMock, openMock, pexistsMock,
@@ -2556,6 +2559,18 @@ MIIFHjCCAwYCCQDpHKOBI+Lt0zANBgkqhkiG9w0BAQUFADBRMQswCQYDVQQGEwJV
# Ignored
pass
+ # Test exception handling on resource files housekeeping
+ perform_housekeeping_mock.reset_mock()
+ perform_housekeeping_mock.side_effect = KeeperException("some_reason")
+ try:
+ ambari_server.start(args)
+ self.fail("Should fail with exception")
+ except FatalException as e:
+ self.assertTrue('some_reason' in e.reason)
+ self.assertTrue(perform_housekeeping_mock.called)
+ perform_housekeeping_mock.side_effect = lambda *v, **kv : None
+ perform_housekeeping_mock.reset_mock()
+
self.assertFalse('Unable to start PostgreSQL server' in e.reason)
self.assertFalse(check_postgre_up_mock.called)
@@ -2585,6 +2600,8 @@ MIIFHjCCAwYCCQDpHKOBI+Lt0zANBgkqhkiG9w0BAQUFADBRMQswCQYDVQQGEwJV
self.assertTrue(popenMock.called)
popen_arg = popenMock.call_args[0][0]
self.assertTrue(popen_arg[0] == "/bin/sh")
+ self.assertTrue(perform_housekeeping_mock.called)
+ perform_housekeeping_mock.reset_mock()
popenMock.reset_mock()
parse_properties_file_mock.reset_mock()
@@ -2596,6 +2613,7 @@ MIIFHjCCAwYCCQDpHKOBI+Lt0zANBgkqhkiG9w0BAQUFADBRMQswCQYDVQQGEwJV
self.assertTrue(popenMock.called)
popen_arg = popenMock.call_args[0][0]
self.assertTrue(popen_arg[0] == "/bin/su")
+ self.assertTrue(perform_housekeeping_mock.called)
check_postgre_up_mock.reset_mock()
popenMock.reset_mock()