You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ni...@apache.org on 2014/10/11 11:09:49 UTC
[24/50] [abbrv] git commit: Completed DefaultExtensionHandler and
extension util paths; Fixed minor bugs; Implemented event handler methods
Completed DefaultExtensionHandler and extension util paths
Fixed minor bugs
Implemented event handler methods
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/71f827d0
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/71f827d0
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/71f827d0
Branch: refs/heads/master
Commit: 71f827d0f22bd6e9178e84fde5328ca2f6ef48be
Parents: ccd6ab5
Author: Chamila de Alwis <ch...@wso2.com>
Authored: Fri Sep 26 20:07:04 2014 +0530
Committer: Nirmal Fernando <ni...@gmail.com>
Committed: Sat Oct 11 14:38:29 2014 +0530
----------------------------------------------------------------------
.../cartridge-agent/agent.py | 61 ++--
.../modules/artifactmgt/git/agentgithandler.py | 31 +-
.../modules/artifactmgt/git/gitrepository.py | 5 +-
.../modules/event/tenant/events.py | 8 +-
.../extensions/defaultextensionhandler.py | 302 ++++++++++++++++---
.../modules/util/cartridgeagentutils.py | 5 +-
.../modules/util/extensionutils.py | 174 +++++++++--
7 files changed, 481 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/agent.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/agent.py b/tools/python-cartridge-agent/cartridge-agent/agent.py
index 60e9805..8ac6803 100644
--- a/tools/python-cartridge-agent/cartridge-agent/agent.py
+++ b/tools/python-cartridge-agent/cartridge-agent/agent.py
@@ -63,8 +63,6 @@ class CartridgeAgent(threading.Thread):
self.extension_handler.on_instance_activated_event()
cartridgeagentpublisher.publish_instance_activated_event()
- else:
- pass
persistence_mappping_payload = CartridgeAgentConfiguration.persistence_mappings
if persistence_mappping_payload is not None:
@@ -139,13 +137,28 @@ class CartridgeAgent(threading.Thread):
self.log.info("Cartridge Agent topology receiver thread started")
def on_member_activated(self, msg):
- raise NotImplementedError
+ self.log.debug("Member activated event received: %r" % msg.payload)
+ event_obj = MemberActivatedEvent.create_from_json(msg.payload)
+ try:
+ self.extension_handler.on_member_activated_event(event_obj)
+ except:
+ self.log.exception("Error processing member terminated event")
def on_member_terminated(self, msg):
- raise NotImplementedError
+ self.log.debug("Member terminated event received: %r" % msg.payload)
+ event_obj = MemberTerminatedEvent.create_from_json(msg.payload)
+ try:
+ self.extension_handler.on_member_terminated_event(event_obj)
+ except:
+ self.log.exception("Error processing member terminated event")
def on_member_suspended(self, msg):
- raise NotImplementedError
+ self.log.debug("Member suspended event received: %r" % msg.payload)
+ event_obj = MemberSuspendedEvent.create_from_json(msg.payload)
+ try:
+ self.extension_handler.on_member_suspended_event(event_obj)
+ except:
+ self.log.exception("Error processing member suspended event")
def on_complete_topology(self, msg):
if not self.__topology_context_initialized:
@@ -154,14 +167,19 @@ class CartridgeAgent(threading.Thread):
TopologyContext.update(event_obj.topology)
self.__topology_context_initialized = True
try:
- self.extension_handler.onCompleteTopologyEvent(event_obj)
+ self.extension_handler.on_complete_topology_event(event_obj)
except:
self.log.exception("Error processing complete topology event")
else:
self.log.info("Complete topology event updating task disabled")
def on_member_started(self, msg):
- raise NotImplementedError
+ self.log.debug("Member started event received: %r" % msg.payload)
+ event_obj = MemberStartedEvent.create_from_json(msg.payload)
+ try:
+ self.extension_handler.on_member_started_event(event_obj)
+ except:
+ self.log.exception("Error processing member started event")
def register_tenant_event_listeners(self):
self.log.debug("Starting tenant event message receiver thread")
@@ -175,31 +193,20 @@ class CartridgeAgent(threading.Thread):
self.log.info("Tenant event message receiver thread started")
def on_subscription_domain_added(self, msg):
- self.log.debug("Subscription domain added event received")
+ self.log.debug("Subscription domain added event received : %r" % msg.payload)
event_obj = SubscriptionDomainAddedEvent.create_from_json(msg.payload)
try:
- self.extension_handler.onSubscriptionDomainAddedEvent(event_obj)
+ self.extension_handler.on_subscription_domain_added_event(event_obj)
except:
self.log.exception("Error processing subscription domains added event")
- # extensionutils.execute_subscription_domain_added_extension(
- # event_obj.tenant_id,
- # self.find_tenant_domain(event_obj.tenant_id),
- # event_obj.domain_name,
- # event_obj.application_context
- # )
def on_subscription_domain_removed(self, msg):
- self.log.debug("Subscription domain removed event received")
+ self.log.debug("Subscription domain removed event received : %r" % msg.payload)
event_obj = SubscriptionDomainRemovedEvent.create_from_json(msg.payload)
try:
- self.extension_handler.onSubscriptionDomainRemovedEvent(event_obj)
+ self.extension_handler.on_subscription_domain_removed_event(event_obj)
except:
self.log.exception("Error processing subscription domains removed event")
- # extensionutils.execute_subscription_domain_removed_extension(
- # event_obj.tenant_id,
- # self.find_tenant_domain(event_obj.tenant_id),
- # event_obj.domain_name
- # )
def on_complete_tenant(self, msg):
if not self.__tenant_context_initialized:
@@ -208,7 +215,7 @@ class CartridgeAgent(threading.Thread):
TenantContext.update(event_obj.tenants)
try:
- self.extension_handler.onCompleteTenantEvent(event_obj)
+ self.extension_handler.on_complete_tenant_event(event_obj)
self.__tenant_context_initialized = True
except:
self.log.exception("Error processing complete tenant event")
@@ -216,18 +223,18 @@ class CartridgeAgent(threading.Thread):
self.log.info("Complete tenant event updating task disabled")
def on_tenant_subscribed(self, msg):
- self.log.debug("Tenant subscribed event received")
+ self.log.debug("Tenant subscribed event received: %r" % msg.payload)
event_obj = TenantSubscribedEvent.create_from_json(msg.payload)
try:
- self.extension_handler.onTenantSubscribedEvent(event_obj)
+ self.extension_handler.on_tenant_subscribed_event(event_obj)
except:
self.log.exception("Error processing tenant subscribed event")
def on_tenant_unsubscribed(self, msg):
- self.log.debug("Tenant unSubscribed event received")
+ self.log.debug("Tenant unSubscribed event received: %r" % msg.payload)
event_obj = TenantUnsubscribedEvent.create_from_json(msg.payload)
try:
- self.extension_handler.onTenantUnSubscribedEvent(event_obj)
+ self.extension_handler.on_tenant_unsubscribed_event(event_obj)
except:
self.log.exception("Error processing tenant unSubscribed event")
http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py b/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py
index eda0f92..6671c48 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py
@@ -5,7 +5,7 @@ from git import *
from gitrepository import GitRepository
from ... config.cartridgeagentconfiguration import CartridgeAgentConfiguration
-from ... util import cartridgeagentutils
+from ... util import cartridgeagentutils, extensionutils, cartridgeagentconstants
from ... util.asyncscheduledtask import AsyncScheduledTask
from ... artifactmgt.repositoryinformation import RepositoryInformation
from ... extensions.defaultextensionhandler import DefaultExtensionHandler
@@ -184,6 +184,12 @@ class AgentGitHandler:
@staticmethod
def get_repo_context(tenant_id):
+ """
+
+ :param int tenant_id:
+ :return: GitRepository object
+ :rtype: GitRepository
+ """
if tenant_id in AgentGitHandler.__git_repositories:
return AgentGitHandler.__git_repositories[tenant_id]
@@ -290,6 +296,29 @@ class AgentGitHandler:
else:
AgentGitHandler.log.info("Artifact Synchronization Task for path %r already scheduled" % repo_context.local_repo_path)
+ @staticmethod
+ def remove_repo(tenant_id):
+ repo_context = AgentGitHandler.get_repo_context(tenant_id)
+
+ #stop artifact update task
+ repo_context.scheduled_update_task.terminate()
+
+ #remove git contents
+ cartridgeagentutils.delete_folder_tree(repo_context.local_repo_path)
+
+ AgentGitHandler.remove_repo_context(tenant_id)
+
+ if tenant_id == -1234:
+ if CartridgeAgentConfiguration.is_multitenant:
+ extensionutils.execute_copy_artifact_extension(
+ cartridgeagentconstants.SUPERTENANT_TEMP_PATH,
+ CartridgeAgentConfiguration.app_path + "/repository/deployment/server/"
+ )
+
+ AgentGitHandler.log.info("git repository deleted for tenant %r" % repo_context.tenant_id)
+
+ return True
+
class ArtifactUpdateTask(Thread):
http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/gitrepository.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/gitrepository.py b/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/gitrepository.py
index 0e60e2f..92929b5 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/gitrepository.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/gitrepository.py
@@ -1,3 +1,5 @@
+from ...util.asyncscheduledtask import AsyncScheduledTask
+
class GitRepository:
def __init__(self):
@@ -11,4 +13,5 @@ class GitRepository:
self.repo_password = None
self.is_multitenant = False
self.commit_enabled = False
- self.scheduled_update_task = None
\ No newline at end of file
+ self.scheduled_update_task = None
+ """:type : AsyncScheduledTask """
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/modules/event/tenant/events.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/event/tenant/events.py b/tools/python-cartridge-agent/cartridge-agent/modules/event/tenant/events.py
index 55a21a2..ce27f66 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/event/tenant/events.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/event/tenant/events.py
@@ -50,6 +50,7 @@ class CompleteTenantEvent:
def __init__(self):
self.tenants = []
+ self.tenant_list_json = None
@staticmethod
def create_from_json(json_str):
@@ -57,9 +58,10 @@ class CompleteTenantEvent:
instance = CompleteTenantEvent()
instance.tenants = []
- temp_tenants = json_obj["tenants"] if "tenants" in json_obj else None
- if temp_tenants is not None:
- for tenant_str in temp_tenants:
+ tenants_str = json_obj["tenants"] if "tenants" in json_obj else None
+ instance.tenant_list_json = tenants_str
+ if tenants_str is not None:
+ for tenant_str in tenants_str:
tenant_obj = Tenant(int(tenant_str["tenantId"]), tenant_str["tenantDomain"])
for service_name in tenant_str["serviceNameSubscriptionMap"]:
sub_str = tenant_str["serviceNameSubscriptionMap"][service_name]
http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py b/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
index b9b1b13..0b6e523 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
@@ -1,12 +1,14 @@
import logging
+import time
from ..artifactmgt.git.agentgithandler import AgentGitHandler
from ..artifactmgt.repositoryinformation import RepositoryInformation
from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration
-from ..util import extensionutils, cartridgeagentconstants, cartridgeagentutils
+from ..util import extensionutils
from ..publisher import cartridgeagentpublisher
from ..exception.parameternotfoundexception import ParameterNotFoundException
from ..topology.topologycontext import *
+from ..tenant.tenantcontext import *
class DefaultExtensionHandler:
@@ -101,9 +103,8 @@ class DefaultExtensionHandler:
update_interval)
def on_artifact_update_scheduler_event(self, tenant_id):
- env_params = {}
- env_params["STRATOS_ARTIFACT_UPDATED_TENANT_ID"] = tenant_id
- env_params["STRATOS_ARTIFACT_UPDATED_SCHEDULER"] = True
+ env_params = {"STRATOS_ARTIFACT_UPDATED_TENANT_ID": tenant_id, "STRATOS_ARTIFACT_UPDATED_SCHEDULER": True}
+
extensionutils.execute_artifacts_updated_extension(env_params)
def on_instance_cleanup_cluster_event(self, instanceCleanupClusterEvent):
@@ -114,13 +115,12 @@ class DefaultExtensionHandler:
def on_member_activated_event(self, member_activated_event):
self.log.info("Member activated event received: [service] %r [cluster] %r [member] %r"
- % (
- member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id))
+ % (member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id))
- consistant = extensionutils.check_topology_consistency(member_activated_event.service_name,
+ topology_consistent = extensionutils.check_topology_consistency(member_activated_event.service_name,
member_activated_event.cluster_id,
member_activated_event.member_id)
- if not consistant:
+ if not topology_consistent:
self.log.error("Topology is inconsistent...failed to execute member activated event")
return
@@ -132,13 +132,13 @@ class DefaultExtensionHandler:
if extensionutils.is_relevant_member_event(member_activated_event.service_name,
member_activated_event.cluster_id, lb_cluster_id):
- env_params = {}
- env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_IP"] = member_activated_event.member_ip
- env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_ID"] = member_activated_event.member_id
- env_params["STRATOS_MEMBER_ACTIVATED_CLUSTER_ID"] = member_activated_event.cluster_id
- env_params["STRATOS_MEMBER_ACTIVATED_LB_CLUSTER_ID"] = lb_cluster_id
- env_params["STRATOS_MEMBER_ACTIVATED_NETWORK_PARTITION_ID"] = member_activated_event.network_partition_id
- env_params["STRATOS_MEMBER_ACTIVATED_SERVICE_NAME"] = member_activated_event.service_name
+
+ env_params = {"STRATOS_MEMBER_ACTIVATED_MEMBER_IP": member_activated_event.member_ip,
+ "STRATOS_MEMBER_ACTIVATED_MEMBER_ID": member_activated_event.member_id,
+ "STRATOS_MEMBER_ACTIVATED_CLUSTER_ID": member_activated_event.cluster_id,
+ "STRATOS_MEMBER_ACTIVATED_LB_CLUSTER_ID": lb_cluster_id,
+ "STRATOS_MEMBER_ACTIVATED_NETWORK_PARTITION_ID": member_activated_event.network_partition_id,
+ "STRATOS_MEMBER_ACTIVATED_SERVICE_NAME": member_activated_event.service_name}
ports = member_activated_event.port_map.values()
ports_str = ""
@@ -147,11 +147,7 @@ class DefaultExtensionHandler:
env_params["STRATOS_MEMBER_ACTIVATED_PORTS"] = ports_str
- members = cluster.get_members()
- member_list_json = ""
- for member in members:
- member_list_json += member.json_str + ","
- env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_LIST_JSON"] = member_list_json[:-1] # removing last comma
+ env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_LIST_JSON"] = cluster.member_list_json
member_ips = extensionutils.get_lb_member_ip(lb_cluster_id)
if member_ips is not None and len(member_ips) > 1:
@@ -167,7 +163,8 @@ class DefaultExtensionHandler:
clustered = CartridgeAgentConfiguration.is_clustered
if member.properties is not None and member.properties[
- cartridgeagentconstants.CLUSTERING_PRIMARY_KEY] == "true" and clustered is not None and clustered:
+ cartridgeagentconstants.CLUSTERING_PRIMARY_KEY] == "true" and clustered is not None and clustered:
+
self.log.debug(" If WK member is re-spawned, update axis2.xml ")
has_wk_ip_changed = True
@@ -194,20 +191,179 @@ class DefaultExtensionHandler:
else:
self.log.debug("Member activated event is not relevant...skipping agent extension")
- def onCompleteTopologyEvent(self, completeTopologyEvent):
- pass
+ def on_complete_topology_event(self, complete_topology_event):
+ self.log.debug("Complete topology event received")
+
+ service_name_in_payload = CartridgeAgentConfiguration.service_name
+ cluster_id_in_payload = CartridgeAgentConfiguration.cluster_id
+ member_id_in_payload = CartridgeAgentConfiguration.member_id
+
+ extensionutils.check_topology_consistency(service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
+
+ topology = complete_topology_event.get_topology()
+ service = topology.get_service(service_name_in_payload)
+ cluster = service.get_cluster(cluster_id_in_payload)
+
+ env_params = {"STRATOS_TOPOLOGY_JSON": topology.json_str, "STRATOS_MEMBER_LIST_JSON": cluster.member_list_json}
+
+ extensionutils.execute_complete_topology_extension(env_params)
+
+ def on_complete_tenant_event(self, complete_tenant_event):
+ self.log.debug("Complete tenant event received")
+
+ tenant_list_json = complete_tenant_event.tenant_list_json
+ self.log.debug("Complete tenants:" + tenant_list_json)
+
+ env_params = {"STRATOS_TENANT_LIST_JSON": tenant_list_json}
+
+ extensionutils.execute_complete_tenant_extension(env_params)
+
+ def on_member_terminated_event(self, member_terminated_event):
+ self.log.info("Member terminated event received: [service] " + member_terminated_event.service_name +
+ " [cluster] " + member_terminated_event.cluster_id + " [member] " + member_terminated_event.member_id)
+
+ topology_consistent = extensionutils.check_topology_consistency(
+ member_terminated_event.service_name,
+ member_terminated_event.cluster_id,
+ member_terminated_event.member_id
+ )
+
+ if not topology_consistent:
+ self.log.error("Topology is inconsistent...failed to execute member terminated event")
+ return
+
+ topology = TopologyContext.get_topology()
+ service = topology.get_service(member_terminated_event.service_name)
+ cluster = service.get_cluster(member_terminated_event.cluster_id)
+ terminated_member = cluster.get_member(member_terminated_event.member_id)
+ lb_cluster_id = cluster.get_member(member_terminated_event.cluster_id).lb_cluster_id
+
+ #check whether terminated member is within the same cluster, LB cluster or service group
+ if extensionutils.is_relevant_member_event(
+ member_terminated_event.service_name,
+ member_terminated_event.cluster_id,
+ lb_cluster_id):
+
+ env_params = {"STRATOS_MEMBER_TERMINATED_MEMBER_IP": terminated_member.member_ip,
+ "STRATOS_MEMBER_TERMINATED_MEMBER_ID": member_terminated_event.member_id,
+ "STRATOS_MEMBER_TERMINATED_CLUSTER_ID": member_terminated_event.cluster_id,
+ "STRATOS_MEMBER_TERMINATED_LB_CLUSTER_ID": lb_cluster_id,
+ "STRATOS_MEMBER_TERMINATED_NETWORK_PARTITION_ID": member_terminated_event.network_partition_id,
+ "STRATOS_MEMBER_TERMINATED_SERVICE_NAME": member_terminated_event.service_name,
+ "STRATOS_MEMBER_TERMINATED_MEMBER_LIST_JSON": cluster.member_list_json,
+ "STRATOS_TOPOLOGY_JSON": topology.json_str}
+
+ member_ips = extensionutils.get_lb_member_ip(lb_cluster_id)
+ if member_ips is not None and len(member_ips) > 1:
+ env_params["STRATOS_MEMBER_TERMINATED_LB_IP"] = member_ips[0]
+ env_params["STRATOS_MEMBER_TERMINATED_LB_PUBLIC_IP"] = member_ips[1]
+
+ extensionutils.add_properties(service.properties, env_params, "MEMBER_TERMINATED_SERVICE_PROPERTY")
+ extensionutils.add_properties(cluster.properties, env_params, "MEMBER_TERMINATED_CLUSTER_PROPERTY")
+ extensionutils.add_properties(terminated_member.properties, env_params, "MEMBER_TERMINATED_MEMBER_PROPERTY")
+
+ extensionutils.execute_member_terminated_extension(env_params)
+
+ else:
+ self.log.debug("Member terminated event is not relevant...skipping agent extension")
+
+ def on_member_suspended_event(self, member_suspended_event):
+ self.log.info("Member suspended event received: [service] " + member_suspended_event.service_name +
+ " [cluster] " + member_suspended_event.cluster_id + " [member] " + member_suspended_event.member_id)
+
+ topology_consistent = extensionutils.check_topology_consistency(
+ member_suspended_event.service_name,
+ member_suspended_event.cluster_id,
+ member_suspended_event.member_id
+ )
+
+ if not topology_consistent:
+ self.log.error("Topology is inconsistent...failed to execute member suspended event")
+ return
+
+ topology = TopologyContext.get_topology()
+ service = topology.get_service(member_suspended_event.service_name)
+ cluster = service.get_cluster(member_suspended_event.cluster_id)
+ suspended_member = cluster.get_member(member_suspended_event.member_id)
+ lb_cluster_id = cluster.get_member(member_suspended_event.cluster_id).lb_cluster_id
+
+ #check whether suspended member is within the same cluster, LB cluster or service group
+ if extensionutils.is_relevant_member_event(
+ member_suspended_event.service_name,
+ member_suspended_event.cluster_id,
+ lb_cluster_id):
+
+ env_params = {"STRATOS_MEMBER_SUSPENDED_MEMBER_IP": member_suspended_event.member_ip,
+ "STRATOS_MEMBER_SUSPENDED_MEMBER_ID": member_suspended_event.member_id,
+ "STRATOS_MEMBER_SUSPENDED_CLUSTER_ID": member_suspended_event.cluster_id,
+ "STRATOS_MEMBER_SUSPENDED_LB_CLUSTER_ID": lb_cluster_id,
+ "STRATOS_MEMBER_SUSPENDED_NETWORK_PARTITION_ID": member_suspended_event.network_partition_id,
+ "STRATOS_MEMBER_SUSPENDED_SERVICE_NAME": member_suspended_event.service_name,
+ "STRATOS_MEMBER_SUSPENDED_MEMBER_LIST_JSON": cluster.member_list_json,
+ "STRATOS_TOPOLOGY_JSON": topology.json_str}
+
+ member_ips = extensionutils.get_lb_member_ip(lb_cluster_id)
+ if member_ips is not None and len(member_ips) > 1:
+ env_params["STRATOS_MEMBER_SUSPENDED_LB_IP"] = member_ips[0]
+ env_params["STRATOS_MEMBER_SUSPENDED_LB_PUBLIC_IP"] = member_ips[1]
- def onCompleteTenantEvent(self, completeTenantEvent):
- pass
+ extensionutils.add_properties(service.properties, env_params, "MEMBER_SUSPENDED_SERVICE_PROPERTY")
+ extensionutils.add_properties(cluster.properties, env_params, "MEMBER_SUSPENDED_CLUSTER_PROPERTY")
+ extensionutils.add_properties(suspended_member.properties, env_params, "MEMBER_SUSPENDED_MEMBER_PROPERTY")
- def onMemberTerminatedEvent(self, memberTerminatedEvent):
- pass
+ extensionutils.execute_member_suspended_extension(env_params)
+
+ else:
+ self.log.debug("Member suspended event is not relevant...skipping agent extension")
- def onMemberSuspendedEvent(self, memberSuspendedEvent):
- pass
+ def on_member_started_event(self, member_started_event):
+ self.log.info("Member started event received: [service] " + member_started_event.service_name +
+ " [cluster] " + member_started_event.cluster_id + " [member] " + member_started_event.member_id)
- def onMemberStartedEvent(self, memberStartedEvent):
- pass
+ topology_consistent = extensionutils.check_topology_consistency(
+ member_started_event.service_name,
+ member_started_event.cluster_id,
+ member_started_event.member_id
+ )
+
+ if not topology_consistent:
+ self.log.error("Topology is inconsistent...failed to execute member started event")
+ return
+
+ topology = TopologyContext.get_topology()
+ service = topology.get_service(member_started_event.service_name)
+ cluster = service.get_cluster(member_started_event.cluster_id)
+ started_member = cluster.get_member(member_started_event.member_id)
+ lb_cluster_id = cluster.get_member(member_started_event.cluster_id).lb_cluster_id
+
+ #check whether started member is within the same cluster, LB cluster or service group
+ if extensionutils.is_relevant_member_event(
+ member_started_event.service_name,
+ member_started_event.cluster_id,
+ lb_cluster_id):
+
+ env_params = {"STRATOS_MEMBER_STARTED_MEMBER_IP": member_started_event.member_ip,
+ "STRATOS_MEMBER_STARTED_MEMBER_ID": member_started_event.member_id,
+ "STRATOS_MEMBER_STARTED_CLUSTER_ID": member_started_event.cluster_id,
+ "STRATOS_MEMBER_STARTED_LB_CLUSTER_ID": lb_cluster_id,
+ "STRATOS_MEMBER_STARTED_NETWORK_PARTITION_ID": member_started_event.network_partition_id,
+ "STRATOS_MEMBER_STARTED_SERVICE_NAME": member_started_event.service_name,
+ "STRATOS_MEMBER_STARTED_MEMBER_LIST_JSON": cluster.member_list_json,
+ "STRATOS_TOPOLOGY_JSON": topology.json_str}
+
+ member_ips = extensionutils.get_lb_member_ip(lb_cluster_id)
+ if member_ips is not None and len(member_ips) > 1:
+ env_params["STRATOS_MEMBER_STARTED_LB_IP"] = member_ips[0]
+ env_params["STRATOS_MEMBER_STARTED_LB_PUBLIC_IP"] = member_ips[1]
+
+ extensionutils.add_properties(service.properties, env_params, "MEMBER_STARTED_SERVICE_PROPERTY")
+ extensionutils.add_properties(cluster.properties, env_params, "MEMBER_STARTED_CLUSTER_PROPERTY")
+ extensionutils.add_properties(started_member.properties, env_params, "MEMBER_STARTED_MEMBER_PROPERTY")
+
+ extensionutils.execute_member_started_extension(env_params)
+
+ else:
+ self.log.debug("Member started event is not relevant...skipping agent extension")
def start_server_extension(self):
#wait until complete topology message is received to get LB IP
@@ -254,20 +410,59 @@ class DefaultExtensionHandler:
def volume_mount_extension(self, persistence_mappings_payload):
extensionutils.execute_volume_mount_extension(persistence_mappings_payload)
- def onSubscriptionDomainAddedEvent(self, subscriptionDomainAddedEvent):
- pass
-
- def onSubscriptionDomainRemovedEvent(self, subscriptionDomainRemovedEvent):
- pass
-
- def onCopyArtifactsExtension(self, src, des):
- pass
-
- def onTenantSubscribedEvent(self, tenantSubscribedEvent):
- pass
+ def on_subscription_domain_added_event(self, subscription_domain_added_event):
+ tenant_domain = self.find_tenant_domain(subscription_domain_added_event.tenant_id)
+ self.log.info(
+ "Subscription domain added event received: [tenant-id] " + subscription_domain_added_event.tenant_id +
+ " [tenant-domain] " + tenant_domain + " [domain-name] " + subscription_domain_added_event.domain_name +
+ " [application-context] " + subscription_domain_added_event.application_context
+ )
+
+ env_params = {"STRATOS_SUBSCRIPTION_SERVICE_NAME": subscription_domain_added_event.service_name,
+ "STRATOS_SUBSCRIPTION_DOMAIN_NAME": subscription_domain_added_event.domain_name,
+ "STRATOS_SUBSCRIPTION_TENANT_ID": int(subscription_domain_added_event.tenant_id),
+ "STRATOS_SUBSCRIPTION_TENANT_DOMAIN": tenant_domain,
+ "STRATOS_SUBSCRIPTION_APPLICATION_CONTEXT": subscription_domain_added_event.application_context}
+
+ extensionutils.execute_subscription_domain_added_extension(env_params)
+
+ def on_subscription_domain_removed_event(self, subscriptionDomainRemovedEvent):
+ tenant_domain = self.find_tenant_domain(subscriptionDomainRemovedEvent.tenant_id)
+ self.log.info(
+ "Subscription domain removed event received: [tenant-id] " + subscriptionDomainRemovedEvent.tenant_id +
+ " [tenant-domain] " + tenant_domain + " [domain-name] " + subscriptionDomainRemovedEvent.domain_name
+ )
+
+ env_params = {"STRATOS_SUBSCRIPTION_SERVICE_NAME": subscriptionDomainRemovedEvent.service_name,
+ "STRATOS_SUBSCRIPTION_DOMAIN_NAME": subscriptionDomainRemovedEvent.domain_name,
+ "STRATOS_SUBSCRIPTION_TENANT_ID": int(subscriptionDomainRemovedEvent.tenant_id),
+ "STRATOS_SUBSCRIPTION_TENANT_DOMAIN": tenant_domain}
+
+ extensionutils.execute_subscription_domain_removed_extension(env_params)
+
+ def on_copy_artifacts_extension(self, src, des):
+ extensionutils.execute_copy_artifact_extension(src, des)
+
+ def on_tenant_subscribed_event(self, tenant_subscribed_event):
+ self.log.info(
+ "Tenant subscribed event received: [tenant] " + tenant_subscribed_event.tenant_id +
+ " [service] " + tenant_subscribed_event.service_name + " [cluster] " + tenant_subscribed_event.cluster_ids
+ )
+
+ extensionutils.execute_tenant_subscribed_extension({})
+
+ def on_tenant_unsubscribed_event(self, tenant_unsubscribed_event):
+ self.log.info(
+ "Tenant unsubscribed event received: [tenant] " + tenant_unsubscribed_event.tenant_id +
+ " [service] " + tenant_unsubscribed_event.service_name + " [cluster] " + tenant_unsubscribed_event.cluster_ids
+ )
- def onTenantUnSubscribedEvent(self, tenantUnSubscribedEvent):
- pass
+ try:
+ if CartridgeAgentConfiguration.service_name == tenant_unsubscribed_event.service_name:
+ AgentGitHandler.remove_repo(tenant_unsubscribed_event.tenant_id)
+ except:
+ self.log.exception()
+ extensionutils.execute_tenant_unsubscribed_extension({})
def cleanup(self):
self.log.info("Executing cleaning up the data in the cartridge instance...")
@@ -517,4 +712,21 @@ class DefaultExtensionHandler:
if "MIN_COUNT" in member.properties:
return int(member.properties["MIN_COUNT"])
- return 1
\ No newline at end of file
+ return 1
+
+ def find_tenant_domain(self, tenant_id):
+ tenant = TenantContext.get_tenant(tenant_id)
+ if tenant is None:
+ raise RuntimeError("Tenant could not be found: [tenant-id] %r" % tenant_id)
+
+ return tenant.tenant_domain
+
+ def wait_for_wk_members(self, env_params):
+ min_count = int(CartridgeAgentConfiguration.min_count)
+ is_wk_member_group_ready = False
+ while not is_wk_member_group_ready:
+ self.log.info("Waiting for %r well known members..." % min_count)
+
+ time.sleep(5)
+
+ is_wk_member_group_ready = self.is_wk_member_group_ready(env_params, min_count)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py
index 7a84e4d..444f9ac 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py
@@ -76,10 +76,7 @@ def wait_until_ports_active(ip_address, ports):
if duration > ports_check_timeout:
return
- try:
- time.sleep(5)
- except:
- pass
+ time.sleep(5)
log.info("Ports activated: [ip] %r [ports] %r" % (ip_address, ports))
http://git-wip-us.apache.org/repos/asf/stratos/blob/71f827d0/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
index e29b614..3ca4cc0 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
@@ -64,12 +64,34 @@ def execute_artifacts_updated_extension(env_params):
log.exception("Could not execute artifacts updated extension")
-def execute_subscription_domain_added_extension(tenant_id, tenant_domain, domain_name, application_context):
- raise NotImplementedError
+def execute_subscription_domain_added_extension(env_params):
+ try:
+ log.debug("Executing subscription domain added extension")
+
+ script_name = cartridgeagentconstants.SUBSCRIPTION_DOMAIN_ADDED_SCRIPT
+ command = prepare_command(script_name)
+ env_params = add_payload_parameters(env_params)
+ env_params = clean_process_parameters(env_params)
+
+ output, errors = execute_command(command, env_params)
+ log.debug("Subscription domain added script returned: %r" % output)
+ except:
+ log.exception("Could not execute subscription domain added extension")
+
+
+def execute_subscription_domain_removed_extension(env_params):
+ try:
+ log.debug("Executing subscription domain removed extension")
+ script_name = cartridgeagentconstants.SUBSCRIPTION_DOMAIN_REMOVED_SCRIPT
+ command = prepare_command(script_name)
+ env_params = add_payload_parameters(env_params)
+ env_params = clean_process_parameters(env_params)
-def execute_subscription_domain_removed_extension(tenant_id, tenant_domain, domain_name):
- raise NotImplementedError
+ output, errors = execute_command(command, env_params)
+ log.debug("Subscription domain removed script returned: %r" % output)
+ except:
+ log.exception("Could not execute subscription domain removed extension")
def execute_start_servers_extension(env_params):
@@ -87,6 +109,110 @@ def execute_start_servers_extension(env_params):
log.exception("Could not execute start servers extension")
+def execute_complete_topology_extension(env_params):
+ try:
+ log.debug("Executing complete topology extension")
+
+ script_name = cartridgeagentconstants.COMPLETE_TOPOLOGY_SCRIPT
+ command = prepare_command(script_name)
+ env_params = add_payload_parameters(env_params)
+ env_params = clean_process_parameters(env_params)
+
+ output, errors = execute_command(command, env_params)
+ log.debug("Complete topology script returned: %r" % output)
+ except:
+ log.exception("Could not execute complete topology extension")
+
+
+def execute_complete_tenant_extension(env_params):
+ try:
+ log.debug("Executing complete tenant extension")
+
+ script_name = cartridgeagentconstants.COMPLETE_TENANT_SCRIPT
+ command = prepare_command(script_name)
+ env_params = add_payload_parameters(env_params)
+ env_params = clean_process_parameters(env_params)
+
+ output, errors = execute_command(command, env_params)
+ log.debug("Complete tenant script returned: %r" % output)
+ except:
+ log.exception("Could not execute complete tenant extension")
+
+
+def execute_tenant_subscribed_extension(env_params):
+ try:
+ log.debug("Executing tenant subscribed extension")
+
+ script_name = cartridgeagentconstants.TENANT_SUBSCRIBED_SCRIPT
+ command = prepare_command(script_name)
+ env_params = add_payload_parameters(env_params)
+ env_params = clean_process_parameters(env_params)
+
+ output, errors = execute_command(command, env_params)
+ log.debug("Tenant subscribed script returned: %r" % output)
+ except:
+ log.exception("Could not execute tenant subscribed extension")
+
+
+def execute_tenant_unsubscribed_extension(env_params):
+ try:
+ log.debug("Executing tenant unsubscribed extension")
+
+ script_name = cartridgeagentconstants.TENANT_UNSUBSCRIBED_SCRIPT
+ command = prepare_command(script_name)
+ env_params = add_payload_parameters(env_params)
+ env_params = clean_process_parameters(env_params)
+
+ output, errors = execute_command(command, env_params)
+ log.debug("Tenant unsubscribed script returned: %r" % output)
+ except:
+ log.exception("Could not execute tenant unsubscribed extension")
+
+
+def execute_member_terminated_extension(env_params):
+ try:
+ log.debug("Executing member terminated extension")
+
+ script_name = cartridgeagentconstants.MEMBER_TERMINATED_SCRIPT
+ command = prepare_command(script_name)
+ env_params = add_payload_parameters(env_params)
+ env_params = clean_process_parameters(env_params)
+
+ output, errors = execute_command(command, env_params)
+ log.debug("Member terminated script returned: %r" % output)
+ except:
+ log.exception("Could not execute member terminated extension")
+
+
+def execute_member_suspended_extension(env_params):
+ try:
+ log.debug("Executing member suspended extension")
+
+ script_name = cartridgeagentconstants.MEMBER_SUSPENDED_SCRIPT
+ command = prepare_command(script_name)
+ env_params = add_payload_parameters(env_params)
+ env_params = clean_process_parameters(env_params)
+
+ output, errors = execute_command(command, env_params)
+ log.debug("Member suspended script returned: %r" % output)
+ except:
+ log.exception("Could not execute member suspended extension")
+
+def execute_member_started_extension(env_params):
+ try:
+ log.debug("Executing member started extension")
+
+ script_name = cartridgeagentconstants.MEMBER_STARTED_SCRIPT
+ command = prepare_command(script_name)
+ env_params = add_payload_parameters(env_params)
+ env_params = clean_process_parameters(env_params)
+
+ output, errors = execute_command(command, env_params)
+ log.debug("Member started script returned: %r" % output)
+ except:
+ log.exception("Could not execute member started extension")
+
+
def wait_for_complete_topology():
while not TopologyContext.topology.initialized:
log.info("Waiting for complete topology event...")
@@ -221,38 +347,38 @@ def clean_process_parameters(params):
return params
-def add_payload_parameters(params):
- params["STRATOS_APP_PATH"] = CartridgeAgentConfiguration.app_path
- params["STRATOS_PARAM_FILE_PATH"] = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.PARAM_FILE_PATH)
- params["STRATOS_SERVICE_NAME"] = CartridgeAgentConfiguration.service_name
- params["STRATOS_TENANT_ID"] = CartridgeAgentConfiguration.tenant_id
- params["STRATOS_CARTRIDGE_KEY"] = CartridgeAgentConfiguration.cartridge_key
- params["STRATOS_LB_CLUSTER_ID"] = CartridgeAgentConfiguration.lb_cluster_id
- params["STRATOS_CLUSTER_ID"] = CartridgeAgentConfiguration.cluster_id
- params["STRATOS_NETWORK_PARTITION_ID"] = CartridgeAgentConfiguration.network_partition_id
- params["STRATOS_PARTITION_ID"] = CartridgeAgentConfiguration.partition_id
- params["STRATOS_PERSISTENCE_MAPPINGS"] = CartridgeAgentConfiguration.persistence_mappings
- params["STRATOS_REPO_URL"] = CartridgeAgentConfiguration.repo_url
+def add_payload_parameters(env_params):
+ env_params["STRATOS_APP_PATH"] = CartridgeAgentConfiguration.app_path
+ env_params["STRATOS_PARAM_FILE_PATH"] = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.PARAM_FILE_PATH)
+ env_params["STRATOS_SERVICE_NAME"] = CartridgeAgentConfiguration.service_name
+ env_params["STRATOS_TENANT_ID"] = CartridgeAgentConfiguration.tenant_id
+ env_params["STRATOS_CARTRIDGE_KEY"] = CartridgeAgentConfiguration.cartridge_key
+ env_params["STRATOS_LB_CLUSTER_ID"] = CartridgeAgentConfiguration.lb_cluster_id
+ env_params["STRATOS_CLUSTER_ID"] = CartridgeAgentConfiguration.cluster_id
+ env_params["STRATOS_NETWORK_PARTITION_ID"] = CartridgeAgentConfiguration.network_partition_id
+ env_params["STRATOS_PARTITION_ID"] = CartridgeAgentConfiguration.partition_id
+ env_params["STRATOS_PERSISTENCE_MAPPINGS"] = CartridgeAgentConfiguration.persistence_mappings
+ env_params["STRATOS_REPO_URL"] = CartridgeAgentConfiguration.repo_url
lb_cluster_id_in_payload = CartridgeAgentConfiguration.lb_cluster_id
member_ips = get_lb_member_ip(lb_cluster_id_in_payload)
if member_ips is not None:
- params["STRATOS_LB_IP"] = member_ips[0]
- params["STRATOS_LB_PUBLIC_IP"] = member_ips[1]
+ env_params["STRATOS_LB_IP"] = member_ips[0]
+ env_params["STRATOS_LB_PUBLIC_IP"] = member_ips[1]
else:
- params["STRATOS_LB_IP"] = CartridgeAgentConfiguration.lb_private_ip
- params["STRATOS_LB_PUBLIC_IP"] = CartridgeAgentConfiguration.lb_public_ip
+ env_params["STRATOS_LB_IP"] = CartridgeAgentConfiguration.lb_private_ip
+ env_params["STRATOS_LB_PUBLIC_IP"] = CartridgeAgentConfiguration.lb_public_ip
topology = TopologyContext.get_topology()
if topology.initialized:
service = topology.get_service(CartridgeAgentConfiguration.service_name)
cluster = service.get_cluster(CartridgeAgentConfiguration.cluster_id)
member_id_in_payload = CartridgeAgentConfiguration.member_id
- add_properties(service.properties, params, "SERVICE_PROPERTY")
- add_properties(cluster.properties, params, "CLUSTER_PROPERTY")
- add_properties(cluster.get_member(member_id_in_payload).properties, params, "MEMBER_PROPERTY")
+ add_properties(service.properties, env_params, "SERVICE_PROPERTY")
+ add_properties(cluster.properties, env_params, "CLUSTER_PROPERTY")
+ add_properties(cluster.get_member(member_id_in_payload).properties, env_params, "MEMBER_PROPERTY")
- return params
+ return env_params
def add_properties(properties, params, prefix):