You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ch...@apache.org on 2015/12/01 13:05:55 UTC

stratos git commit: PCA - Validate agent.conf entries when loading Config, refactor handler methods for mb events, fix util methods, fix exception initialization

Repository: stratos
Updated Branches:
  refs/heads/stratos-4.1.x 64f05a519 -> a242b52be


PCA - Validate agent.conf entries when loading Config, refactor handler methods for mb events, fix util methods, fix exception initialization


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/a242b52b
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/a242b52b
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/a242b52b

Branch: refs/heads/stratos-4.1.x
Commit: a242b52be5452ba03a2234333c62810d3ea67cfb
Parents: 64f05a5
Author: Chamila de Alwis <ch...@apache.org>
Authored: Tue Dec 1 17:35:25 2015 +0530
Committer: Chamila de Alwis <ch...@apache.org>
Committed: Tue Dec 1 17:35:34 2015 +0530

----------------------------------------------------------------------
 .../cartridge.agent/cartridge.agent/agent.py    | 144 +++++++++++--------
 .../cartridge.agent/cartridge.agent/config.py   |  83 ++++++++++-
 .../cartridge.agent/exception.py                |  46 +++---
 .../cartridge.agent/healthstats.py              |  34 +----
 .../cartridge.agent/logpublisher.py             |   2 +-
 .../modules/artifactmgt/git/agentgithandler.py  |   4 +-
 .../modules/event/application/signup/events.py  |   3 -
 .../modules/event/instance/notifier/events.py   |   2 +-
 .../modules/event/tenant/events.py              |   2 +-
 .../modules/util/cartridgeagentutils.py         |  75 ++++++----
 .../cartridge.agent/publisher.py                |   4 +-
 11 files changed, 250 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
index 6b81dff..7f95f26 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
@@ -30,12 +30,10 @@ from subscriber import EventSubscriber
 class CartridgeAgent(object):
     def __init__(self):
         Config.initialize_config()
-        self.__tenant_context_initialized = False
-        self.__log_publish_manager = None
         self.__terminated = False
         self.__log = LogFactory().get_log(__name__)
 
-        mb_urls = Config.mb_urls
+        mb_urls = Config.mb_urls.split(",")
         mb_uname = Config.mb_username
         mb_pwd = Config.mb_password
 
@@ -87,8 +85,8 @@ class CartridgeAgent(object):
         # Execute start servers extension
         try:
             event_handler.start_server_extension()
-        except Exception as e:
-            self.__log.exception("Error processing start servers event: %s" % e)
+        except Exception as ex:
+            self.__log.exception("Error processing start servers event: %s" % ex)
 
         # check if artifact management is required before publishing instance activated event
         repo_url = Config.repo_url
@@ -107,21 +105,22 @@ class CartridgeAgent(object):
             event_handler.volume_mount_extension(persistence_mapping_payload)
 
         # start log publishing thread
+        log_publish_manager = None
         if DataPublisherConfiguration.get_instance().enabled:
             log_file_paths = Config.log_file_paths
             if log_file_paths is None:
                 self.__log.exception("No valid log file paths found, no logs will be published")
             else:
                 self.__log.debug("Starting Log Publisher Manager: [Log file paths] %s" % ", ".join(log_file_paths))
-                self.__log_publish_manager = LogPublisherManager(log_file_paths)
-                self.__log_publish_manager.start()
+                log_publish_manager = LogPublisherManager(log_file_paths)
+                log_publish_manager.start()
 
         # run until terminated
         while not self.__terminated:
             time.sleep(5)
 
         if DataPublisherConfiguration.get_instance().enabled:
-            self.__log_publish_manager.terminate_all_publishers()
+            log_publish_manager.terminate_all_publishers()
 
     def terminate(self):
         """
@@ -134,9 +133,10 @@ class CartridgeAgent(object):
     def register_instance_topic_listeners(self):
         self.__log.debug("Starting instance notifier event message receiver thread")
 
-        self.__inst_topic_subscriber.register_handler("ArtifactUpdatedEvent", self.on_artifact_updated)
-        self.__inst_topic_subscriber.register_handler("InstanceCleanupMemberEvent", self.on_instance_cleanup_member)
-        self.__inst_topic_subscriber.register_handler("InstanceCleanupClusterEvent", self.on_instance_cleanup_cluster)
+        self.__inst_topic_subscriber.register_handler("ArtifactUpdatedEvent", Handlers.on_artifact_updated)
+        self.__inst_topic_subscriber.register_handler("InstanceCleanupMemberEvent", Handlers.on_instance_cleanup_member)
+        self.__inst_topic_subscriber.register_handler(
+            "InstanceCleanupClusterEvent", Handlers.on_instance_cleanup_cluster)
 
         self.__inst_topic_subscriber.start()
         self.__log.info("Instance notifier event message receiver thread started")
@@ -148,13 +148,13 @@ class CartridgeAgent(object):
     def register_topology_event_listeners(self):
         self.__log.debug("Starting topology event message receiver thread")
 
-        self.__topology_event_subscriber.register_handler("MemberActivatedEvent", self.on_member_activated)
-        self.__topology_event_subscriber.register_handler("MemberTerminatedEvent", self.on_member_terminated)
-        self.__topology_event_subscriber.register_handler("MemberSuspendedEvent", self.on_member_suspended)
-        self.__topology_event_subscriber.register_handler("CompleteTopologyEvent", self.on_complete_topology)
-        self.__topology_event_subscriber.register_handler("MemberStartedEvent", self.on_member_started)
-        self.__topology_event_subscriber.register_handler("MemberCreatedEvent", self.on_member_created)
-        self.__topology_event_subscriber.register_handler("MemberInitializedEvent", self.on_member_initialized)
+        self.__topology_event_subscriber.register_handler("MemberActivatedEvent", Handlers.on_member_activated)
+        self.__topology_event_subscriber.register_handler("MemberTerminatedEvent", Handlers.on_member_terminated)
+        self.__topology_event_subscriber.register_handler("MemberSuspendedEvent", Handlers.on_member_suspended)
+        self.__topology_event_subscriber.register_handler("CompleteTopologyEvent", Handlers.on_complete_topology)
+        self.__topology_event_subscriber.register_handler("MemberStartedEvent", Handlers.on_member_started)
+        self.__topology_event_subscriber.register_handler("MemberCreatedEvent", Handlers.on_member_created)
+        self.__topology_event_subscriber.register_handler("MemberInitializedEvent", Handlers.on_member_initialized)
 
         self.__topology_event_subscriber.start()
         self.__log.info("Cartridge agent topology receiver thread started")
@@ -166,11 +166,11 @@ class CartridgeAgent(object):
     def register_tenant_event_listeners(self):
         self.__log.debug("Starting tenant event message receiver thread")
         self.__tenant_topic_subscriber.register_handler("DomainMappingAddedEvent",
-                                                        self.on_domain_mapping_added)
+                                                        Handlers.on_domain_mapping_added)
         self.__tenant_topic_subscriber.register_handler("DomainsMappingRemovedEvent",
-                                                        self.on_domain_mapping_removed)
-        self.__tenant_topic_subscriber.register_handler("CompleteTenantEvent", self.on_complete_tenant)
-        self.__tenant_topic_subscriber.register_handler("TenantSubscribedEvent", self.on_tenant_subscribed)
+                                                        Handlers.on_domain_mapping_removed)
+        self.__tenant_topic_subscriber.register_handler("CompleteTenantEvent", Handlers.on_complete_tenant)
+        self.__tenant_topic_subscriber.register_handler("TenantSubscribedEvent", Handlers.on_tenant_subscribed)
 
         self.__tenant_topic_subscriber.start()
         self.__log.info("Tenant event message receiver thread started")
@@ -182,7 +182,7 @@ class CartridgeAgent(object):
     def register_application_signup_event_listeners(self):
         self.__log.debug("Starting application signup event message receiver thread")
         self.__app_topic_subscriber.register_handler("ApplicationSignUpRemovedEvent",
-                                                     self.on_application_signup_removed)
+                                                     Handlers.on_application_signup_removed)
 
         self.__app_topic_subscriber.start()
         self.__log.info("Application signup event message receiver thread started")
@@ -191,18 +191,36 @@ class CartridgeAgent(object):
         while not self.__app_topic_subscriber.is_subscribed():
             time.sleep(1)
 
-    def on_artifact_updated(self, msg):
+    def wait_for_complete_topology(self):
+        while not TopologyContext.topology.initialized:
+            self.__log.info("Waiting for complete topology event...")
+            time.sleep(5)
+        self.__log.info("Complete topology event received")
+
+
+class Handlers(object):
+    """
+    Handler methods for message broker events
+    """
+
+    __log = LogFactory().get_log(__name__)
+    __tenant_context_initialized = False
+
+    @staticmethod
+    def on_artifact_updated(msg):
         event_obj = ArtifactUpdatedEvent.create_from_json(msg.payload)
         event_handler.on_artifact_updated_event(event_obj)
 
-    def on_instance_cleanup_member(self, msg):
+    @staticmethod
+    def on_instance_cleanup_member(msg):
         member_in_payload = Config.member_id
         event_obj = InstanceCleanupMemberEvent.create_from_json(msg.payload)
         member_in_event = event_obj.member_id
         if member_in_payload == member_in_event:
             event_handler.on_instance_cleanup_member_event()
 
-    def on_instance_cleanup_cluster(self, msg):
+    @staticmethod
+    def on_instance_cleanup_cluster(msg):
         event_obj = InstanceCleanupClusterEvent.create_from_json(msg.payload)
         cluster_in_payload = Config.cluster_id
         cluster_in_event = event_obj.cluster_id
@@ -212,11 +230,13 @@ class CartridgeAgent(object):
         if cluster_in_event == cluster_in_payload and instance_in_payload == instance_in_event:
             event_handler.on_instance_cleanup_cluster_event()
 
-    def on_member_created(self, msg):
-        self.__log.debug("Member created event received: %r" % msg.payload)
+    @staticmethod
+    def on_member_created(msg):
+        Handlers.__log.debug("Member created event received: %r" % msg.payload)
 
-    def on_member_initialized(self, msg):
-        self.__log.debug("Member initialized event received: %r" % msg.payload)
+    @staticmethod
+    def on_member_initialized(msg):
+        Handlers.__log.debug("Member initialized event received: %r" % msg.payload)
         event_obj = MemberInitializedEvent.create_from_json(msg.payload)
 
         if not TopologyContext.topology.initialized:
@@ -224,84 +244,88 @@ class CartridgeAgent(object):
 
         event_handler.on_member_initialized_event(event_obj)
 
-    def on_member_activated(self, msg):
-        self.__log.debug("Member activated event received: %r" % msg.payload)
+    @staticmethod
+    def on_member_activated(msg):
+        Handlers.__log.debug("Member activated event received: %r" % msg.payload)
         if not TopologyContext.topology.initialized:
             return
 
         event_obj = MemberActivatedEvent.create_from_json(msg.payload)
         event_handler.on_member_activated_event(event_obj)
 
-    def on_member_terminated(self, msg):
-        self.__log.debug("Member terminated event received: %r" % msg.payload)
+    @staticmethod
+    def on_member_terminated(msg):
+        Handlers.__log.debug("Member terminated event received: %r" % msg.payload)
         if not TopologyContext.topology.initialized:
             return
 
         event_obj = MemberTerminatedEvent.create_from_json(msg.payload)
         event_handler.on_member_terminated_event(event_obj)
 
-    def on_member_suspended(self, msg):
-        self.__log.debug("Member suspended event received: %r" % msg.payload)
+    @staticmethod
+    def on_member_suspended(msg):
+        Handlers.__log.debug("Member suspended event received: %r" % msg.payload)
         if not TopologyContext.topology.initialized:
             return
 
         event_obj = MemberSuspendedEvent.create_from_json(msg.payload)
         event_handler.on_member_suspended_event(event_obj)
 
-    def on_complete_topology(self, msg):
+    @staticmethod
+    def on_complete_topology(msg):
         event_obj = CompleteTopologyEvent.create_from_json(msg.payload)
         TopologyContext.update(event_obj.topology)
         if not TopologyContext.topology.initialized:
-            self.__log.info("Topology initialized from complete topology event")
+            Handlers.__log.info("Topology initialized from complete topology event")
             TopologyContext.topology.initialized = True
             event_handler.on_complete_topology_event(event_obj)
 
-        self.__log.debug("Topology context updated with [topology] %r" % event_obj.topology.json_str)
+        Handlers.__log.debug("Topology context updated with [topology] %r" % event_obj.topology.json_str)
 
-    def on_member_started(self, msg):
-        self.__log.debug("Member started event received: %r" % msg.payload)
+    @staticmethod
+    def on_member_started(msg):
+        Handlers.__log.debug("Member started event received: %r" % msg.payload)
         if not TopologyContext.topology.initialized:
             return
 
         event_obj = MemberStartedEvent.create_from_json(msg.payload)
         event_handler.on_member_started_event(event_obj)
 
-    def on_domain_mapping_added(self, msg):
-        self.__log.debug("Subscription domain added event received : %r" % msg.payload)
+    @staticmethod
+    def on_domain_mapping_added(msg):
+        Handlers.__log.debug("Subscription domain added event received : %r" % msg.payload)
         event_obj = DomainMappingAddedEvent.create_from_json(msg.payload)
         event_handler.on_domain_mapping_added_event(event_obj)
 
-    def on_domain_mapping_removed(self, msg):
-        self.__log.debug("Subscription domain removed event received : %r" % msg.payload)
+    @staticmethod
+    def on_domain_mapping_removed(msg):
+        Handlers.__log.debug("Subscription domain removed event received : %r" % msg.payload)
         event_obj = DomainMappingRemovedEvent.create_from_json(msg.payload)
         event_handler.on_domain_mapping_removed_event(event_obj)
 
-    def on_complete_tenant(self, msg):
+    @staticmethod
+    def on_complete_tenant(msg):
         event_obj = CompleteTenantEvent.create_from_json(msg.payload)
         TenantContext.update(event_obj.tenants)
-        if not self.__tenant_context_initialized:
-            self.__log.info("Tenant context initialized from complete tenant event")
-            self.__tenant_context_initialized = True
+        if not Handlers.__tenant_context_initialized:
+            Handlers.__log.info("Tenant context initialized from complete tenant event")
+            Handlers.__tenant_context_initialized = True
             event_handler.on_complete_tenant_event(event_obj)
 
-        self.__log.debug("Tenant context updated with [tenant list] %r" % event_obj.tenant_list_json)
+        Handlers.__log.debug("Tenant context updated with [tenant list] %r" % event_obj.tenant_list_json)
 
-    def on_tenant_subscribed(self, msg):
-        self.__log.debug("Tenant subscribed event received: %r" % msg.payload)
+    @staticmethod
+    def on_tenant_subscribed(msg):
+        Handlers.__log.debug("Tenant subscribed event received: %r" % msg.payload)
         event_obj = TenantSubscribedEvent.create_from_json(msg.payload)
         event_handler.on_tenant_subscribed_event(event_obj)
 
-    def on_application_signup_removed(self, msg):
-        self.__log.debug("Application signup removed event received: %r" % msg.payload)
+    @staticmethod
+    def on_application_signup_removed(msg):
+        Handlers.__log.debug("Application signup removed event received: %r" % msg.payload)
         event_obj = ApplicationSignUpRemovedEvent.create_from_json(msg.payload)
         event_handler.on_application_signup_removed_event(event_obj)
 
-    def wait_for_complete_topology(self):
-        while not TopologyContext.topology.initialized:
-            self.__log.info("Waiting for complete topology event...")
-            time.sleep(5)
-        self.__log.info("Complete topology event received")
-
 
 if __name__ == "__main__":
     log = LogFactory().get_log(__name__)

http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
index f1a70ec..72fc5e2 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
@@ -22,7 +22,7 @@ import sys
 from yapsy.PluginManager import PluginManager
 
 from modules.util.log import LogFactory
-from exception import ParameterNotFoundException
+from exception import ParameterNotFoundException, InvalidConfigValueException
 import constants
 from plugins.contracts import ICartridgeAgentPlugin, IArtifactCommitPlugin, IArtifactCheckoutPlugin, \
     IHealthStatReaderPlugin
@@ -148,6 +148,18 @@ class Config:
     """ :type : str """
     mb_publisher_timeout = None
     """ :type : int """
+    cep_username = None
+    """ :type : str """
+    cep_password = None
+    """ :type : str """
+    cep_urls = []
+    """ :type : list """
+    artifact_clone_retry_count = None
+    """ :type : str """
+    artifact_clone_retry_interval = None
+    """ :type : str """
+    port_check_timeout = None
+    """ :type : str """
 
     @staticmethod
     def read_conf_file():
@@ -351,8 +363,29 @@ class Config:
 
             Config.mb_username = Config.read_property(constants.MB_USERNAME, False)
             Config.mb_password = Config.read_property(constants.MB_PASSWORD, False)
-            Config.mb_urls = Config.read_property(constants.MB_URLS).split(",")
+            Config.mb_urls = Config.read_property(constants.MB_URLS)
             Config.mb_publisher_timeout = int(Config.read_property(constants.MB_PUBLISHER_TIMEOUT))
+
+            Config.cep_username = Config.read_property(constants.CEP_SERVER_ADMIN_USERNAME)
+            Config.cep_password = Config.read_property(constants.CEP_SERVER_ADMIN_PASSWORD)
+            Config.cep_urls = Config.read_property(constants.CEP_RECEIVER_URLS)
+
+            try:
+                Config.artifact_clone_retry_count = Config.read_property(constants.ARTIFACT_CLONE_RETRIES)
+            except ParameterNotFoundException:
+                Config.artifact_clone_retry_count = "5"
+
+            try:
+                Config.artifact_clone_retry_interval = Config.read_property(constants.ARTIFACT_CLONE_INTERVAL)
+            except ParameterNotFoundException:
+                Config.artifact_clone_retry_interval = "10"
+
+            try:
+                Config.port_check_timeout = Config.read_property(constants.PORT_CHECK_TIMEOUT)
+            except ParameterNotFoundException:
+                Config.port_check_timeout = "600000"
+
+            Config.validate_config()
         except ParameterNotFoundException as ex:
             raise RuntimeError(ex)
 
@@ -381,6 +414,52 @@ class Config:
         Config.extension_executor = Config.initialize_extensions()
 
     @staticmethod
+    def validate_config():
+        try:
+            Config.validate_url_list(Config.mb_urls, constants.MB_URLS)
+            Config.validate_int(Config.mb_publisher_timeout, constants.MB_PUBLISHER_TIMEOUT)
+            Config.validate_url_list(Config.cep_urls, constants.CEP_RECEIVER_URLS)
+            Config.validate_int(Config.artifact_update_interval, constants.ARTIFACT_UPDATE_INTERVAL)
+            Config.validate_int(Config.artifact_clone_retry_count, constants.ARTIFACT_CLONE_RETRIES)
+            Config.validate_int(Config.artifact_clone_retry_interval, constants.ARTIFACT_CLONE_INTERVAL)
+            Config.validate_int(Config.port_check_timeout, constants.PORT_CHECK_TIMEOUT)
+        except ValueError as err:
+            raise InvalidConfigValueException("Invalid configuration for Cartridge Agent", err)
+
+    @staticmethod
+    def validate_url_list(urls, field_name):
+        """
+        host1:port1,host2:port2
+
+        :param urls:
+        :param field_name:
+        :return:
+        """
+        url_list = str(urls).split(",")
+        if len(url_list) < 1:
+            raise ValueError("Invalid value [field] \"%s\"" % field_name)
+
+        for single_url in url_list:
+            try:
+                url_ip, url_port = single_url.split(":")
+            except ValueError:
+                raise ValueError("Invalid host or port number value for [field] %s", field_name)
+
+    @staticmethod
+    def validate_int(int_value, field_name):
+        """
+        valid integer value
+
+        :param int_value:
+        :param field_name:
+        :return:
+        """
+        try:
+            int(int_value)
+        except ValueError:
+            raise ValueError("Invalid int value for [field] %s " % field_name)
+
+    @staticmethod
     def initialize_plugins():
         """ Find, load, activate and group plugins for Python CA
         :return: a tuple of (PluginManager, plugins, artifact management plugins)

http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
index 1586c4d..345efee 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
@@ -21,17 +21,8 @@ class CartridgeAgentException(Exception):
     Exception super class to be used by specific exceptions thrown in the cartridge agent
     """
 
-    def __init__(self, message):
-        super(CartridgeAgentException, self).__init__(message)
-        self.__message = message
-
-    def get_message(self):
-        """
-        The message provided when the exception is raised
-        :return: message
-        :rtype: str
-        """
-        return self.__message
+    def __init__(self, *args, **kwargs):
+        Exception.__init__(self, *args, **kwargs)
 
 
 class DataPublisherException(CartridgeAgentException):
@@ -39,8 +30,8 @@ class DataPublisherException(CartridgeAgentException):
     Exception to be used during log publishing operations
     """
 
-    def __init__(self, message):
-        super(DataPublisherException, self).__init__(message)
+    def __init__(self, *args, **kwargs):
+        CartridgeAgentException.__init__(self, *args, **kwargs)
 
 
 class PluginExecutionException(CartridgeAgentException):
@@ -48,8 +39,8 @@ class PluginExecutionException(CartridgeAgentException):
     Exception raised when a runtime error is met while executing an plugin script
     """
 
-    def __init__(self, message):
-        super(PluginExecutionException, self).__init__(message)
+    def __init__(self, *args, **kwargs):
+        CartridgeAgentException.__init__(self, *args, **kwargs)
 
 
 class GitRepositorySynchronizationException(CartridgeAgentException):
@@ -57,8 +48,8 @@ class GitRepositorySynchronizationException(CartridgeAgentException):
     Exception raised during a git repository related task
     """
 
-    def __init__(self, message):
-        super(GitRepositorySynchronizationException, self).__init__(message)
+    def __init__(self, *args, **kwargs):
+        CartridgeAgentException.__init__(self, *args, **kwargs)
 
 
 class ParameterNotFoundException(CartridgeAgentException):
@@ -67,8 +58,8 @@ class ParameterNotFoundException(CartridgeAgentException):
     of the cartridge agent
     """
 
-    def __init__(self, message):
-        super(ParameterNotFoundException, self).__init__(message)
+    def __init__(self, *args, **kwargs):
+        CartridgeAgentException.__init__(self, *args, **kwargs)
 
 
 class ThriftReceiverOfflineException(CartridgeAgentException):
@@ -76,8 +67,8 @@ class ThriftReceiverOfflineException(CartridgeAgentException):
     Exception raised when the connection to the Thrift receiver is dropped when publishing events
     """
 
-    def __init__(self, message):
-        super(ThriftReceiverOfflineException, self).__init__(message)
+    def __init__(self, *args, **kwargs):
+        CartridgeAgentException.__init__(self, *args, **kwargs)
 
 
 class CEPPublisherException(CartridgeAgentException):
@@ -85,5 +76,14 @@ class CEPPublisherException(CartridgeAgentException):
     Exception to be used during CEP publishing operations
     """
 
-    def __init__(self, message):
-        super(CEPPublisherException, self).__init__(message)
+    def __init__(self, *args, **kwargs):
+        CartridgeAgentException.__init__(self, *args, **kwargs)
+
+
+class InvalidConfigValueException(CartridgeAgentException):
+    """
+    Exception to be used when validating agent configuration
+    """
+
+    def __init__(self, *args, **kwargs):
+        CartridgeAgentException.__init__(self, *args, **kwargs)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
index 71f2894..2005537 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
@@ -62,7 +62,7 @@ class HealthStatisticsPublisherManager(Thread):
 
                 self.log.debug("Publishing load average: %r" % cartridge_stats.load_avg)
                 self.publisher.publish_load_average(cartridge_stats.load_avg)
-            except Exception as e:
+            except Exception:
                 self.log.exception(
                     "Couldn't publish health statistics to CEP. Thrift Receiver offline. Reconnecting...")
                 self.publisher = HealthStatisticsPublisher()
@@ -76,33 +76,17 @@ class HealthStatisticsPublisher:
     """
     log = LogFactory().get_log(__name__)
 
-    @staticmethod
-    def read_config(conf_key):
-        """
-        Read a given key from the cartridge agent configuration
-        :param conf_key: The key to look for in the CA config
-        :return: The value for the key from the CA config
-        :raise: RuntimeError if the given key is not found in the CA config
-        """
-        conf_value = Config.read_property(conf_key, False)
-
-        if conf_value is None or conf_value.strip() == "":
-            raise RuntimeError("System property not found: " + conf_key)
-
-        return conf_value
-
     def __init__(self):
 
         self.publishers = []
         self.deactive_publishers = []
-        self.cep_admin_username = HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_USERNAME)
-        self.cep_admin_password = HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_PASSWORD)
+        self.cep_admin_username = Config.cep_username
+        self.cep_admin_password = Config.cep_password
         self.stream_definition = HealthStatisticsPublisher.create_stream_definition()
         HealthStatisticsPublisher.log.debug("Stream definition created: %r" % str(self.stream_definition))
 
         # 1.1.1.1:1883,2.2.2.2:1883
-        cep_urls = HealthStatisticsPublisher.read_config(constants.CEP_RECEIVER_URLS)
-        cep_urls = cep_urls.split(',')
+        cep_urls = Config.cep_urls.split(',')
         for cep_url in cep_urls:
 
             cep_active = self.is_cep_active(cep_url)
@@ -207,14 +191,10 @@ class HealthStatisticsPublisher:
         return true if active
         :param cep_url:
         """
-        ports = []
         cep_ip = cep_url.split(':')[0]
         cep_port = cep_url.split(':')[1]
-        ports.append(cep_port)
 
-        cep_active = cartridgeagentutils.check_ports_active(
-            cep_ip,
-            ports)
+        cep_active = cartridgeagentutils.check_port_active(cep_ip, cep_port)
 
         return cep_active
 
@@ -229,8 +209,8 @@ class HealthStatisticsPublisher:
             except Exception as ex:
                 raise ThriftReceiverOfflineException(ex)
 
-        deactive_ceps = self.deactive_publishers
-        for cep_url in deactive_ceps:
+        inactive_ceps = self.deactive_publishers
+        for cep_url in inactive_ceps:
             cep_active = self.is_cep_active(cep_url)
             if cep_active:
                 self.add_publishers(cep_url)

http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
index e7ab1c7..b9f5d63 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
@@ -145,7 +145,7 @@ class LogPublisherManager(Thread):
         ports_active = cartridgeagentutils.wait_until_ports_active(
             DataPublisherConfiguration.get_instance().monitoring_server_ip,
             self.ports,
-            int(Config.read_property("port.check.timeout", critical=False)))
+            int(Config.port_check_timeout))
 
         if not ports_active:
             self.log.debug("Monitoring server is not active")

http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
index 2170a33..15d3733 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
@@ -168,8 +168,8 @@ class AgentGitHandler:
         """
         git_clone_successful = False
         # Read properties from agent.conf
-        max_retry_attempts = int(Config.read_property(constants.ARTIFACT_CLONE_RETRIES, 5))
-        retry_interval = int(Config.read_property(constants.ARTIFACT_CLONE_INTERVAL, 10))
+        max_retry_attempts = int(Config.artifact_clone_retry_count)
+        retry_interval = int(Config.artifact_clone_retry_interval)
         retry_attempts = 0
 
         # Iterate until git clone is successful or reaches max retry attempts

http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py
index f44dd04..13bb680 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py
@@ -27,7 +27,6 @@ class ApplicationSignUpAddedEvent:
         self.clusterIds = None
         """ :type : list[str]  """
 
-
     @staticmethod
     def create_from_json(json_str):
         json_obj = json.loads(json_str)
@@ -49,7 +48,6 @@ class ApplicationSignUpRemovedEvent:
         self.clusterIds = None
         """ :type : list[str]  """
 
-
     @staticmethod
     def create_from_json(json_str):
         json_obj = json.loads(json_str)
@@ -59,4 +57,3 @@ class ApplicationSignUpRemovedEvent:
         app_signup_removed.tenantId = json_obj["tenantId"] if "tenantId" in json_obj else None
 
         return app_signup_removed
-

http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py
index 41dc133..d19cd0c 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py
@@ -79,4 +79,4 @@ class InstanceCleanupMemberEvent:
         json_obj = json.loads(json_str)
         m_id = json_obj["memberId"] if "memberId" in json_obj else None
 
-        return InstanceCleanupMemberEvent(m_id)
\ No newline at end of file
+        return InstanceCleanupMemberEvent(m_id)

http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py
index 2e287bd..8a49bc2 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py
@@ -84,4 +84,4 @@ class TenantUnsubscribedEvent:
         instance.service_name = json_obj["serviceName"] if "serviceName" in json_obj else None
         instance.cluster_ids = json_obj["clusterIds"] if "clusterIds" in json_obj else None
 
-        return instance
\ No newline at end of file
+        return instance

http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py
index 79bc6c5..e5ad877 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py
@@ -24,12 +24,18 @@ import hashlib
 
 from log import LogFactory
 
+BS = 16
+
 log = LogFactory().get_log(__name__)
 
-unpad = lambda s: s[0:-ord(s[-1])]
-current_milli_time = lambda: int(round(time.time() * 1000))
-BS = 16
-pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
+
+def unpad(s): return s[0:-ord(s[-1])]
+
+
+def current_milli_time(): return int(round(time.time() * 1000))
+
+
+def pad(s): return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
 
 
 def decrypt_password(pass_str, secret):
@@ -77,11 +83,26 @@ def wait_until_ports_active(ip_address, ports, ports_check_timeout=600000):
 
     log.debug("Port check timeout: %s" % ports_check_timeout)
 
-    active = False
+    ports_left = ports
     start_time = current_milli_time()
-    while not active:
+
+    # check ports until all are active or timeout exceeds
+    while True:
         log.info("Waiting for ports to be active: [ip] %s [ports] %s" % (ip_address, ports))
-        active = check_ports_active(ip_address, ports)
+
+        # check each port for activity
+        for checking_port in list(ports_left):
+            port_active = check_port_active(ip_address, checking_port)
+            if port_active:
+                log.debug("Port %s on host %s active" % (checking_port, ip_address))
+                ports_left.remove(checking_port)
+
+        # if no ports are left to check for activity, return
+        if len(ports_left) == 0:
+            log.info("Ports activated: [ip] %r [ports] %r" % (ip_address, ports))
+            return True
+
+        # active = check_ports_active(ip_address, ports)
         end_time = current_milli_time()
         duration = end_time - start_time
 
@@ -92,33 +113,33 @@ def wait_until_ports_active(ip_address, ports, ports_check_timeout=600000):
 
         time.sleep(5)
 
-    log.info("Ports activated: [ip] %r [ports] %r" % (ip_address, ports))
-    return True
-
 
-def check_ports_active(ip_address, ports):
+def check_port_active(ip_address, port):
     """
-    Checks the given list of port addresses for active state
+    Checks the given port on the given host for activity
     :param str ip_address: Ip address of the member to be checked
-    :param list[str] ports: The list of ports to be checked
+    :param str port: The port to be checked
     :return: True if the ports are active, False if at least one is not active
     :rtype: bool
     """
-    if len(ports) < 1:
-        raise RuntimeError("No ports found")
-
-    for port in ports:
-        s = socket.socket()
-        s.settimeout(5)
-        try:
-            s.connect((ip_address, int(port)))
-            log.debug("Port %s is active" % port)
-            s.close()
-        except socket.error:
-            log.debug("Port %s is not active" % port)
-            return False
+    if port is None:
+        raise RuntimeError("Cannot check invalid port for activity")
+
+    try:
+        port_int = int(port)
+    except ValueError:
+        raise RuntimeError("Cannot check invalid port for activity %s" % port)
 
-    return True
+    s = socket.socket()
+    s.settimeout(5)
+    try:
+        s.connect((ip_address, port_int))
+        log.debug("Port %s is active" % port)
+        s.close()
+        return True
+    except socket.error:
+        log.debug("Port %s is not active" % port)
+        return False
 
 
 class IncrementalCeilingListIterator(object):

http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
index 5b6190e..229c354 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
@@ -71,7 +71,7 @@ def publish_instance_activated_event():
         ports_active = cartridgeagentutils.wait_until_ports_active(
             listen_address,
             configuration_ports,
-            int(Config.read_property("port.check.timeout", critical=False)))
+            int(Config.port_check_timeout))
 
         if ports_active:
             log.info("Publishing instance activated event...")
@@ -226,7 +226,7 @@ class EventPublisher(object):
         while int(time.time()) - self.__start_time < (Config.mb_publisher_timeout * 1000):
             retry_interval = retry_iterator.get_next_retry_interval()
 
-            for mb_url in Config.mb_urls:
+            for mb_url in Config.mb_urls.split(","):
                 mb_ip, mb_port = mb_url.split(":")
 
                 # start a thread to execute publish event