You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ni...@apache.org on 2014/10/11 11:09:49 UTC

[24/50] [abbrv] git commit: Completed DefaultExtensionHandler and extension util paths Fixed minor bugs Implemented event handler methods

Completed DefaultExtensionHandler and extension util paths
Fixed minor bugs
Implemented event handler methods


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/71f827d0
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/71f827d0
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/71f827d0

Branch: refs/heads/master
Commit: 71f827d0f22bd6e9178e84fde5328ca2f6ef48be
Parents: ccd6ab5
Author: Chamila de Alwis <ch...@wso2.com>
Authored: Fri Sep 26 20:07:04 2014 +0530
Committer: Nirmal Fernando <ni...@gmail.com>
Committed: Sat Oct 11 14:38:29 2014 +0530

----------------------------------------------------------------------
 .../cartridge-agent/agent.py                    |  61 ++--
 .../modules/artifactmgt/git/agentgithandler.py  |  31 +-
 .../modules/artifactmgt/git/gitrepository.py    |   5 +-
 .../modules/event/tenant/events.py              |   8 +-
 .../extensions/defaultextensionhandler.py       | 302 ++++++++++++++++---
 .../modules/util/cartridgeagentutils.py         |   5 +-
 .../modules/util/extensionutils.py              | 174 +++++++++--
 7 files changed, 481 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/agent.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/agent.py b/tools/python-cartridge-agent/cartridge-agent/agent.py
index 60e9805..8ac6803 100644
--- a/tools/python-cartridge-agent/cartridge-agent/agent.py
+++ b/tools/python-cartridge-agent/cartridge-agent/agent.py
@@ -63,8 +63,6 @@ class CartridgeAgent(threading.Thread):
             self.extension_handler.on_instance_activated_event()
 
             cartridgeagentpublisher.publish_instance_activated_event()
-        else:
-            pass
 
         persistence_mappping_payload = CartridgeAgentConfiguration.persistence_mappings
         if persistence_mappping_payload is not None:
@@ -139,13 +137,28 @@ class CartridgeAgent(threading.Thread):
         self.log.info("Cartridge Agent topology receiver thread started")
 
     def on_member_activated(self, msg):
-        raise NotImplementedError
+        self.log.debug("Member activated event received: %r" % msg.payload)
+        event_obj = MemberActivatedEvent.create_from_json(msg.payload)
+        try:
+            self.extension_handler.on_member_activated_event(event_obj)
+        except:
+            self.log.exception("Error processing member terminated event")
 
     def on_member_terminated(self, msg):
-        raise NotImplementedError
+        self.log.debug("Member terminated event received: %r" % msg.payload)
+        event_obj = MemberTerminatedEvent.create_from_json(msg.payload)
+        try:
+            self.extension_handler.on_member_terminated_event(event_obj)
+        except:
+            self.log.exception("Error processing member terminated event")
 
     def on_member_suspended(self, msg):
-        raise NotImplementedError
+        self.log.debug("Member suspended event received: %r" % msg.payload)
+        event_obj = MemberSuspendedEvent.create_from_json(msg.payload)
+        try:
+            self.extension_handler.on_member_suspended_event(event_obj)
+        except:
+            self.log.exception("Error processing member suspended event")
 
     def on_complete_topology(self, msg):
         if not self.__topology_context_initialized:
@@ -154,14 +167,19 @@ class CartridgeAgent(threading.Thread):
             TopologyContext.update(event_obj.topology)
             self.__topology_context_initialized = True
             try:
-                self.extension_handler.onCompleteTopologyEvent(event_obj)
+                self.extension_handler.on_complete_topology_event(event_obj)
             except:
                 self.log.exception("Error processing complete topology event")
         else:
             self.log.info("Complete topology event updating task disabled")
 
     def on_member_started(self, msg):
-        raise NotImplementedError
+        self.log.debug("Member started event received: %r" % msg.payload)
+        event_obj = MemberStartedEvent.create_from_json(msg.payload)
+        try:
+            self.extension_handler.on_member_started_event(event_obj)
+        except:
+            self.log.exception("Error processing member started event")
 
     def register_tenant_event_listeners(self):
         self.log.debug("Starting tenant event message receiver thread")
@@ -175,31 +193,20 @@ class CartridgeAgent(threading.Thread):
         self.log.info("Tenant event message receiver thread started")
 
     def on_subscription_domain_added(self, msg):
-        self.log.debug("Subscription domain added event received")
+        self.log.debug("Subscription domain added event received : %r" % msg.payload)
         event_obj = SubscriptionDomainAddedEvent.create_from_json(msg.payload)
         try:
-            self.extension_handler.onSubscriptionDomainAddedEvent(event_obj)
+            self.extension_handler.on_subscription_domain_added_event(event_obj)
         except:
             self.log.exception("Error processing subscription domains added event")
-        # extensionutils.execute_subscription_domain_added_extension(
-        #     event_obj.tenant_id,
-        #     self.find_tenant_domain(event_obj.tenant_id),
-        #     event_obj.domain_name,
-        #     event_obj.application_context
-        # )
 
     def on_subscription_domain_removed(self, msg):
-        self.log.debug("Subscription domain removed event received")
+        self.log.debug("Subscription domain removed event received : %r" % msg.payload)
         event_obj = SubscriptionDomainRemovedEvent.create_from_json(msg.payload)
         try:
-            self.extension_handler.onSubscriptionDomainRemovedEvent(event_obj)
+            self.extension_handler.on_subscription_domain_removed_event(event_obj)
         except:
             self.log.exception("Error processing subscription domains removed event")
-        # extensionutils.execute_subscription_domain_removed_extension(
-        #     event_obj.tenant_id,
-        #     self.find_tenant_domain(event_obj.tenant_id),
-        #     event_obj.domain_name
-        # )
 
     def on_complete_tenant(self, msg):
         if not self.__tenant_context_initialized:
@@ -208,7 +215,7 @@ class CartridgeAgent(threading.Thread):
             TenantContext.update(event_obj.tenants)
 
             try:
-                self.extension_handler.onCompleteTenantEvent(event_obj)
+                self.extension_handler.on_complete_tenant_event(event_obj)
                 self.__tenant_context_initialized = True
             except:
                 self.log.exception("Error processing complete tenant event")
@@ -216,18 +223,18 @@ class CartridgeAgent(threading.Thread):
             self.log.info("Complete tenant event updating task disabled")
 
     def on_tenant_subscribed(self, msg):
-        self.log.debug("Tenant subscribed event received")
+        self.log.debug("Tenant subscribed event received: %r" % msg.payload)
         event_obj = TenantSubscribedEvent.create_from_json(msg.payload)
         try:
-            self.extension_handler.onTenantSubscribedEvent(event_obj)
+            self.extension_handler.on_tenant_subscribed_event(event_obj)
         except:
             self.log.exception("Error processing tenant subscribed event")
 
     def on_tenant_unsubscribed(self, msg):
-        self.log.debug("Tenant unSubscribed event received")
+        self.log.debug("Tenant unSubscribed event received: %r" % msg.payload)
         event_obj = TenantUnsubscribedEvent.create_from_json(msg.payload)
         try:
-            self.extension_handler.onTenantUnSubscribedEvent(event_obj)
+            self.extension_handler.on_tenant_unsubscribed_event(event_obj)
         except:
             self.log.exception("Error processing tenant unSubscribed event")
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py b/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py
index eda0f92..6671c48 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py
@@ -5,7 +5,7 @@ from git import *
 
 from gitrepository import GitRepository
 from ... config.cartridgeagentconfiguration import CartridgeAgentConfiguration
-from ... util import cartridgeagentutils
+from ... util import cartridgeagentutils, extensionutils, cartridgeagentconstants
 from ... util.asyncscheduledtask import AsyncScheduledTask
 from ... artifactmgt.repositoryinformation import RepositoryInformation
 from ... extensions.defaultextensionhandler import DefaultExtensionHandler
@@ -184,6 +184,12 @@ class AgentGitHandler:
 
     @staticmethod
     def get_repo_context(tenant_id):
+        """
+
+        :param int tenant_id:
+        :return: GitRepository object
+        :rtype: GitRepository
+        """
         if tenant_id in AgentGitHandler.__git_repositories:
             return AgentGitHandler.__git_repositories[tenant_id]
 
@@ -290,6 +296,29 @@ class AgentGitHandler:
         else:
             AgentGitHandler.log.info("Artifact Synchronization Task for path %r already scheduled" % repo_context.local_repo_path)
 
+    @staticmethod
+    def remove_repo(tenant_id):
+        repo_context = AgentGitHandler.get_repo_context(tenant_id)
+
+        #stop artifact update task
+        repo_context.scheduled_update_task.terminate()
+
+        #remove git contents
+        cartridgeagentutils.delete_folder_tree(repo_context.local_repo_path)
+
+        AgentGitHandler.remove_repo_context(tenant_id)
+
+        if tenant_id == -1234:
+            if CartridgeAgentConfiguration.is_multitenant:
+                extensionutils.execute_copy_artifact_extension(
+                    cartridgeagentconstants.SUPERTENANT_TEMP_PATH,
+                    CartridgeAgentConfiguration.app_path + "/repository/deployment/server/"
+                )
+
+        AgentGitHandler.log.info("git repository deleted for tenant %r" % repo_context.tenant_id)
+
+        return True
+
 
 class ArtifactUpdateTask(Thread):
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/gitrepository.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/gitrepository.py b/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/gitrepository.py
index 0e60e2f..92929b5 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/gitrepository.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/gitrepository.py
@@ -1,3 +1,5 @@
+from ...util.asyncscheduledtask import AsyncScheduledTask
+
 class GitRepository:
 
     def __init__(self):
@@ -11,4 +13,5 @@ class GitRepository:
         self.repo_password = None
         self.is_multitenant = False
         self.commit_enabled = False
-        self.scheduled_update_task = None
\ No newline at end of file
+        self.scheduled_update_task = None
+        """:type : AsyncScheduledTask """
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/modules/event/tenant/events.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/event/tenant/events.py b/tools/python-cartridge-agent/cartridge-agent/modules/event/tenant/events.py
index 55a21a2..ce27f66 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/event/tenant/events.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/event/tenant/events.py
@@ -50,6 +50,7 @@ class CompleteTenantEvent:
 
     def __init__(self):
         self.tenants = []
+        self.tenant_list_json = None
 
     @staticmethod
     def create_from_json(json_str):
@@ -57,9 +58,10 @@ class CompleteTenantEvent:
         instance = CompleteTenantEvent()
         instance.tenants = []
 
-        temp_tenants = json_obj["tenants"] if "tenants" in json_obj else None
-        if temp_tenants is not None:
-            for tenant_str in temp_tenants:
+        tenants_str = json_obj["tenants"] if "tenants" in json_obj else None
+        instance.tenant_list_json = tenants_str
+        if tenants_str is not None:
+            for tenant_str in tenants_str:
                 tenant_obj = Tenant(int(tenant_str["tenantId"]), tenant_str["tenantDomain"])
                 for service_name in tenant_str["serviceNameSubscriptionMap"]:
                     sub_str = tenant_str["serviceNameSubscriptionMap"][service_name]

http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py b/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
index b9b1b13..0b6e523 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
@@ -1,12 +1,14 @@
 import logging
+import time
 
 from ..artifactmgt.git.agentgithandler import AgentGitHandler
 from ..artifactmgt.repositoryinformation import RepositoryInformation
 from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration
-from ..util import extensionutils, cartridgeagentconstants, cartridgeagentutils
+from ..util import extensionutils
 from ..publisher import cartridgeagentpublisher
 from ..exception.parameternotfoundexception import ParameterNotFoundException
 from ..topology.topologycontext import *
+from ..tenant.tenantcontext import *
 
 
 class DefaultExtensionHandler:
@@ -101,9 +103,8 @@ class DefaultExtensionHandler:
                                                                         update_interval)
 
     def on_artifact_update_scheduler_event(self, tenant_id):
-        env_params = {}
-        env_params["STRATOS_ARTIFACT_UPDATED_TENANT_ID"] = tenant_id
-        env_params["STRATOS_ARTIFACT_UPDATED_SCHEDULER"] = True
+        env_params = {"STRATOS_ARTIFACT_UPDATED_TENANT_ID": tenant_id, "STRATOS_ARTIFACT_UPDATED_SCHEDULER": True}
+
         extensionutils.execute_artifacts_updated_extension(env_params)
 
     def on_instance_cleanup_cluster_event(self, instanceCleanupClusterEvent):
@@ -114,13 +115,12 @@ class DefaultExtensionHandler:
 
     def on_member_activated_event(self, member_activated_event):
         self.log.info("Member activated event received: [service] %r [cluster] %r [member] %r"
-                      % (
-            member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id))
+            % (member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id))
 
-        consistant = extensionutils.check_topology_consistency(member_activated_event.service_name,
+        topology_consistent = extensionutils.check_topology_consistency(member_activated_event.service_name,
                                                                member_activated_event.cluster_id,
                                                                member_activated_event.member_id)
-        if not consistant:
+        if not topology_consistent:
             self.log.error("Topology is inconsistent...failed to execute member activated event")
             return
 
@@ -132,13 +132,13 @@ class DefaultExtensionHandler:
 
         if extensionutils.is_relevant_member_event(member_activated_event.service_name,
                                                    member_activated_event.cluster_id, lb_cluster_id):
-            env_params = {}
-            env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_IP"] = member_activated_event.member_ip
-            env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_ID"] = member_activated_event.member_id
-            env_params["STRATOS_MEMBER_ACTIVATED_CLUSTER_ID"] = member_activated_event.cluster_id
-            env_params["STRATOS_MEMBER_ACTIVATED_LB_CLUSTER_ID"] = lb_cluster_id
-            env_params["STRATOS_MEMBER_ACTIVATED_NETWORK_PARTITION_ID"] = member_activated_event.network_partition_id
-            env_params["STRATOS_MEMBER_ACTIVATED_SERVICE_NAME"] = member_activated_event.service_name
+
+            env_params = {"STRATOS_MEMBER_ACTIVATED_MEMBER_IP": member_activated_event.member_ip,
+                          "STRATOS_MEMBER_ACTIVATED_MEMBER_ID": member_activated_event.member_id,
+                          "STRATOS_MEMBER_ACTIVATED_CLUSTER_ID": member_activated_event.cluster_id,
+                          "STRATOS_MEMBER_ACTIVATED_LB_CLUSTER_ID": lb_cluster_id,
+                          "STRATOS_MEMBER_ACTIVATED_NETWORK_PARTITION_ID": member_activated_event.network_partition_id,
+                          "STRATOS_MEMBER_ACTIVATED_SERVICE_NAME": member_activated_event.service_name}
 
             ports = member_activated_event.port_map.values()
             ports_str = ""
@@ -147,11 +147,7 @@ class DefaultExtensionHandler:
 
             env_params["STRATOS_MEMBER_ACTIVATED_PORTS"] = ports_str
 
-            members = cluster.get_members()
-            member_list_json = ""
-            for member in members:
-                member_list_json += member.json_str + ","
-            env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_LIST_JSON"] = member_list_json[:-1]  # removing last comma
+            env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_LIST_JSON"] = cluster.member_list_json
 
             member_ips = extensionutils.get_lb_member_ip(lb_cluster_id)
             if member_ips is not None and len(member_ips) > 1:
@@ -167,7 +163,8 @@ class DefaultExtensionHandler:
             clustered = CartridgeAgentConfiguration.is_clustered
 
             if member.properties is not None and member.properties[
-                cartridgeagentconstants.CLUSTERING_PRIMARY_KEY] == "true" and clustered is not None and clustered:
+                    cartridgeagentconstants.CLUSTERING_PRIMARY_KEY] == "true" and clustered is not None and clustered:
+
                 self.log.debug(" If WK member is re-spawned, update axis2.xml ")
 
                 has_wk_ip_changed = True
@@ -194,20 +191,179 @@ class DefaultExtensionHandler:
         else:
             self.log.debug("Member activated event is not relevant...skipping agent extension")
 
-    def onCompleteTopologyEvent(self, completeTopologyEvent):
-        pass
+    def on_complete_topology_event(self, complete_topology_event):
+        self.log.debug("Complete topology event received")
+
+        service_name_in_payload = CartridgeAgentConfiguration.service_name
+        cluster_id_in_payload = CartridgeAgentConfiguration.cluster_id
+        member_id_in_payload = CartridgeAgentConfiguration.member_id
+
+        extensionutils.check_topology_consistency(service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
+
+        topology = complete_topology_event.get_topology()
+        service = topology.get_service(service_name_in_payload)
+        cluster = service.get_cluster(cluster_id_in_payload)
+
+        env_params = {"STRATOS_TOPOLOGY_JSON": topology.json_str, "STRATOS_MEMBER_LIST_JSON": cluster.member_list_json}
+
+        extensionutils.execute_complete_topology_extension(env_params)
+
+    def on_complete_tenant_event(self, complete_tenant_event):
+        self.log.debug("Complete tenant event received")
+
+        tenant_list_json = complete_tenant_event.tenant_list_json
+        self.log.debug("Complete tenants:" + tenant_list_json)
+
+        env_params = {"STRATOS_TENANT_LIST_JSON": tenant_list_json}
+
+        extensionutils.execute_complete_tenant_extension(env_params)
+
+    def on_member_terminated_event(self, member_terminated_event):
+        self.log.info("Member terminated event received: [service] " + member_terminated_event.service_name +
+                      " [cluster] " + member_terminated_event.cluster_id + " [member] " + member_terminated_event.member_id)
+
+        topology_consistent = extensionutils.check_topology_consistency(
+            member_terminated_event.service_name,
+            member_terminated_event.cluster_id,
+            member_terminated_event.member_id
+        )
+
+        if not topology_consistent:
+            self.log.error("Topology is inconsistent...failed to execute member terminated event")
+            return
+
+        topology = TopologyContext.get_topology()
+        service = topology.get_service(member_terminated_event.service_name)
+        cluster = service.get_cluster(member_terminated_event.cluster_id)
+        terminated_member = cluster.get_member(member_terminated_event.member_id)
+        lb_cluster_id = cluster.get_member(member_terminated_event.cluster_id).lb_cluster_id
+
+        #check whether terminated member is within the same cluster, LB cluster or service group
+        if extensionutils.is_relevant_member_event(
+                member_terminated_event.service_name,
+                member_terminated_event.cluster_id,
+                lb_cluster_id):
+
+            env_params = {"STRATOS_MEMBER_TERMINATED_MEMBER_IP": terminated_member.member_ip,
+                          "STRATOS_MEMBER_TERMINATED_MEMBER_ID": member_terminated_event.member_id,
+                          "STRATOS_MEMBER_TERMINATED_CLUSTER_ID": member_terminated_event.cluster_id,
+                          "STRATOS_MEMBER_TERMINATED_LB_CLUSTER_ID": lb_cluster_id,
+                          "STRATOS_MEMBER_TERMINATED_NETWORK_PARTITION_ID": member_terminated_event.network_partition_id,
+                          "STRATOS_MEMBER_TERMINATED_SERVICE_NAME": member_terminated_event.service_name,
+                          "STRATOS_MEMBER_TERMINATED_MEMBER_LIST_JSON": cluster.member_list_json,
+                          "STRATOS_TOPOLOGY_JSON": topology.json_str}
+
+            member_ips = extensionutils.get_lb_member_ip(lb_cluster_id)
+            if member_ips is not None and len(member_ips) > 1:
+                env_params["STRATOS_MEMBER_TERMINATED_LB_IP"] = member_ips[0]
+                env_params["STRATOS_MEMBER_TERMINATED_LB_PUBLIC_IP"] = member_ips[1]
+
+            extensionutils.add_properties(service.properties, env_params, "MEMBER_TERMINATED_SERVICE_PROPERTY")
+            extensionutils.add_properties(cluster.properties, env_params, "MEMBER_TERMINATED_CLUSTER_PROPERTY")
+            extensionutils.add_properties(terminated_member.properties, env_params, "MEMBER_TERMINATED_MEMBER_PROPERTY")
+
+            extensionutils.execute_member_terminated_extension(env_params)
+
+        else:
+            self.log.debug("Member terminated event is not relevant...skipping agent extension")
+
+    def on_member_suspended_event(self, member_suspended_event):
+        self.log.info("Member suspended event received: [service] " + member_suspended_event.service_name +
+                      " [cluster] " + member_suspended_event.cluster_id + " [member] " + member_suspended_event.member_id)
+
+        topology_consistent = extensionutils.check_topology_consistency(
+            member_suspended_event.service_name,
+            member_suspended_event.cluster_id,
+            member_suspended_event.member_id
+        )
+
+        if not topology_consistent:
+            self.log.error("Topology is inconsistent...failed to execute member suspended event")
+            return
+
+        topology = TopologyContext.get_topology()
+        service = topology.get_service(member_suspended_event.service_name)
+        cluster = service.get_cluster(member_suspended_event.cluster_id)
+        suspended_member = cluster.get_member(member_suspended_event.member_id)
+        lb_cluster_id = cluster.get_member(member_suspended_event.cluster_id).lb_cluster_id
+
+        #check whether suspended member is within the same cluster, LB cluster or service group
+        if extensionutils.is_relevant_member_event(
+                member_suspended_event.service_name,
+                member_suspended_event.cluster_id,
+                lb_cluster_id):
+
+            env_params = {"STRATOS_MEMBER_SUSPENDED_MEMBER_IP": member_suspended_event.member_ip,
+                          "STRATOS_MEMBER_SUSPENDED_MEMBER_ID": member_suspended_event.member_id,
+                          "STRATOS_MEMBER_SUSPENDED_CLUSTER_ID": member_suspended_event.cluster_id,
+                          "STRATOS_MEMBER_SUSPENDED_LB_CLUSTER_ID": lb_cluster_id,
+                          "STRATOS_MEMBER_SUSPENDED_NETWORK_PARTITION_ID": member_suspended_event.network_partition_id,
+                          "STRATOS_MEMBER_SUSPENDED_SERVICE_NAME": member_suspended_event.service_name,
+                          "STRATOS_MEMBER_SUSPENDED_MEMBER_LIST_JSON": cluster.member_list_json,
+                          "STRATOS_TOPOLOGY_JSON": topology.json_str}
+
+            member_ips = extensionutils.get_lb_member_ip(lb_cluster_id)
+            if member_ips is not None and len(member_ips) > 1:
+                env_params["STRATOS_MEMBER_SUSPENDED_LB_IP"] = member_ips[0]
+                env_params["STRATOS_MEMBER_SUSPENDED_LB_PUBLIC_IP"] = member_ips[1]
 
-    def onCompleteTenantEvent(self, completeTenantEvent):
-        pass
+            extensionutils.add_properties(service.properties, env_params, "MEMBER_SUSPENDED_SERVICE_PROPERTY")
+            extensionutils.add_properties(cluster.properties, env_params, "MEMBER_SUSPENDED_CLUSTER_PROPERTY")
+            extensionutils.add_properties(suspended_member.properties, env_params, "MEMBER_SUSPENDED_MEMBER_PROPERTY")
 
-    def onMemberTerminatedEvent(self, memberTerminatedEvent):
-        pass
+            extensionutils.execute_member_suspended_extension(env_params)
+
+        else:
+            self.log.debug("Member suspended event is not relevant...skipping agent extension")
 
-    def onMemberSuspendedEvent(self, memberSuspendedEvent):
-        pass
+    def on_member_started_event(self, member_started_event):
+        self.log.info("Member started event received: [service] " + member_started_event.service_name +
+                      " [cluster] " + member_started_event.cluster_id + " [member] " + member_started_event.member_id)
 
-    def onMemberStartedEvent(self, memberStartedEvent):
-        pass
+        topology_consistent = extensionutils.check_topology_consistency(
+            member_started_event.service_name,
+            member_started_event.cluster_id,
+            member_started_event.member_id
+        )
+
+        if not topology_consistent:
+            self.log.error("Topology is inconsistent...failed to execute member started event")
+            return
+
+        topology = TopologyContext.get_topology()
+        service = topology.get_service(member_started_event.service_name)
+        cluster = service.get_cluster(member_started_event.cluster_id)
+        started_member = cluster.get_member(member_started_event.member_id)
+        lb_cluster_id = cluster.get_member(member_started_event.cluster_id).lb_cluster_id
+
+        #check whether started member is within the same cluster, LB cluster or service group
+        if extensionutils.is_relevant_member_event(
+                member_started_event.service_name,
+                member_started_event.cluster_id,
+                lb_cluster_id):
+
+            env_params = {"STRATOS_MEMBER_STARTED_MEMBER_IP": member_started_event.member_ip,
+                          "STRATOS_MEMBER_STARTED_MEMBER_ID": member_started_event.member_id,
+                          "STRATOS_MEMBER_STARTED_CLUSTER_ID": member_started_event.cluster_id,
+                          "STRATOS_MEMBER_STARTED_LB_CLUSTER_ID": lb_cluster_id,
+                          "STRATOS_MEMBER_STARTED_NETWORK_PARTITION_ID": member_started_event.network_partition_id,
+                          "STRATOS_MEMBER_STARTED_SERVICE_NAME": member_started_event.service_name,
+                          "STRATOS_MEMBER_STARTED_MEMBER_LIST_JSON": cluster.member_list_json,
+                          "STRATOS_TOPOLOGY_JSON": topology.json_str}
+
+            member_ips = extensionutils.get_lb_member_ip(lb_cluster_id)
+            if member_ips is not None and len(member_ips) > 1:
+                env_params["STRATOS_MEMBER_STARTED_LB_IP"] = member_ips[0]
+                env_params["STRATOS_MEMBER_STARTED_LB_PUBLIC_IP"] = member_ips[1]
+
+            extensionutils.add_properties(service.properties, env_params, "MEMBER_STARTED_SERVICE_PROPERTY")
+            extensionutils.add_properties(cluster.properties, env_params, "MEMBER_STARTED_CLUSTER_PROPERTY")
+            extensionutils.add_properties(started_member.properties, env_params, "MEMBER_STARTED_MEMBER_PROPERTY")
+
+            extensionutils.execute_member_started_extension(env_params)
+
+        else:
+            self.log.debug("Member started event is not relevant...skipping agent extension")
 
     def start_server_extension(self):
         #wait until complete topology message is received to get LB IP
@@ -254,20 +410,59 @@ class DefaultExtensionHandler:
     def volume_mount_extension(self, persistence_mappings_payload):
         extensionutils.execute_volume_mount_extension(persistence_mappings_payload)
 
-    def onSubscriptionDomainAddedEvent(self, subscriptionDomainAddedEvent):
-        pass
-
-    def onSubscriptionDomainRemovedEvent(self, subscriptionDomainRemovedEvent):
-        pass
-
-    def onCopyArtifactsExtension(self, src, des):
-        pass
-
-    def onTenantSubscribedEvent(self, tenantSubscribedEvent):
-        pass
+    def on_subscription_domain_added_event(self, subscription_domain_added_event):
+        tenant_domain = self.find_tenant_domain(subscription_domain_added_event.tenant_id)
+        self.log.info(
+            "Subscription domain added event received: [tenant-id] " + subscription_domain_added_event.tenant_id +
+            " [tenant-domain] " + tenant_domain + " [domain-name] " + subscription_domain_added_event.domain_name +
+            " [application-context] " + subscription_domain_added_event.application_context
+        )
+
+        env_params = {"STRATOS_SUBSCRIPTION_SERVICE_NAME": subscription_domain_added_event.service_name,
+                      "STRATOS_SUBSCRIPTION_DOMAIN_NAME": subscription_domain_added_event.domain_name,
+                      "STRATOS_SUBSCRIPTION_TENANT_ID": int(subscription_domain_added_event.tenant_id),
+                      "STRATOS_SUBSCRIPTION_TENANT_DOMAIN": tenant_domain,
+                      "STRATOS_SUBSCRIPTION_APPLICATION_CONTEXT": subscription_domain_added_event.application_context}
+
+        extensionutils.execute_subscription_domain_added_extension(env_params)
+
+    def on_subscription_domain_removed_event(self, subscriptionDomainRemovedEvent):
+        tenant_domain = self.find_tenant_domain(subscriptionDomainRemovedEvent.tenant_id)
+        self.log.info(
+            "Subscription domain removed event received: [tenant-id] " + subscriptionDomainRemovedEvent.tenant_id +
+            " [tenant-domain] " + tenant_domain + " [domain-name] " + subscriptionDomainRemovedEvent.domain_name
+        )
+
+        env_params = {"STRATOS_SUBSCRIPTION_SERVICE_NAME": subscriptionDomainRemovedEvent.service_name,
+                      "STRATOS_SUBSCRIPTION_DOMAIN_NAME": subscriptionDomainRemovedEvent.domain_name,
+                      "STRATOS_SUBSCRIPTION_TENANT_ID": int(subscriptionDomainRemovedEvent.tenant_id),
+                      "STRATOS_SUBSCRIPTION_TENANT_DOMAIN": tenant_domain}
+
+        extensionutils.execute_subscription_domain_removed_extension(env_params)
+
+    def on_copy_artifacts_extension(self, src, des):
+        extensionutils.execute_copy_artifact_extension(src, des)
+
+    def on_tenant_subscribed_event(self, tenant_subscribed_event):
+        self.log.info(
+            "Tenant subscribed event received: [tenant] " + tenant_subscribed_event.tenant_id +
+            " [service] " + tenant_subscribed_event.service_name + " [cluster] " + tenant_subscribed_event.cluster_ids
+        )
+
+        extensionutils.execute_tenant_subscribed_extension({})
+
+    def on_tenant_unsubscribed_event(self, tenant_unsubscribed_event):
+        self.log.info(
+            "Tenant unsubscribed event received: [tenant] " + tenant_unsubscribed_event.tenant_id +
+            " [service] " + tenant_unsubscribed_event.service_name + " [cluster] " + tenant_unsubscribed_event.cluster_ids
+        )
 
-    def onTenantUnSubscribedEvent(self, tenantUnSubscribedEvent):
-        pass
+        try:
+            if CartridgeAgentConfiguration.service_name == tenant_unsubscribed_event.service_name:
+                AgentGitHandler.remove_repo(tenant_unsubscribed_event.tenant_id)
+        except:
+            self.log.exception()
+        extensionutils.execute_tenant_unsubscribed_extension({})
 
     def cleanup(self):
         self.log.info("Executing cleaning up the data in the cartridge instance...")
@@ -517,4 +712,21 @@ class DefaultExtensionHandler:
         if "MIN_COUNT" in member.properties:
             return int(member.properties["MIN_COUNT"])
 
-        return 1
\ No newline at end of file
+        return 1
+
+    def find_tenant_domain(self, tenant_id):
+        tenant = TenantContext.get_tenant(tenant_id)
+        if tenant is None:
+            raise RuntimeError("Tenant could not be found: [tenant-id] %r" % tenant_id)
+
+        return tenant.tenant_domain
+
+    def wait_for_wk_members(self, env_params):
+        min_count = int(CartridgeAgentConfiguration.min_count)
+        is_wk_member_group_ready = False
+        while not is_wk_member_group_ready:
+            self.log.info("Waiting for %r well known members..." % min_count)
+
+            time.sleep(5)
+
+            is_wk_member_group_ready = self.is_wk_member_group_ready(env_params, min_count)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py
index 7a84e4d..444f9ac 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py
@@ -76,10 +76,7 @@ def wait_until_ports_active(ip_address, ports):
         if duration > ports_check_timeout:
             return
 
-        try:
-            time.sleep(5)
-        except:
-            pass
+        time.sleep(5)
 
     log.info("Ports activated: [ip] %r [ports] %r" % (ip_address, ports))
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
index e29b614..3ca4cc0 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
@@ -64,12 +64,34 @@ def execute_artifacts_updated_extension(env_params):
         log.exception("Could not execute artifacts updated extension")
 
 
-def execute_subscription_domain_added_extension(tenant_id, tenant_domain, domain_name, application_context):
-    raise NotImplementedError
+def execute_subscription_domain_added_extension(env_params):
+    try:
+        log.debug("Executing subscription domain added extension")
+
+        script_name = cartridgeagentconstants.SUBSCRIPTION_DOMAIN_ADDED_SCRIPT
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, errors = execute_command(command, env_params)
+        log.debug("Subscription domain added script returned: %r" % output)
+    except:
+        log.exception("Could not execute subscription domain added extension")
+
+
+def execute_subscription_domain_removed_extension(env_params):
+    try:
+        log.debug("Executing subscription domain removed extension")
 
+        script_name = cartridgeagentconstants.SUBSCRIPTION_DOMAIN_REMOVED_SCRIPT
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
 
-def execute_subscription_domain_removed_extension(tenant_id, tenant_domain, domain_name):
-    raise NotImplementedError
+        output, errors = execute_command(command, env_params)
+        log.debug("Subscription domain removed script returned: %r" % output)
+    except:
+        log.exception("Could not execute subscription domain removed extension")
 
 
 def execute_start_servers_extension(env_params):
@@ -87,6 +109,110 @@ def execute_start_servers_extension(env_params):
         log.exception("Could not execute start servers extension")
 
 
+def execute_complete_topology_extension(env_params):
+    try:
+        log.debug("Executing complete topology extension")
+
+        script_name = cartridgeagentconstants.COMPLETE_TOPOLOGY_SCRIPT
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, errors = execute_command(command, env_params)
+        log.debug("Complete topology script returned: %r" % output)
+    except:
+        log.exception("Could not execute complete topology extension")
+
+
+def execute_complete_tenant_extension(env_params):
+    try:
+        log.debug("Executing complete tenant extension")
+
+        script_name = cartridgeagentconstants.COMPLETE_TENANT_SCRIPT
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, errors = execute_command(command, env_params)
+        log.debug("Complete tenant script returned: %r" % output)
+    except:
+        log.exception("Could not execute complete tenant extension")
+
+
+def execute_tenant_subscribed_extension(env_params):
+    try:
+        log.debug("Executing tenant subscribed extension")
+
+        script_name = cartridgeagentconstants.TENANT_SUBSCRIBED_SCRIPT
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, errors = execute_command(command, env_params)
+        log.debug("Tenant subscribed script returned: %r" % output)
+    except:
+        log.exception("Could not execute tenant subscribed extension")
+
+
+def execute_tenant_unsubscribed_extension(env_params):
+    try:
+        log.debug("Executing tenant unsubscribed extension")
+
+        script_name = cartridgeagentconstants.TENANT_UNSUBSCRIBED_SCRIPT
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, errors = execute_command(command, env_params)
+        log.debug("Tenant unsubscribed script returned: %r" % output)
+    except:
+        log.exception("Could not execute tenant unsubscribed extension")
+
+
+def execute_member_terminated_extension(env_params):
+    try:
+        log.debug("Executing member terminated extension")
+
+        script_name = cartridgeagentconstants.MEMBER_TERMINATED_SCRIPT
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, errors = execute_command(command, env_params)
+        log.debug("Member terminated script returned: %r" % output)
+    except:
+        log.exception("Could not execute member terminated extension")
+
+
+def execute_member_suspended_extension(env_params):
+    try:
+        log.debug("Executing member suspended extension")
+
+        script_name = cartridgeagentconstants.MEMBER_SUSPENDED_SCRIPT
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, errors = execute_command(command, env_params)
+        log.debug("Member suspended script returned: %r" % output)
+    except:
+        log.exception("Could not execute member suspended extension")
+
+def execute_member_started_extension(env_params):
+    try:
+        log.debug("Executing member started extension")
+
+        script_name = cartridgeagentconstants.MEMBER_STARTED_SCRIPT
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, errors = execute_command(command, env_params)
+        log.debug("Member started script returned: %r" % output)
+    except:
+        log.exception("Could not execute member started extension")
+
+
 def wait_for_complete_topology():
     while not TopologyContext.topology.initialized:
         log.info("Waiting for complete topology event...")
@@ -221,38 +347,38 @@ def clean_process_parameters(params):
     return params
 
 
-def add_payload_parameters(params):
-    params["STRATOS_APP_PATH"] = CartridgeAgentConfiguration.app_path
-    params["STRATOS_PARAM_FILE_PATH"] = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.PARAM_FILE_PATH)
-    params["STRATOS_SERVICE_NAME"] = CartridgeAgentConfiguration.service_name
-    params["STRATOS_TENANT_ID"] = CartridgeAgentConfiguration.tenant_id
-    params["STRATOS_CARTRIDGE_KEY"] = CartridgeAgentConfiguration.cartridge_key
-    params["STRATOS_LB_CLUSTER_ID"] = CartridgeAgentConfiguration.lb_cluster_id
-    params["STRATOS_CLUSTER_ID"] = CartridgeAgentConfiguration.cluster_id
-    params["STRATOS_NETWORK_PARTITION_ID"] = CartridgeAgentConfiguration.network_partition_id
-    params["STRATOS_PARTITION_ID"] = CartridgeAgentConfiguration.partition_id
-    params["STRATOS_PERSISTENCE_MAPPINGS"] = CartridgeAgentConfiguration.persistence_mappings
-    params["STRATOS_REPO_URL"] = CartridgeAgentConfiguration.repo_url
+def add_payload_parameters(env_params):
+    env_params["STRATOS_APP_PATH"] = CartridgeAgentConfiguration.app_path
+    env_params["STRATOS_PARAM_FILE_PATH"] = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.PARAM_FILE_PATH)
+    env_params["STRATOS_SERVICE_NAME"] = CartridgeAgentConfiguration.service_name
+    env_params["STRATOS_TENANT_ID"] = CartridgeAgentConfiguration.tenant_id
+    env_params["STRATOS_CARTRIDGE_KEY"] = CartridgeAgentConfiguration.cartridge_key
+    env_params["STRATOS_LB_CLUSTER_ID"] = CartridgeAgentConfiguration.lb_cluster_id
+    env_params["STRATOS_CLUSTER_ID"] = CartridgeAgentConfiguration.cluster_id
+    env_params["STRATOS_NETWORK_PARTITION_ID"] = CartridgeAgentConfiguration.network_partition_id
+    env_params["STRATOS_PARTITION_ID"] = CartridgeAgentConfiguration.partition_id
+    env_params["STRATOS_PERSISTENCE_MAPPINGS"] = CartridgeAgentConfiguration.persistence_mappings
+    env_params["STRATOS_REPO_URL"] = CartridgeAgentConfiguration.repo_url
 
     lb_cluster_id_in_payload = CartridgeAgentConfiguration.lb_cluster_id
     member_ips = get_lb_member_ip(lb_cluster_id_in_payload)
     if member_ips is not None:
-        params["STRATOS_LB_IP"] = member_ips[0]
-        params["STRATOS_LB_PUBLIC_IP"] = member_ips[1]
+        env_params["STRATOS_LB_IP"] = member_ips[0]
+        env_params["STRATOS_LB_PUBLIC_IP"] = member_ips[1]
     else:
-        params["STRATOS_LB_IP"] = CartridgeAgentConfiguration.lb_private_ip
-        params["STRATOS_LB_PUBLIC_IP"] = CartridgeAgentConfiguration.lb_public_ip
+        env_params["STRATOS_LB_IP"] = CartridgeAgentConfiguration.lb_private_ip
+        env_params["STRATOS_LB_PUBLIC_IP"] = CartridgeAgentConfiguration.lb_public_ip
 
     topology = TopologyContext.get_topology()
     if topology.initialized:
         service = topology.get_service(CartridgeAgentConfiguration.service_name)
         cluster = service.get_cluster(CartridgeAgentConfiguration.cluster_id)
         member_id_in_payload = CartridgeAgentConfiguration.member_id
-        add_properties(service.properties, params, "SERVICE_PROPERTY")
-        add_properties(cluster.properties, params, "CLUSTER_PROPERTY")
-        add_properties(cluster.get_member(member_id_in_payload).properties, params, "MEMBER_PROPERTY")
+        add_properties(service.properties, env_params, "SERVICE_PROPERTY")
+        add_properties(cluster.properties, env_params, "CLUSTER_PROPERTY")
+        add_properties(cluster.get_member(member_id_in_payload).properties, env_params, "MEMBER_PROPERTY")
 
-    return params
+    return env_params
 
 
 def add_properties(properties, params, prefix):