You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ch...@apache.org on 2015/11/30 14:44:19 UTC

[2/5] stratos git commit: PCA - Publish MB events using a thread and timeout after 5 seconds. Improved PCA structure and removed unnecessary threading. PCA Live Test - Improved logging, improved MB HA test case

PCA - Publish MB events using a thread and time out after 5 seconds. Improved PCA structure and removed unnecessary threading.
PCA Live Test - Improved logging, improved MB HA test case.


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d1912180
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d1912180
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d1912180

Branch: refs/heads/stratos-4.1.x
Commit: d19121809154d42998ddc0c85722600f03f7225f
Parents: e722ff3
Author: Chamila de Alwis <ch...@apache.org>
Authored: Fri Nov 27 14:36:42 2015 +0530
Committer: Chamila de Alwis <ch...@apache.org>
Committed: Mon Nov 30 19:13:55 2015 +0530

----------------------------------------------------------------------
 .../cartridge.agent/cartridge.agent/agent.py    |   62 +-
 .../modules/event/eventhandler.py               | 1181 +++++++++---------
 .../cartridge.agent/publisher.py                |   72 +-
 .../cartridge.agent/subscriber.py               |    8 +-
 .../integration/tests/ADCExtensionTestCase.java |    5 +
 .../tests/ADCMTAppTenantUserTestCase.java       |    5 +
 .../integration/tests/ADCMTAppTestCase.java     |    5 +
 .../agent/integration/tests/ADCTestCase.java    |    5 +
 .../integration/tests/AgentStartupTestCase.java |    5 +
 .../integration/tests/CEPHAModeTestCase.java    |    5 +
 .../tests/MessageBrokerHATestCase.java          |   30 +-
 .../tests/PythonAgentIntegrationTest.java       |   14 +-
 .../MessageBrokerHATestCase/agent.conf          |    2 +-
 .../src/test/resources/log4j.properties         |    2 +-
 .../src/test/resources/test-suite-all.xml       |    1 +
 .../src/test/resources/test-suite-smoke.xml     |    2 +
 16 files changed, 734 insertions(+), 670 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
index 959568b..6b81dff 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
@@ -16,22 +16,19 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import threading
-
 import publisher
 from logpublisher import *
 from modules.event.application.signup.events import *
 from modules.event.domain.mapping.events import *
-from modules.event.eventhandler import EventHandler
+import modules.event.eventhandler as event_handler
 from modules.event.instance.notifier.events import *
 from modules.event.tenant.events import *
 from modules.event.topology.events import *
 from subscriber import EventSubscriber
 
 
-class CartridgeAgent(threading.Thread):
+class CartridgeAgent(object):
     def __init__(self):
-        threading.Thread.__init__(self)
         Config.initialize_config()
         self.__tenant_context_initialized = False
         self.__log_publish_manager = None
@@ -47,9 +44,7 @@ class CartridgeAgent(threading.Thread):
         self.__app_topic_subscriber = EventSubscriber(constants.APPLICATION_SIGNUP, mb_urls, mb_uname, mb_pwd)
         self.__topology_event_subscriber = EventSubscriber(constants.TOPOLOGY_TOPIC, mb_urls, mb_uname, mb_pwd)
 
-        self.__event_handler = EventHandler()
-
-    def run(self):
+    def run_agent(self):
         self.__log.info("Starting Cartridge Agent...")
 
         # Start topology event receiver thread
@@ -58,7 +53,7 @@ class CartridgeAgent(threading.Thread):
         if Config.lvs_virtual_ip is None or str(Config.lvs_virtual_ip).strip() == "":
             self.__log.debug("LVS Virtual IP is not defined")
         else:
-            self.__event_handler.create_dummy_interface()
+            event_handler.create_dummy_interface()
 
         # request complete topology event from CC by publishing CompleteTopologyRequestEvent
         publisher.publish_complete_topology_request_event()
@@ -84,14 +79,14 @@ class CartridgeAgent(threading.Thread):
         publisher.publish_complete_tenant_request_event()
 
         # Execute instance started shell script
-        self.__event_handler.on_instance_started_event()
+        event_handler.on_instance_started_event()
 
         # Publish instance started event
         publisher.publish_instance_started_event()
 
         # Execute start servers extension
         try:
-            self.__event_handler.start_server_extension()
+            event_handler.start_server_extension()
         except Exception as e:
             self.__log.exception("Error processing start servers event: %s" % e)
 
@@ -100,7 +95,7 @@ class CartridgeAgent(threading.Thread):
         if repo_url is None or str(repo_url).strip() == "":
             self.__log.info("No artifact repository found")
             publisher.publish_instance_activated_event()
-            self.__event_handler.on_instance_activated_event()
+            event_handler.on_instance_activated_event()
         else:
             # instance activated event will be published in artifact updated event handler
             self.__log.info(
@@ -109,7 +104,7 @@ class CartridgeAgent(threading.Thread):
 
         persistence_mapping_payload = Config.persistence_mappings
         if persistence_mapping_payload is not None:
-            self.__event_handler.volume_mount_extension(persistence_mapping_payload)
+            event_handler.volume_mount_extension(persistence_mapping_payload)
 
         # start log publishing thread
         if DataPublisherConfiguration.get_instance().enabled:
@@ -198,14 +193,14 @@ class CartridgeAgent(threading.Thread):
 
     def on_artifact_updated(self, msg):
         event_obj = ArtifactUpdatedEvent.create_from_json(msg.payload)
-        self.__event_handler.on_artifact_updated_event(event_obj)
+        event_handler.on_artifact_updated_event(event_obj)
 
     def on_instance_cleanup_member(self, msg):
         member_in_payload = Config.member_id
         event_obj = InstanceCleanupMemberEvent.create_from_json(msg.payload)
         member_in_event = event_obj.member_id
         if member_in_payload == member_in_event:
-            self.__event_handler.on_instance_cleanup_member_event()
+            event_handler.on_instance_cleanup_member_event()
 
     def on_instance_cleanup_cluster(self, msg):
         event_obj = InstanceCleanupClusterEvent.create_from_json(msg.payload)
@@ -215,7 +210,7 @@ class CartridgeAgent(threading.Thread):
         instance_in_event = event_obj.cluster_instance_id
 
         if cluster_in_event == cluster_in_payload and instance_in_payload == instance_in_event:
-            self.__event_handler.on_instance_cleanup_cluster_event()
+            event_handler.on_instance_cleanup_cluster_event()
 
     def on_member_created(self, msg):
         self.__log.debug("Member created event received: %r" % msg.payload)
@@ -227,7 +222,7 @@ class CartridgeAgent(threading.Thread):
         if not TopologyContext.topology.initialized:
             return
 
-        self.__event_handler.on_member_initialized_event(event_obj)
+        event_handler.on_member_initialized_event(event_obj)
 
     def on_member_activated(self, msg):
         self.__log.debug("Member activated event received: %r" % msg.payload)
@@ -235,7 +230,7 @@ class CartridgeAgent(threading.Thread):
             return
 
         event_obj = MemberActivatedEvent.create_from_json(msg.payload)
-        self.__event_handler.on_member_activated_event(event_obj)
+        event_handler.on_member_activated_event(event_obj)
 
     def on_member_terminated(self, msg):
         self.__log.debug("Member terminated event received: %r" % msg.payload)
@@ -243,7 +238,7 @@ class CartridgeAgent(threading.Thread):
             return
 
         event_obj = MemberTerminatedEvent.create_from_json(msg.payload)
-        self.__event_handler.on_member_terminated_event(event_obj)
+        event_handler.on_member_terminated_event(event_obj)
 
     def on_member_suspended(self, msg):
         self.__log.debug("Member suspended event received: %r" % msg.payload)
@@ -251,7 +246,7 @@ class CartridgeAgent(threading.Thread):
             return
 
         event_obj = MemberSuspendedEvent.create_from_json(msg.payload)
-        self.__event_handler.on_member_suspended_event(event_obj)
+        event_handler.on_member_suspended_event(event_obj)
 
     def on_complete_topology(self, msg):
         event_obj = CompleteTopologyEvent.create_from_json(msg.payload)
@@ -259,7 +254,7 @@ class CartridgeAgent(threading.Thread):
         if not TopologyContext.topology.initialized:
             self.__log.info("Topology initialized from complete topology event")
             TopologyContext.topology.initialized = True
-            self.__event_handler.on_complete_topology_event(event_obj)
+            event_handler.on_complete_topology_event(event_obj)
 
         self.__log.debug("Topology context updated with [topology] %r" % event_obj.topology.json_str)
 
@@ -269,17 +264,17 @@ class CartridgeAgent(threading.Thread):
             return
 
         event_obj = MemberStartedEvent.create_from_json(msg.payload)
-        self.__event_handler.on_member_started_event(event_obj)
+        event_handler.on_member_started_event(event_obj)
 
     def on_domain_mapping_added(self, msg):
         self.__log.debug("Subscription domain added event received : %r" % msg.payload)
         event_obj = DomainMappingAddedEvent.create_from_json(msg.payload)
-        self.__event_handler.on_domain_mapping_added_event(event_obj)
+        event_handler.on_domain_mapping_added_event(event_obj)
 
     def on_domain_mapping_removed(self, msg):
         self.__log.debug("Subscription domain removed event received : %r" % msg.payload)
         event_obj = DomainMappingRemovedEvent.create_from_json(msg.payload)
-        self.__event_handler.on_domain_mapping_removed_event(event_obj)
+        event_handler.on_domain_mapping_removed_event(event_obj)
 
     def on_complete_tenant(self, msg):
         event_obj = CompleteTenantEvent.create_from_json(msg.payload)
@@ -287,19 +282,19 @@ class CartridgeAgent(threading.Thread):
         if not self.__tenant_context_initialized:
             self.__log.info("Tenant context initialized from complete tenant event")
             self.__tenant_context_initialized = True
-            self.__event_handler.on_complete_tenant_event(event_obj)
+            event_handler.on_complete_tenant_event(event_obj)
 
         self.__log.debug("Tenant context updated with [tenant list] %r" % event_obj.tenant_list_json)
 
     def on_tenant_subscribed(self, msg):
         self.__log.debug("Tenant subscribed event received: %r" % msg.payload)
         event_obj = TenantSubscribedEvent.create_from_json(msg.payload)
-        self.__event_handler.on_tenant_subscribed_event(event_obj)
+        event_handler.on_tenant_subscribed_event(event_obj)
 
     def on_application_signup_removed(self, msg):
         self.__log.debug("Application signup removed event received: %r" % msg.payload)
         event_obj = ApplicationSignUpRemovedEvent.create_from_json(msg.payload)
-        self.__event_handler.on_application_signup_removed_event(event_obj)
+        event_handler.on_application_signup_removed_event(event_obj)
 
     def wait_for_complete_topology(self):
         while not TopologyContext.topology.initialized:
@@ -308,17 +303,12 @@ class CartridgeAgent(threading.Thread):
         self.__log.info("Complete topology event received")
 
 
-def main():
-    cartridge_agent = CartridgeAgent()
+if __name__ == "__main__":
     log = LogFactory().get_log(__name__)
-
     try:
         log.info("Starting Stratos cartridge agent...")
-        cartridge_agent.start()
+        cartridge_agent = CartridgeAgent()
+        cartridge_agent.run_agent()
     except Exception as e:
         log.exception("Cartridge Agent Exception: %r" % e)
-        cartridge_agent.terminate()
-
-
-if __name__ == "__main__":
-    main()
+        # cartridge_agent.terminate()

http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
index 6e2aa4f..f8b0c2b 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
@@ -21,8 +21,6 @@ from threading import Thread
 import publisher
 from entity import *
 
-import constants
-from config import Config
 from ..artifactmgt.git.agentgithandler import *
 from ..artifactmgt.repository import Repository
 from ..util import cartridgeagentutils
@@ -31,485 +29,492 @@ from ..util.log import LogFactory
 SUPER_TENANT_ID = "-1234"
 SUPER_TENANT_REPO_PATH = "/repository/deployment/server/"
 TENANT_REPO_PATH = "/repository/tenants/"
+log = LogFactory().get_log(__name__)
+
+"""
+Event execution related logic
+"""
+
+
+def on_instance_started_event():
+    log.debug("Processing instance started event...")
+    # TODO: copy artifacts extension
+    execute_event_extendables(constants.INSTANCE_STARTED_EVENT, {})
+
+
+def create_dummy_interface():
+    log.debug("Processing lvs dummy interface creation...")
+    lvs_vip = Config.lvs_virtual_ip.split("|")
+    log.debug("LVS dummy interface creation values %s %s " % (lvs_vip[0], lvs_vip[1]))
+    execute_event_extendables(
+        constants.CREATE_LVS_DUMMY_INTERFACE,
+        {"EVENT": constants.CREATE_LVS_DUMMY_INTERFACE,
+         "LVS_DUMMY_VIRTUAL_IP": lvs_vip[0],
+         "LVS_SUBNET_MASK": lvs_vip[1]}
+    )
+
+
+def on_instance_activated_event():
+    log.debug("Processing instance activated event...")
+    execute_event_extendables(constants.INSTANCE_ACTIVATED_EVENT, {})
+
+
+def on_artifact_updated_event(artifacts_updated_event):
+    log.debug(
+        "Processing artifact updated event for [tenant] %s [cluster] %s [status] %s"
+        % (str(artifacts_updated_event.tenant_id), artifacts_updated_event.cluster_id, artifacts_updated_event.status))
+
+    cluster_id_event = str(artifacts_updated_event.cluster_id).strip()
+    cluster_id_payload = Config.cluster_id
+    repo_url = str(artifacts_updated_event.repo_url).strip()
+
+    if repo_url == "":
+        log.error("Repository URL is empty. Failed to process artifact updated event.")
+        return
+
+    if cluster_id_payload is None or cluster_id_payload == "":
+        log.error("Cluster ID in payload is empty. Failed to process artifact updated event.")
+        return
+
+    if cluster_id_payload != cluster_id_event:
+        log.debug("Cluster ID in artifact updated event does not match. Skipping event handler.")
+        return
+
+    repo_password = None
+    if artifacts_updated_event.repo_password is not None:
+        secret = Config.cartridge_key
+        repo_password = cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, secret)
+
+    if Config.app_path is None:
+        log.error("Repository path is empty. Failed to process artifact updated event.")
+        return
+
+    repo_username = artifacts_updated_event.repo_username
+    tenant_id = artifacts_updated_event.tenant_id
+    is_multitenant = Config.is_multiTenant
+    commit_enabled = artifacts_updated_event.commit_enabled
+
+    # create repo object
+    local_repo_path = get_repo_path_for_tenant(str(tenant_id), Config.app_path, is_multitenant)
+    repo_info = Repository(repo_url, repo_username, repo_password, local_repo_path, tenant_id, commit_enabled)
+    log.info("Executing checkout job on artifact updated event...")
+
+    try:
+        Config.artifact_checkout_plugin.plugin_object.checkout(repo_info)
+    except Exception as e:
+        log.exception(
+            "Checkout job on artifact updated event failed for tenant: %s %s" % (repo_info.tenant_id, e))
+
+    # execute artifact updated extension
+    plugin_values = {"ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id,
+                     "ARTIFACT_UPDATED_TENANT_ID": artifacts_updated_event.tenant_id,
+                     "ARTIFACT_UPDATED_REPO_URL": artifacts_updated_event.repo_url,
+                     "ARTIFACT_UPDATED_REPO_PASSWORD": artifacts_updated_event.repo_password,
+                     "ARTIFACT_UPDATED_REPO_USERNAME": artifacts_updated_event.repo_username,
+                     "ARTIFACT_UPDATED_STATUS": artifacts_updated_event.status}
+
+    try:
+        execute_event_extendables(constants.ARTIFACT_UPDATED_EVENT, plugin_values)
+    except Exception as e:
+        log.exception("Could not execute plugins for artifact updated event: %s" % e)
+
+    if not Config.activated:
+        # publish instance activated event if not yet activated
+        publisher.publish_instance_activated_event()
+        on_instance_activated_event()
+
+    update_artifacts = Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, True)
+    auto_commit = Config.is_commits_enabled
+    auto_checkout = Config.is_checkout_enabled
+    log.info("ADC configuration: [update_artifacts] %s, [auto-commit] %s, [auto-checkout] %s"
+             % (update_artifacts, auto_commit, auto_checkout))
+
+    if update_artifacts:
+        try:
+            update_interval = int(Config.artifact_update_interval)
+        except ValueError:
+            log.exception("Invalid artifact sync interval specified: %s" % ValueError)
+            update_interval = 10
+
+        log.info("Artifact updating task enabled, update interval: %s seconds" % update_interval)
+
+        log.info("Auto Commit is turned %s " % ("on" if auto_commit else "off"))
+        log.info("Auto Checkout is turned %s " % ("on" if auto_checkout else "off"))
+
+        AgentGitHandler.schedule_artifact_update_task(
+            repo_info,
+            auto_checkout,
+            auto_commit,
+            update_interval)
+
+
+def on_instance_cleanup_cluster_event():
+    log.debug("Processing instance cleanup cluster event...")
+    cleanup(constants.INSTANCE_CLEANUP_CLUSTER_EVENT)
+
+
+def on_instance_cleanup_member_event():
+    log.debug("Processing instance cleanup member event...")
+    cleanup(constants.INSTANCE_CLEANUP_MEMBER_EVENT)
+
+
+def on_member_activated_event(member_activated_event):
+    log.debug(
+        "Processing Member activated event: [service] %r [cluster] %r [member] %r"
+        % (member_activated_event.service_name,
+           member_activated_event.cluster_id,
+           member_activated_event.member_id))
+
+    member_initialized = is_member_initialized_in_topology(
+        member_activated_event.service_name,
+        member_activated_event.cluster_id,
+        member_activated_event.member_id)
+
+    if not member_initialized:
+        log.error("Member has not initialized, failed to execute member activated event")
+        return
+
+    execute_event_extendables(constants.MEMBER_ACTIVATED_EVENT, {})
+
+
+def on_complete_topology_event(complete_topology_event):
+    log.debug("Processing Complete topology event...")
+
+    service_name_in_payload = Config.service_name
+    cluster_id_in_payload = Config.cluster_id
+    member_id_in_payload = Config.member_id
+
+    if not Config.initialized:
+        member_initialized = is_member_initialized_in_topology(
+            service_name_in_payload,
+            cluster_id_in_payload,
+            member_id_in_payload)
 
+        if member_initialized:
+            # Set cartridge agent as initialized since member is available and it is in initialized state
+            Config.initialized = True
+            log.info(
+                "Member initialized [member id] %s, [cluster-id] %s, [service] %s"
+                % (member_id_in_payload, cluster_id_in_payload, service_name_in_payload))
 
-class EventHandler:
+    topology = complete_topology_event.get_topology()
+    service = topology.get_service(service_name_in_payload)
+    if service is None:
+        raise Exception("Service not found in topology [service] %s" % service_name_in_payload)
+
+    cluster = service.get_cluster(cluster_id_in_payload)
+    if cluster is None:
+        raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id_in_payload)
+
+    plugin_values = {"TOPOLOGY_JSON": json.dumps(topology.json_str),
+                     "MEMBER_LIST_JSON": json.dumps(cluster.member_list_json)}
+
+    execute_event_extendables(constants.COMPLETE_TOPOLOGY_EVENT, plugin_values)
+
+
+def on_member_initialized_event(member_initialized_event):
     """
-    Event execution related logic
+     Member initialized event is sent by cloud controller once volume attachment and
+     ip address allocation is completed successfully
+    :param member_initialized_event:
+    :return:
     """
+    log.debug("Processing Member initialized event...")
+    service_name_in_payload = Config.service_name
+    cluster_id_in_payload = Config.cluster_id
+    member_id_in_payload = Config.member_id
+
+    if not Config.initialized and member_id_in_payload == member_initialized_event.member_id:
+        member_exists = member_exists_in_topology(
+            service_name_in_payload,
+            cluster_id_in_payload,
+            member_id_in_payload)
+
+        log.debug("Member exists: %s" % member_exists)
+
+        if member_exists:
+            Config.initialized = True
+            mark_member_as_initialized(service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
+            log.info("Instance marked as initialized on member initialized event")
+        else:
+            raise Exception("Member [member-id] %s not found in topology while processing member initialized "
+                            "event. [Topology] %s" % (member_id_in_payload, TopologyContext.get_topology()))
 
-    def __init__(self):
-        self.__log = LogFactory().get_log(__name__)
+    execute_event_extendables(constants.MEMBER_INITIALIZED_EVENT, {})
 
-    def on_instance_started_event(self):
-        self.__log.debug("Processing instance started event...")
-        # TODO: copy artifacts extension
-        self.execute_event_extendables(constants.INSTANCE_STARTED_EVENT, {})
-
-    def create_dummy_interface(self):
-        self.__log.debug("Processing lvs dummy interface creation...")
-        lvs_vip = Config.lvs_virtual_ip.split("|")
-        self.__log.debug("LVS dummy interface creation values %s %s " % (lvs_vip[0], lvs_vip[1]))
-        self.execute_event_extendables(constants.CREATE_LVS_DUMMY_INTERFACE,
-                                       {"EVENT": constants.CREATE_LVS_DUMMY_INTERFACE,
-                                        "LVS_DUMMY_VIRTUAL_IP": lvs_vip[0],
-                                        "LVS_SUBNET_MASK": lvs_vip[1]})
-
-    def on_instance_activated_event(self):
-        self.__log.debug("Processing instance activated event...")
-        self.execute_event_extendables(constants.INSTANCE_ACTIVATED_EVENT, {})
-
-    def on_artifact_updated_event(self, artifacts_updated_event):
-        self.__log.debug("Processing artifact updated event for [tenant] %s [cluster] %s [status] %s" %
-                         (str(artifacts_updated_event.tenant_id),
-                          artifacts_updated_event.cluster_id,
-                          artifacts_updated_event.status))
-
-        cluster_id_event = str(artifacts_updated_event.cluster_id).strip()
-        cluster_id_payload = Config.cluster_id
-        repo_url = str(artifacts_updated_event.repo_url).strip()
-
-        if repo_url == "":
-            self.__log.error("Repository URL is empty. Failed to process artifact updated event.")
-            return
-
-        if cluster_id_payload is None or cluster_id_payload == "":
-            self.__log.error("Cluster ID in payload is empty. Failed to process artifact updated event.")
-            return
-
-        if cluster_id_payload != cluster_id_event:
-            self.__log.debug("Cluster ID in artifact updated event does not match. Skipping event handler.")
-            return
-
-        repo_password = None
-        if artifacts_updated_event.repo_password is not None:
-            secret = Config.cartridge_key
-            repo_password = cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, secret)
-
-        if Config.app_path is None:
-            self.__log.error("Repository path is empty. Failed to process artifact updated event.")
-            return
-
-        if not EventHandler.validate_repo_path(Config.app_path):
-            self.__log.error(
-                "Repository path cannot be accessed, or is invalid. Failed to process artifact updated event.")
-            return
-
-        repo_username = artifacts_updated_event.repo_username
-        tenant_id = artifacts_updated_event.tenant_id
-        is_multitenant = Config.is_multiTenant
-        commit_enabled = artifacts_updated_event.commit_enabled
-
-        # create repo object
-        local_repo_path = self.get_repo_path_for_tenant(str(tenant_id), Config.app_path, is_multitenant)
-        repo_info = Repository(repo_url, repo_username, repo_password, local_repo_path, tenant_id, commit_enabled)
-        self.__log.info("Executing checkout job on artifact updated event...")
 
-        try:
-            Config.artifact_checkout_plugin.plugin_object.checkout(repo_info)
-        except Exception as e:
-            self.__log.exception(
-                "Checkout job on artifact updated event failed for tenant: %s %s" % (repo_info.tenant_id, e))
+def on_complete_tenant_event(complete_tenant_event):
+    log.debug("Processing Complete tenant event...")
 
-        # execute artifact updated extension
-        plugin_values = {"ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id,
-                         "ARTIFACT_UPDATED_TENANT_ID": artifacts_updated_event.tenant_id,
-                         "ARTIFACT_UPDATED_REPO_URL": artifacts_updated_event.repo_url,
-                         "ARTIFACT_UPDATED_REPO_PASSWORD": artifacts_updated_event.repo_password,
-                         "ARTIFACT_UPDATED_REPO_USERNAME": artifacts_updated_event.repo_username,
-                         "ARTIFACT_UPDATED_STATUS": artifacts_updated_event.status}
+    tenant_list_json = complete_tenant_event.tenant_list_json
+    log.debug("Complete tenants:" + json.dumps(tenant_list_json))
 
-        try:
-            self.execute_event_extendables(constants.ARTIFACT_UPDATED_EVENT, plugin_values)
-        except Exception as e:
-            self.__log.exception("Could not execute plugins for artifact updated event: %s" % e)
-
-        if not Config.activated:
-            # publish instance activated event if not yet activated
-            publisher.publish_instance_activated_event()
-            self.on_instance_activated_event()
-
-        update_artifacts = Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, True)
-        auto_commit = Config.is_commits_enabled
-        auto_checkout = Config.is_checkout_enabled
-        self.__log.info("ADC configuration: [update_artifacts] %s, [auto-commit] %s, [auto-checkout] %s",
-                        update_artifacts, auto_commit, auto_checkout)
-        if update_artifacts:
-            try:
-                update_interval = int(Config.artifact_update_interval)
-            except ValueError:
-                self.__log.exception("Invalid artifact sync interval specified: %s" % ValueError)
-                update_interval = 10
-
-            self.__log.info("Artifact updating task enabled, update interval: %s seconds" % update_interval)
-
-            self.__log.info("Auto Commit is turned %s " % ("on" if auto_commit else "off"))
-            self.__log.info("Auto Checkout is turned %s " % ("on" if auto_checkout else "off"))
-
-            AgentGitHandler.schedule_artifact_update_task(
-                repo_info,
-                auto_checkout,
-                auto_commit,
-                update_interval)
-
-    def on_instance_cleanup_cluster_event(self):
-        self.__log.debug("Processing instance cleanup cluster event...")
-        self.cleanup(constants.INSTANCE_CLEANUP_CLUSTER_EVENT)
-
-    def on_instance_cleanup_member_event(self):
-        self.__log.debug("Processing instance cleanup member event...")
-        self.cleanup(constants.INSTANCE_CLEANUP_MEMBER_EVENT)
-
-    def on_member_activated_event(self, member_activated_event):
-        self.__log.debug("Processing Member activated event: [service] %r [cluster] %r [member] %r"
-                         % (member_activated_event.service_name,
-                            member_activated_event.cluster_id,
-                            member_activated_event.member_id))
-
-        member_initialized = self.is_member_initialized_in_topology(
-            member_activated_event.service_name,
-            member_activated_event.cluster_id,
-            member_activated_event.member_id)
-
-        if not member_initialized:
-            self.__log.error("Member has not initialized, failed to execute member activated event")
-            return
-
-        self.execute_event_extendables(constants.MEMBER_ACTIVATED_EVENT, {})
-
-    def on_complete_topology_event(self, complete_topology_event):
-        self.__log.debug("Processing Complete topology event...")
-
-        service_name_in_payload = Config.service_name
-        cluster_id_in_payload = Config.cluster_id
-        member_id_in_payload = Config.member_id
-
-        if not Config.initialized:
-            member_initialized = self.is_member_initialized_in_topology(
-                service_name_in_payload,
-                cluster_id_in_payload,
-                member_id_in_payload)
-
-            if member_initialized:
-                # Set cartridge agent as initialized since member is available and it is in initialized state
-                Config.initialized = True
-                self.__log.info("Member initialized [member id] %s, [cluster-id] %s, [service] %s" %
-                                (member_id_in_payload, cluster_id_in_payload, service_name_in_payload))
-
-        topology = complete_topology_event.get_topology()
-        service = topology.get_service(service_name_in_payload)
-        if service is None:
-            raise Exception("Service not found in topology [service] %s" % service_name_in_payload)
+    plugin_values = {"TENANT_LIST_JSON": json.dumps(tenant_list_json)}
 
-        cluster = service.get_cluster(cluster_id_in_payload)
-        if cluster is None:
-            raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id_in_payload)
-
-        plugin_values = {"TOPOLOGY_JSON": json.dumps(topology.json_str),
-                         "MEMBER_LIST_JSON": json.dumps(cluster.member_list_json)}
-
-        self.execute_event_extendables(constants.COMPLETE_TOPOLOGY_EVENT, plugin_values)
-
-    def on_member_initialized_event(self, member_initialized_event):
-        """
-         Member initialized event is sent by cloud controller once volume attachment and
-         ip address allocation is completed successfully
-        :return:
-        """
-        self.__log.debug("Processing Member initialized event...")
-        service_name_in_payload = Config.service_name
-        cluster_id_in_payload = Config.cluster_id
-        member_id_in_payload = Config.member_id
-
-        if not Config.initialized and member_id_in_payload == member_initialized_event.member_id:
-            member_exists = self.member_exists_in_topology(service_name_in_payload, cluster_id_in_payload,
-                                                           member_id_in_payload)
-            self.__log.debug("Member exists: %s" % member_exists)
-            if member_exists:
-                Config.initialized = True
-                self.mark_member_as_initialized(service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
-                self.__log.info("Instance marked as initialized on member initialized event")
-            else:
-                raise Exception("Member [member-id] %s not found in topology while processing member initialized "
-                                "event. [Topology] %s" % (member_id_in_payload, TopologyContext.get_topology()))
+    execute_event_extendables(constants.COMPLETE_TENANT_EVENT, plugin_values)
+
+
+def on_member_terminated_event(member_terminated_event):
+    log.debug(
+        "Processing Member terminated event: [service] %s [cluster] %s [member] %s"
+        % (member_terminated_event.service_name, member_terminated_event.cluster_id, member_terminated_event.member_id))
+
+    member_initialized = is_member_initialized_in_topology(
+        member_terminated_event.service_name,
+        member_terminated_event.cluster_id,
+        member_terminated_event.member_id
+    )
+
+    if not member_initialized:
+        log.error("Member has not initialized, failed to execute member terminated event")
+        return
+
+    execute_event_extendables(constants.MEMBER_TERMINATED_EVENT, {})
+
+
+def on_member_suspended_event(member_suspended_event):
+    log.debug(
+        "Processing Member suspended event: [service] %s [cluster] %s [member] %s"
+        % (member_suspended_event.service_name, member_suspended_event.cluster_id, member_suspended_event.member_id))
+
+    member_initialized = is_member_initialized_in_topology(
+        member_suspended_event.service_name,
+        member_suspended_event.cluster_id,
+        member_suspended_event.member_id
+    )
+
+    if not member_initialized:
+        log.error("Member has not initialized, failed to execute member suspended event")
+        return
+
+    execute_event_extendables(constants.MEMBER_SUSPENDED_EVENT, {})
+
+
+def on_member_started_event(member_started_event):
+    log.debug(
+        "Processing Member started event: [service] %s [cluster] %s [member] %s"
+        % (member_started_event.service_name, member_started_event.cluster_id, member_started_event.member_id))
+
+    member_initialized = is_member_initialized_in_topology(
+        member_started_event.service_name,
+        member_started_event.cluster_id,
+        member_started_event.member_id
+    )
+
+    if not member_initialized:
+        log.error("Member has not initialized, failed to execute member started event")
+        return
+
+    execute_event_extendables(constants.MEMBER_STARTED_EVENT, {})
+
+
+def start_server_extension():
+    log.debug("Processing start server extension...")
+    service_name_in_payload = Config.service_name
+    cluster_id_in_payload = Config.cluster_id
+    member_id_in_payload = Config.member_id
+    member_initialized = is_member_initialized_in_topology(
+        service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
+
+    if not member_initialized:
+        log.error("Member has not initialized, failed to execute start server event")
+        return
+
+    execute_event_extendables("StartServers", {})
+
+
+def volume_mount_extension(persistence_mappings_payload):
+    log.debug("Processing volume mount extension...")
+    execute_event_extendables("VolumeMount", persistence_mappings_payload)
+
+
+def on_domain_mapping_added_event(domain_mapping_added_event):
+    tenant_domain = find_tenant_domain(domain_mapping_added_event.tenant_id)
+    log.debug(
+        "Processing Domain mapping added event: [tenant-id] " + str(domain_mapping_added_event.tenant_id) +
+        " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_added_event.domain_name +
+        " [application-context] " + domain_mapping_added_event.application_context
+    )
+
+    plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_added_event.application_id,
+                     "SUBSCRIPTION_SERVICE_NAME": domain_mapping_added_event.service_name,
+                     "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_added_event.domain_name,
+                     "SUBSCRIPTION_CLUSTER_ID": domain_mapping_added_event.cluster_id,
+                     "SUBSCRIPTION_TENANT_ID": int(domain_mapping_added_event.tenant_id),
+                     "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain,
+                     "SUBSCRIPTION_CONTEXT_PATH":
+                         domain_mapping_added_event.context_path}
+
+    execute_event_extendables(constants.DOMAIN_MAPPING_ADDED_EVENT, plugin_values)
+
+
+def on_domain_mapping_removed_event(domain_mapping_removed_event):
+    tenant_domain = find_tenant_domain(domain_mapping_removed_event.tenant_id)
+    log.info(
+        "Domain mapping removed event received: [tenant-id] " + str(domain_mapping_removed_event.tenant_id) +
+        " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_removed_event.domain_name
+    )
+
+    plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_removed_event.application_id,
+                     "SUBSCRIPTION_SERVICE_NAME": domain_mapping_removed_event.service_name,
+                     "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_removed_event.domain_name,
+                     "SUBSCRIPTION_CLUSTER_ID": domain_mapping_removed_event.cluster_id,
+                     "SUBSCRIPTION_TENANT_ID": int(domain_mapping_removed_event.tenant_id),
+                     "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain}
+
+    execute_event_extendables(constants.DOMAIN_MAPPING_REMOVED_EVENT, plugin_values)
+
+
+def on_copy_artifacts_extension(src, dest):
+    log.debug("Processing Copy artifacts extension...")
+    plugin_values = {"SOURCE": src, "DEST": dest}
+    execute_event_extendables("CopyArtifacts", plugin_values)
 
-        self.execute_event_extendables(constants.MEMBER_INITIALIZED_EVENT, {})
-
-    def on_complete_tenant_event(self, complete_tenant_event):
-        self.__log.debug("Processing Complete tenant event...")
-
-        tenant_list_json = complete_tenant_event.tenant_list_json
-        self.__log.debug("Complete tenants:" + json.dumps(tenant_list_json))
-
-        plugin_values = {"TENANT_LIST_JSON": json.dumps(tenant_list_json)}
-
-        self.execute_event_extendables(constants.COMPLETE_TENANT_EVENT, plugin_values)
-
-    def on_member_terminated_event(self, member_terminated_event):
-        self.__log.debug("Processing Member terminated event: [service] %s [cluster] %s [member] %s" %
-                         (member_terminated_event.service_name, member_terminated_event.cluster_id,
-                          member_terminated_event.member_id))
-
-        member_initialized = self.is_member_initialized_in_topology(
-            member_terminated_event.service_name,
-            member_terminated_event.cluster_id,
-            member_terminated_event.member_id
-        )
-
-        if not member_initialized:
-            self.__log.error("Member has not initialized, failed to execute member terminated event")
-            return
-
-        self.execute_event_extendables(constants.MEMBER_TERMINATED_EVENT, {})
-
-    def on_member_suspended_event(self, member_suspended_event):
-        self.__log.debug("Processing Member suspended event: [service] %s [cluster] %s [member] %s" %
-                         (member_suspended_event.service_name, member_suspended_event.cluster_id,
-                          member_suspended_event.member_id))
-
-        member_initialized = self.is_member_initialized_in_topology(
-            member_suspended_event.service_name,
-            member_suspended_event.cluster_id,
-            member_suspended_event.member_id
-        )
-
-        if not member_initialized:
-            self.__log.error("Member has not initialized, failed to execute member suspended event")
-            return
-
-        self.execute_event_extendables(constants.MEMBER_SUSPENDED_EVENT, {})
-
-    def on_member_started_event(self, member_started_event):
-        self.__log.debug("Processing Member started event: [service] %s [cluster] %s [member] %s" %
-                         (member_started_event.service_name, member_started_event.cluster_id,
-                          member_started_event.member_id))
-
-        member_initialized = self.is_member_initialized_in_topology(
-            member_started_event.service_name,
-            member_started_event.cluster_id,
-            member_started_event.member_id
-        )
-
-        if not member_initialized:
-            self.__log.error("Member has not initialized, failed to execute member started event")
-            return
-
-        self.execute_event_extendables(constants.MEMBER_STARTED_EVENT, {})
-
-    def start_server_extension(self):
-        self.__log.debug("Processing start server extension...")
-        service_name_in_payload = Config.service_name
-        cluster_id_in_payload = Config.cluster_id
-        member_id_in_payload = Config.member_id
-        member_initialized = self.is_member_initialized_in_topology(service_name_in_payload, cluster_id_in_payload,
-                                                                    member_id_in_payload)
-
-        if not member_initialized:
-            self.__log.error("Member has not initialized, failed to execute start server event")
-            return
-
-        self.execute_event_extendables("StartServers", {})
-
-    def volume_mount_extension(self, persistence_mappings_payload):
-        self.__log.debug("Processing volume mount extension...")
-        self.execute_event_extendables("VolumeMount", persistence_mappings_payload)
-
-    def on_domain_mapping_added_event(self, domain_mapping_added_event):
-        tenant_domain = EventHandler.find_tenant_domain(domain_mapping_added_event.tenant_id)
-        self.__log.debug(
-            "Processing Domain mapping added event: [tenant-id] " + str(domain_mapping_added_event.tenant_id) +
-            " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_added_event.domain_name +
-            " [application-context] " + domain_mapping_added_event.application_context
-        )
-
-        plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_added_event.application_id,
-                         "SUBSCRIPTION_SERVICE_NAME": domain_mapping_added_event.service_name,
-                         "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_added_event.domain_name,
-                         "SUBSCRIPTION_CLUSTER_ID": domain_mapping_added_event.cluster_id,
-                         "SUBSCRIPTION_TENANT_ID": int(domain_mapping_added_event.tenant_id),
-                         "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain,
-                         "SUBSCRIPTION_CONTEXT_PATH":
-                             domain_mapping_added_event.context_path}
-
-        self.execute_event_extendables(constants.DOMAIN_MAPPING_ADDED_EVENT, plugin_values)
-
-    def on_domain_mapping_removed_event(self, domain_mapping_removed_event):
-        tenant_domain = EventHandler.find_tenant_domain(domain_mapping_removed_event.tenant_id)
-        self.__log.info(
-            "Domain mapping removed event received: [tenant-id] " + str(domain_mapping_removed_event.tenant_id) +
-            " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_removed_event.domain_name
-        )
-
-        plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_removed_event.application_id,
-                         "SUBSCRIPTION_SERVICE_NAME": domain_mapping_removed_event.service_name,
-                         "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_removed_event.domain_name,
-                         "SUBSCRIPTION_CLUSTER_ID": domain_mapping_removed_event.cluster_id,
-                         "SUBSCRIPTION_TENANT_ID": int(domain_mapping_removed_event.tenant_id),
-                         "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain}
-
-        self.execute_event_extendables(constants.DOMAIN_MAPPING_REMOVED_EVENT, plugin_values)
-
-    def on_copy_artifacts_extension(self, src, dest):
-        self.__log.debug("Processing Copy artifacts extension...")
-        plugin_values = {"SOURCE": src, "DEST": dest}
-        self.execute_event_extendables("CopyArtifacts", plugin_values)
-
-    def on_tenant_subscribed_event(self, tenant_subscribed_event):
-        self.__log.debug(
-            "Processing Tenant subscribed event: [tenant] " + str(tenant_subscribed_event.tenant_id) +
-            " [service] " + tenant_subscribed_event.service_name + " [cluster] " + tenant_subscribed_event.cluster_ids
-        )
-
-        self.execute_event_extendables(constants.TENANT_SUBSCRIBED_EVENT, {})
-
-    def on_application_signup_removed_event(self, application_signup_removal_event):
-        self.__log.debug(
-            "Processing Tenant unsubscribed event: [tenant] " + str(application_signup_removal_event.tenantId) +
-            " [application ID] " + str(application_signup_removal_event.applicationId)
-        )
-
-        if Config.application_id == application_signup_removal_event.applicationId:
-            AgentGitHandler.remove_repo(application_signup_removal_event.tenantId)
-
-        self.execute_event_extendables(constants.APPLICATION_SIGNUP_REMOVAL_EVENT, {})
-
-    def cleanup(self, event):
-        self.__log.debug("Executing cleanup extension for event %s..." % event)
-        publisher.publish_maintenance_mode_event()
-        self.execute_event_extendables("clean", {})
-        publisher.publish_instance_ready_to_shutdown_event()
-
-    def execute_event_extendables(self, event, input_values):
-        """ Execute the extensions and plugins related to the event
-        :param event: The event name string
-        :param input_values: the values to be passed to the plugin
-        :return:
-        """
-        try:
-            input_values = EventHandler.add_common_input_values(input_values)
-        except Exception as e:
-            self.__log.error("Error while adding common input values for event extendables: %s" % e)
-        input_values["EVENT"] = event
-        self.__log.debug("Executing extensions for [event] %s with [input values] %s" % (event, input_values))
-        # Execute the extension
-        self.execute_extension_for_event(event, input_values)
-        # Execute the plugins
-        self.execute_plugins_for_event(event, input_values)
-
-    def execute_plugins_for_event(self, event, input_values):
-        """ For each plugin registered for the specified event, start a plugin execution thread
-       :param str event: The event name string
-       :param dict input_values: the values to be passed to the plugin
-       :return:
-       """
-        try:
-            plugins_for_event = Config.plugins.get(event)
-            if plugins_for_event is not None:
-                for plugin_info in plugins_for_event:
-                    self.__log.debug("Executing plugin %s for event %s" % (plugin_info.name, event))
-                    plugin_thread = PluginExecutor(plugin_info, input_values)
-                    plugin_thread.start()
-
-                    # block till plugin run completes.
-                    plugin_thread.join()
-            else:
-                self.__log.debug("No plugins registered for event %s" % event)
-        except Exception as e:
-            self.__log.exception("Error while executing plugin for event %s: %s" % (event, e))
-
-    def execute_extension_for_event(self, event, extension_values):
-        """ Execute the extension related to the event
-        :param event: The event name string
-        :param extension_values: the values to be passed to the plugin
-        :return:
-        """
-        try:
-            if Config.extension_executor is not None:
-                self.__log.debug("Executing extension for event [%s]" % event)
-                extension_thread = PluginExecutor(Config.extension_executor, extension_values)
-                extension_thread.start()
+
+def on_tenant_subscribed_event(tenant_subscribed_event):
+    log.debug(
+        "Processing Tenant subscribed event: [tenant] " + str(tenant_subscribed_event.tenant_id) +
+        " [service] " + tenant_subscribed_event.service_name + " [cluster] " + tenant_subscribed_event.cluster_ids
+    )
+
+    execute_event_extendables(constants.TENANT_SUBSCRIBED_EVENT, {})
+
+
+def on_application_signup_removed_event(application_signup_removal_event):
+    log.debug(
+        "Processing Tenant unsubscribed event: [tenant] " + str(application_signup_removal_event.tenantId) +
+        " [application ID] " + str(application_signup_removal_event.applicationId)
+    )
+
+    if Config.application_id == application_signup_removal_event.applicationId:
+        AgentGitHandler.remove_repo(application_signup_removal_event.tenantId)
+
+    execute_event_extendables(constants.APPLICATION_SIGNUP_REMOVAL_EVENT, {})
+
+
+def cleanup(event):
+    log.debug("Executing cleanup extension for event %s..." % event)
+    publisher.publish_maintenance_mode_event()
+    execute_event_extendables("clean", {})
+    publisher.publish_instance_ready_to_shutdown_event()
+
+
+def execute_event_extendables(event, input_values):
+    """ Execute the extensions and plugins related to the event
+    :param event: The event name string
+    :param input_values: the values to be passed to the plugin
+    :return:
+    """
+    try:
+        input_values = add_common_input_values(input_values)
+    except Exception as e:
+        log.error("Error while adding common input values for event extendables: %s" % e)
+    input_values["EVENT"] = event
+    log.debug("Executing extensions for [event] %s with [input values] %s" % (event, input_values))
+    # Execute the extension
+    execute_extension_for_event(event, input_values)
+    # Execute the plugins
+    execute_plugins_for_event(event, input_values)
+
+
+def execute_plugins_for_event(event, input_values):
+    """ For each plugin registered for the specified event, start a plugin execution thread
+   :param str event: The event name string
+   :param dict input_values: the values to be passed to the plugin
+   :return:
+   """
+    try:
+        plugins_for_event = Config.plugins.get(event)
+        if plugins_for_event is not None:
+            for plugin_info in plugins_for_event:
+                log.debug("Executing plugin %s for event %s" % (plugin_info.name, event))
+                plugin_thread = PluginExecutor(plugin_info, input_values)
+                plugin_thread.start()
 
                 # block till plugin run completes.
-                extension_thread.join()
-            else:
-                self.__log.debug("No extensions registered for event %s" % event)
-        except OSError as e:
-            self.__log.warn("No extension was found for event %s: %s" % (event, e))
-        except Exception as e:
-            self.__log.exception("Error while executing extension for event %s: %s" % (event, e))
-
-    def get_repo_path_for_tenant(self, tenant_id, git_local_repo_path, is_multitenant):
-        """ Finds the repository path for tenant to clone from the remote repository
-        :param tenant_id:
-        :param git_local_repo_path:
-        :param is_multitenant:
-        :return:
-        """
-        repo_path = ""
-
-        if is_multitenant:
-            if tenant_id == SUPER_TENANT_ID:
-                # super tenant, /repository/deploy/server/
-                super_tenant_repo_path = Config.super_tenant_repository_path
-                # "app_path"
-                repo_path += git_local_repo_path
-
-                if super_tenant_repo_path is not None and super_tenant_repo_path != "":
-                    super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.startswith("/") \
-                        else "/" + super_tenant_repo_path
-                    super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.endswith("/") \
-                        else super_tenant_repo_path + "/"
-                    # "app_path/repository/deploy/server/"
-                    repo_path += super_tenant_repo_path
-                else:
-                    # "app_path/repository/deploy/server/"
-                    repo_path += SUPER_TENANT_REPO_PATH
+                plugin_thread.join()
+        else:
+            log.debug("No plugins registered for event %s" % event)
+    except Exception as e:
+        log.exception("Error while executing plugin for event %s: %s" % (event, e))
+
 
+def execute_extension_for_event(event, extension_values):
+    """ Execute the extension related to the event
+    :param event: The event name string
+    :param extension_values: the values to be passed to the plugin
+    :return:
+    """
+    try:
+        if Config.extension_executor is not None:
+            log.debug("Executing extension for event [%s]" % event)
+            extension_thread = PluginExecutor(Config.extension_executor, extension_values)
+            extension_thread.start()
+
+            # block till plugin run completes.
+            extension_thread.join()
+        else:
+            log.debug("No extensions registered for event %s" % event)
+    except OSError as e:
+        log.warn("No extension was found for event %s: %s" % (event, e))
+    except Exception as e:
+        log.exception("Error while executing extension for event %s: %s" % (event, e))
+
+
+def get_repo_path_for_tenant(tenant_id, git_local_repo_path, is_multitenant):
+    """ Finds the repository path for tenant to clone from the remote repository
+    :param tenant_id:
+    :param git_local_repo_path:
+    :param is_multitenant:
+    :return:
+    """
+    repo_path = ""
+
+    if is_multitenant:
+        if tenant_id == SUPER_TENANT_ID:
+            # super tenant, /repository/deploy/server/
+            super_tenant_repo_path = Config.super_tenant_repository_path
+            # "app_path"
+            repo_path += git_local_repo_path
+
+            if super_tenant_repo_path is not None and super_tenant_repo_path != "":
+                super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.startswith("/") \
+                    else "/" + super_tenant_repo_path
+                super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.endswith("/") \
+                    else super_tenant_repo_path + "/"
+                # "app_path/repository/deploy/server/"
+                repo_path += super_tenant_repo_path
             else:
-                # normal tenant, /repository/tenants/tenant_id
-                tenant_repo_path = Config.tenant_repository_path
-                # "app_path"
-                repo_path += git_local_repo_path
-
-                if tenant_repo_path is not None and tenant_repo_path != "":
-                    tenant_repo_path = tenant_repo_path if tenant_repo_path.startswith("/") else "/" + tenant_repo_path
-                    tenant_repo_path = tenant_repo_path if tenant_repo_path.endswith("/") else tenant_repo_path + "/"
-                    # "app_path/repository/tenants/244653444"
-                    repo_path += tenant_repo_path + tenant_id
-                else:
-                    # "app_path/repository/tenants/244653444"
-                    repo_path += TENANT_REPO_PATH + tenant_id
-
-                    # tenant_dir_path = git_local_repo_path + AgentGitHandler.TENANT_REPO_PATH + tenant_id
-                    # GitUtils.create_dir(repo_path)
+                # "app_path/repository/deploy/server/"
+                repo_path += SUPER_TENANT_REPO_PATH
+
         else:
-            # not multi tenant, app_path
-            repo_path = git_local_repo_path
-
-        self.__log.debug("Repo path returned : %r" % repo_path)
-        return repo_path
-
-    def is_member_initialized_in_topology(self, service_name, cluster_id, member_id):
-        if self.member_exists_in_topology(service_name, cluster_id, member_id):
-            topology = TopologyContext.get_topology()
-            service = topology.get_service(service_name)
-            if service is None:
-                raise Exception("Service not found in topology [service] %s" % service_name)
-
-            cluster = service.get_cluster(cluster_id)
-            if cluster is None:
-                raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
-
-            member = cluster.get_member(member_id)
-            if member is None:
-                raise Exception("Member id not found in topology [member] %s" % member_id)
-
-            self.__log.info("Found member: " + member.to_json())
-            if member.status == MemberStatus.Initialized:
-                return True
-        return False
+            # normal tenant, /repository/tenants/tenant_id
+            tenant_repo_path = Config.tenant_repository_path
+            # "app_path"
+            repo_path += git_local_repo_path
+
+            if tenant_repo_path is not None and tenant_repo_path != "":
+                tenant_repo_path = tenant_repo_path if tenant_repo_path.startswith("/") else "/" + tenant_repo_path
+                tenant_repo_path = tenant_repo_path if tenant_repo_path.endswith("/") else tenant_repo_path + "/"
+                # "app_path/repository/tenants/244653444"
+                repo_path += tenant_repo_path + tenant_id
+            else:
+                # "app_path/repository/tenants/244653444"
+                repo_path += TENANT_REPO_PATH + tenant_id
+
+                # tenant_dir_path = git_local_repo_path + AgentGitHandler.TENANT_REPO_PATH + tenant_id
+                # GitUtils.create_dir(repo_path)
+    else:
+        # not multi tenant, app_path
+        repo_path = git_local_repo_path
 
-    def member_exists_in_topology(self, service_name, cluster_id, member_id):
+    log.debug("Repo path returned : %r" % repo_path)
+    return repo_path
+
+
+def is_member_initialized_in_topology(service_name, cluster_id, member_id):
+    if member_exists_in_topology(service_name, cluster_id, member_id):
         topology = TopologyContext.get_topology()
         service = topology.get_service(service_name)
         if service is None:
@@ -519,131 +524,149 @@ class EventHandler:
         if cluster is None:
             raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
 
-        activated_member = cluster.get_member(member_id)
-        if activated_member is None:
-            self.__log.error("Member id not found in topology [member] %s" % member_id)
-            return False
+        member = cluster.get_member(member_id)
+        if member is None:
+            raise Exception("Member id not found in topology [member] %s" % member_id)
 
-        return True
+        log.info("Found member: " + member.to_json())
+        if member.status == MemberStatus.Initialized:
+            return True
+    return False
 
-    @staticmethod
-    def mark_member_as_initialized(service_name, cluster_id, member_id):
-        topology = TopologyContext.get_topology()
-        service = topology.get_service(service_name)
+
+def member_exists_in_topology(service_name, cluster_id, member_id):
+    topology = TopologyContext.get_topology()
+    service = topology.get_service(service_name)
+    if service is None:
+        raise Exception("Service not found in topology [service] %s" % service_name)
+
+    cluster = service.get_cluster(cluster_id)
+    if cluster is None:
+        raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
+
+    activated_member = cluster.get_member(member_id)
+    if activated_member is None:
+        log.error("Member id not found in topology [member] %s" % member_id)
+        return False
+
+    return True
+
+
+def mark_member_as_initialized(service_name, cluster_id, member_id):
+    topology = TopologyContext.get_topology()
+    service = topology.get_service(service_name)
+    if service is None:
+        raise Exception("Service not found in topology [service] %s" % service_name)
+
+    cluster = service.get_cluster(cluster_id)
+    if cluster is None:
+        raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
+
+    member = cluster.get_member(member_id)
+    if member is None:
+        raise Exception("Member id not found in topology [member] %s" % member_id)
+    member.status = MemberStatus.Initialized
+
+
+def add_common_input_values(plugin_values):
+    """
+    Adds the common parameters to be used by the extension scripts
+    :param dict[str, str] plugin_values: Dictionary to be added
+    :return: Dictionary with updated parameters
+    :rtype: dict[str, str]
+    """
+    if plugin_values is None:
+        plugin_values = {}
+    elif type(plugin_values) != dict:
+        plugin_values = {"VALUE1": str(plugin_values)}
+
+    plugin_values["APPLICATION_PATH"] = Config.app_path
+    plugin_values["PARAM_FILE_PATH"] = Config.read_property(constants.PARAM_FILE_PATH, False)
+    plugin_values["PERSISTENCE_MAPPINGS"] = Config.persistence_mappings
+
+    lb_cluster_id_in_payload = Config.lb_cluster_id
+    lb_private_ip, lb_public_ip = get_lb_member_ip(lb_cluster_id_in_payload)
+    plugin_values["LB_IP"] = lb_private_ip if lb_private_ip is not None else Config.lb_private_ip
+    plugin_values["LB_PUBLIC_IP"] = lb_public_ip if lb_public_ip is not None else Config.lb_public_ip
+
+    topology = TopologyContext.get_topology()
+    if topology.initialized:
+        service = topology.get_service(Config.service_name)
         if service is None:
-            raise Exception("Service not found in topology [service] %s" % service_name)
+            raise Exception("Service not found in topology [service] %s" % Config.service_name)
 
-        cluster = service.get_cluster(cluster_id)
+        cluster = service.get_cluster(Config.cluster_id)
         if cluster is None:
-            raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
+            raise Exception("Cluster id not found in topology [cluster] %s" % Config.cluster_id)
 
-        member = cluster.get_member(member_id)
+        member = cluster.get_member(Config.member_id)
         if member is None:
-            raise Exception("Member id not found in topology [member] %s" % member_id)
-        member.status = MemberStatus.Initialized
-
-    @staticmethod
-    def add_common_input_values(plugin_values):
-        """
-        Adds the common parameters to be used by the extension scripts
-        :param dict[str, str] plugin_values: Dictionary to be added
-        :return: Dictionary with updated parameters
-        :rtype: dict[str, str]
-        """
-        if plugin_values is None:
-            plugin_values = {}
-        elif type(plugin_values) != dict:
-            plugin_values = {"VALUE1": str(plugin_values)}
-
-        plugin_values["APPLICATION_PATH"] = Config.app_path
-        plugin_values["PARAM_FILE_PATH"] = Config.read_property(constants.PARAM_FILE_PATH, False)
-        plugin_values["PERSISTENCE_MAPPINGS"] = Config.persistence_mappings
-
-        lb_cluster_id_in_payload = Config.lb_cluster_id
-        lb_private_ip, lb_public_ip = EventHandler.get_lb_member_ip(lb_cluster_id_in_payload)
-        plugin_values["LB_IP"] = lb_private_ip if lb_private_ip is not None else Config.lb_private_ip
-        plugin_values["LB_PUBLIC_IP"] = lb_public_ip if lb_public_ip is not None else Config.lb_public_ip
+            raise Exception("Member id not found in topology [member] %s" % Config.member_id)
 
-        topology = TopologyContext.get_topology()
-        if topology.initialized:
-            service = topology.get_service(Config.service_name)
-            if service is None:
-                raise Exception("Service not found in topology [service] %s" % Config.service_name)
-
-            cluster = service.get_cluster(Config.cluster_id)
-            if cluster is None:
-                raise Exception("Cluster id not found in topology [cluster] %s" % Config.cluster_id)
-
-            member = cluster.get_member(Config.member_id)
-            if member is None:
-                raise Exception("Member id not found in topology [member] %s" % Config.member_id)
-
-            EventHandler.add_properties(service.properties, plugin_values, "SERVICE_PROPERTY")
-            EventHandler.add_properties(cluster.properties, plugin_values, "CLUSTER_PROPERTY")
-            EventHandler.add_properties(member.properties, plugin_values, "MEMBER_PROPERTY")
-
-        plugin_values.update(Config.get_payload_params())
-
-        return EventHandler.clean_process_parameters(plugin_values)
-
-    @staticmethod
-    def add_properties(properties, params, prefix):
-        """
-        Adds the given property list to the parameters list with given prefix in the parameter name
-        :param dict[str, str] properties: service properties
-        :param dict[str, str] params:
-        :param str prefix:
-        :return: dict[str, str]
-        """
-        if properties is None or properties.items() is None:
-            return
-
-        for key in properties:
-            params[prefix + "_" + key] = str(properties[key])
-
-    @staticmethod
-    def get_lb_member_ip(lb_cluster_id):
-        topology = TopologyContext.get_topology()
-        services = topology.get_services()
-
-        for service in services:
-            clusters = service.get_clusters()
-            for cluster in clusters:
-                members = cluster.get_members()
-                for member in members:
-                    if member.cluster_id == lb_cluster_id:
-                        return member.member_default_private_ip, member.member_default_public_ip
-
-        return None, None
-
-    @staticmethod
-    def clean_process_parameters(params):
-        """
-        Removes any null valued parameters before passing them to the extension scripts
-        :param dict params:
-        :return: cleaned parameters
-        :rtype: dict
-        """
-        for key, value in params.items():
-            if value is None:
-                del params[key]
-
-        return params
-
-    @staticmethod
-    def find_tenant_domain(tenant_id):
-        tenant = TenantContext.get_tenant(tenant_id)
-        if tenant is None:
-            raise RuntimeError("Tenant could not be found: [tenant-id] %s" % str(tenant_id))
-
-        return tenant.tenant_domain
-
-    @staticmethod
-    def validate_repo_path(app_path):
-        # app path would be ex: /var/www, or /opt/server/data
-        return os.access(app_path, os.W_OK)
+        add_properties(service.properties, plugin_values, "SERVICE_PROPERTY")
+        add_properties(cluster.properties, plugin_values, "CLUSTER_PROPERTY")
+        add_properties(member.properties, plugin_values, "MEMBER_PROPERTY")
+
+    plugin_values.update(Config.get_payload_params())
+
+    return clean_process_parameters(plugin_values)
+
+
+def add_properties(properties, params, prefix):
+    """
+    Adds the given property list to the parameters list with given prefix in the parameter name
+    :param dict[str, str] properties: service properties
+    :param dict[str, str] params:
+    :param str prefix:
+    :return: None. The entries are added to params in place.
+    """
+    if properties is None or properties.items() is None:
+        return
+
+    for key in properties:
+        params[prefix + "_" + key] = str(properties[key])
+
+
+def get_lb_member_ip(lb_cluster_id):
+    topology = TopologyContext.get_topology()
+    services = topology.get_services()
+
+    for service in services:
+        clusters = service.get_clusters()
+        for cluster in clusters:
+            members = cluster.get_members()
+            for member in members:
+                if member.cluster_id == lb_cluster_id:
+                    return member.member_default_private_ip, member.member_default_public_ip
+
+    return None, None
+
+
+def clean_process_parameters(params):
+    """
+    Removes any null valued parameters before passing them to the extension scripts
+    :param dict params:
+    :return: cleaned parameters
+    :rtype: dict
+    """
+    for key, value in params.items():
+        if value is None:
+            del params[key]
+
+    return params
+
+
+def find_tenant_domain(tenant_id):
+    tenant = TenantContext.get_tenant(tenant_id)
+    if tenant is None:
+        raise RuntimeError("Tenant could not be found: [tenant-id] %s" % str(tenant_id))
+
+    return tenant.tenant_domain
 
 
+def validate_repo_path(app_path):
+    # app path would be ex: /var/www, or /opt/server/data
+    return os.access(app_path, os.W_OK)
 
 
 class PluginExecutor(Thread):

http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
index a24650a..5b6190e 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
@@ -15,10 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import threading
-
+from threading import Thread
 import paho.mqtt.publish as publish
 import time
+from Queue import Queue, Empty
 
 import constants
 import healthstats
@@ -201,7 +201,7 @@ def get_publisher(topic):
     return publishers[topic]
 
 
-class EventPublisher:
+class EventPublisher(object):
     """
     Handles publishing events to topics to the provided message broker
     """
@@ -210,24 +210,9 @@ class EventPublisher:
         self.__topic = topic
         self.__log = LogFactory().get_log(__name__)
         self.__start_time = int(time.time())
+        self.__msg_queue = Queue()
 
     def publish(self, event):
-        publisher_thread = threading.Thread(target=self.__publish_event, args=(event,))
-        publisher_thread.start()
-
-    def __publish_event(self, event):
-        """
-        Publishes the given event to the message broker.
-
-        When a list of message brokers are given the event is published to the first message broker
-        available. Therefore the message brokers should share the data (ex: Sharing the KahaDB in ActiveMQ).
-
-        When the event cannot be published, it will be retried until the mb_publisher_timeout is exceeded.
-        This value is set in the agent.conf.
-
-        :param event:
-        :return: True if the event was published.
-        """
         if Config.mb_username is None:
             auth = None
         else:
@@ -244,20 +229,59 @@ class EventPublisher:
             for mb_url in Config.mb_urls:
                 mb_ip, mb_port = mb_url.split(":")
 
+                # start a thread to execute publish event
+                publisher_thread = Thread(target=self.__publish_event, args=(event, mb_ip, mb_port, auth, payload))
+                publisher_thread.start()
+
+                # give some time for the thread to complete
+                time.sleep(5)
+
+                # check if thread is still running and notify
+                if publisher_thread.isAlive():
+                    self.__log.debug(
+                        "Event publishing timed out before succeeding. The message broker could be offline.")
+
+                # check if publish.single() succeeded
                 try:
-                    publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port, auth=auth)
-                    self.__log.debug("Event type: %s published to MB: %s:%s" % (str(event.__class__), mb_ip, mb_port))
+                    published = self.__msg_queue.get(block=False)
+                except Empty:
+                    published = False
+
+                if published:
                     return True
-                except:
-                    self.__log.debug(
-                        "Could not publish event to message broker %s:%s." % (mb_ip, mb_port))
 
+            # All the brokers on the list were offline
             self.__log.debug(
                 "Could not publish event to any of the provided message brokers. Retrying in %s seconds."
                 % retry_interval)
 
             time.sleep(retry_interval)
 
+        # Event publisher timeout exceeded
         self.__log.warn("Could not publish event to any of the provided message brokers before "
                         "the timeout [%s] exceeded. The event will be dropped." % Config.mb_publisher_timeout)
         return False
+
+    def __publish_event(self, event, mb_ip, mb_port, auth, payload):
+        """
+        Publishes the given event to a single message broker (mb_ip:mb_port).
+
+        When a list of message brokers is given, publish() tries them in order and the event is published
+        to the first message broker available. Therefore the message brokers should share the data
+        (ex: Sharing the KahaDB in ActiveMQ).
+
+        Retrying until the mb_publisher_timeout (set in the agent.conf) is exceeded is handled by publish().
+
+        :param event:
+        :return: None. The outcome (True if published, False otherwise) is put on the message queue.
+        """
+        try:
+            self.__log.debug("Publishing [event] %s to %s:%s" % (event.__class__.__name__, mb_ip, mb_port))
+            publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port, auth=auth)
+            self.__log.debug("[Event] %s published to MB: %s:%s" % (str(event.__class__.__name__), mb_ip, mb_port))
+            self.__msg_queue.put(True)
+        except Exception as err:
+            self.__log.debug(
+                "Could not publish [event] %s to message broker %s:%s. : %s"
+                % (str(event.__class__.__name__), mb_ip, mb_port, err))
+            self.__msg_queue.put(False)

http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
index ff5cef9..c5a6d2d 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
@@ -178,7 +178,6 @@ class EventExecutor(threading.Thread):
     def __init__(self, event_queue):
         threading.Thread.__init__(self)
         self.__event_queue = event_queue
-        # TODO: several handlers for one event
         self.__event_handlers = {}
         EventSubscriber.log = LogFactory().get_log(__name__)
 
@@ -191,10 +190,9 @@ class EventExecutor(threading.Thread):
                 try:
                     EventSubscriber.log.debug("Executing handler for event %r" % event)
                     handler(event_msg)
-                except:
-                    EventSubscriber.log.exception("Error processing %r event" % event)
+                except Exception as err:
+                    EventSubscriber.log.exception("Error processing %r event: %s" % (event, err))
             else:
-
                 EventSubscriber.log.debug("Event handler not found for event : %r" % event)
 
     def register_event_handler(self, event, handler):
@@ -226,7 +224,7 @@ class MessageBrokerHeartBeatChecker(AbstractAsyncScheduledTask):
         try:
             self.__mb_client.connect(self.__mb_ip, self.__mb_port, 60)
             self.__mb_client.disconnect()
-        except:
+        except Exception:
             self.__log.info(
                 "Message broker %s:%s cannot be reached. Disconnecting client..." % (self.__mb_ip, self.__mb_port))
             self.__connected_client.disconnect()

http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
index 2398099..ab4975a 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
@@ -67,6 +67,11 @@ public class ADCExtensionTestCase extends PythonAgentIntegrationTest {
         startServerSocket(8080);
     }
 
+    @Override
+    protected String getClassName() {
+        return this.getClass().getSimpleName();
+    }
+
     /**
      * TearDown method for test method testPythonCartridgeAgent
      */

http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
index 6e40dd6..05d5ba2 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
@@ -70,6 +70,11 @@ public class ADCMTAppTenantUserTestCase extends PythonAgentIntegrationTest {
         startServerSocket(8080);
     }
 
+    @Override
+    protected String getClassName() {
+        return this.getClass().getSimpleName();
+    }
+
     /**
      * TearDown method for test method testPythonCartridgeAgent
      */

http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
index 6f0b070..444a5e0 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
@@ -72,6 +72,11 @@ public class ADCMTAppTestCase extends PythonAgentIntegrationTest {
         startServerSocket(8080);
     }
 
+    @Override
+    protected String getClassName() {
+        return this.getClass().getSimpleName();
+    }
+
     /**
      * TearDown method for test method testPythonCartridgeAgent
      */

http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
index 0dc92be..dba6197 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
@@ -63,6 +63,11 @@ public class ADCTestCase extends PythonAgentIntegrationTest {
     public ADCTestCase() throws IOException {
     }
 
+    @Override
+    protected String getClassName() {
+        return this.getClass().getSimpleName();
+    }
+
     @BeforeMethod(alwaysRun = true)
     public void setupADCTest() throws Exception {
         log.info("Setting up ADCTestCase");

http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
index ea156b6..db21359 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
@@ -79,6 +79,11 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
         startServerSocket(8080);
     }
 
+    @Override
+    protected String getClassName() {
+        return this.getClass().getSimpleName();
+    }
+
     /**
      * TearDown method for test method testPythonCartridgeAgent
      */

http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
index 44d295b..ce13d3f 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
@@ -76,6 +76,11 @@ public class CEPHAModeTestCase extends PythonAgentIntegrationTest {
 
     }
 
+    @Override
+    protected String getClassName() {
+        return this.getClass().getSimpleName();
+    }
+
 
     @BeforeMethod(alwaysRun = true)
     public void setupCEPHAModeTest() throws Exception {

http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
index b1f4d8b..8c72f2d 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
@@ -42,6 +42,11 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
     public MessageBrokerHATestCase() throws IOException {
     }
 
+    @Override
+    protected String getClassName() {
+        return this.getClass().getSimpleName();
+    }
+
     private static final Log log = LogFactory.getLog(MessageBrokerHATestCase.class);
     private static final int HA_TEST_TIMEOUT = 300000;
     private static final String CLUSTER_ID = "php.php.domain";
@@ -169,10 +174,11 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
                                 MEMBER_ID);
                         publishEvent(instanceCleanupMemberEvent);
                         publishCleanupEvent = true;
-                        waitUntilCleanupEventIsReceivedAndStopDefaultMB();
+
+                        stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + mqttBindPorts[0]);
                     }
 
-                    if (line.contains("Could not publish event to message broker localhost:1885.")) {
+                    if (line.contains("Could not publish [event] ")) {
                         log.info("Event publishing to default message broker failed and the next option is tried.");
                         exit = true;
                     }
@@ -186,26 +192,6 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
         log.info("MessageBrokerHATestCase publisher test completed successfully.");
     }
 
-    private void waitUntilCleanupEventIsReceivedAndStopDefaultMB() {
-        boolean eventReceived = false;
-        List<String> outputLines = new ArrayList<>();
-
-        while (!eventReceived) {
-            List<String> newLines = getNewLines(outputLines, outputStream.toString());
-            if (newLines.size() > 0) {
-                for (String line : newLines) {
-                    if (line.contains("Message received: instance/notifier/InstanceCleanupMemberEvent")) {
-                        // take down the default broker
-                        stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + mqttBindPorts[0]);
-                        eventReceived = true;
-                    }
-                }
-            }
-            log.info("Waiting until cleanup event is received by PCA...");
-        }
-        log.info("Cleanup event is received by PCA.");
-    }
-
     private void assertAgentActivation() {
         pcaActivated = false;
         instanceActivated = false;