You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ni...@apache.org on 2014/10/11 11:09:46 UTC
[21/50] [abbrv] git commit: Added clustering informaiton handling to
the default extension handler Style changes to the default extension handler
code Added get_service method to topology object
Added clustering informaiton handling to the default extension handler
Style changes to the default extension handler code
Added get_service method to topology object
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/6b8fc6e4
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/6b8fc6e4
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/6b8fc6e4
Branch: refs/heads/master
Commit: 6b8fc6e4c36442bdf76b99a9b42e3ea57d453796
Parents: f535242
Author: Chamila de Alwis <ch...@wso2.com>
Authored: Fri Sep 26 16:19:15 2014 +0530
Committer: Nirmal Fernando <ni...@gmail.com>
Committed: Sat Oct 11 14:38:29 2014 +0530
----------------------------------------------------------------------
.../extensions/defaultextensionhandler.py | 302 +++++++++++++++++--
.../modules/topology/topologycontext.py | 12 +
.../modules/util/extensionutils.py | 6 +-
3 files changed, 288 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/6b8fc6e4/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py b/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
index de841ce..9135a8a 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
@@ -1,12 +1,12 @@
import logging
-from .. artifactmgt.git.agentgithandler import AgentGitHandler
-from .. artifactmgt.repositoryinformation import RepositoryInformation
-from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration
-from .. util import extensionutils, cartridgeagentconstants, cartridgeagentutils
-from .. publisher import cartridgeagentpublisher
-from .. exception.parameternotfoundexception import ParameterNotFoundException
-from .. topology.topologycontext import *
+from ..artifactmgt.git.agentgithandler import AgentGitHandler
+from ..artifactmgt.repositoryinformation import RepositoryInformation
+from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration
+from ..util import extensionutils, cartridgeagentconstants, cartridgeagentutils
+from ..publisher import cartridgeagentpublisher
+from ..exception.parameternotfoundexception import ParameterNotFoundException
+from ..topology.topologycontext import *
class DefaultExtensionHandler:
@@ -35,7 +35,8 @@ class DefaultExtensionHandler:
def on_artifact_updated_event(self, artifacts_updated_event):
self.log.info("Artifact update event received: [tenant] %r [cluster] %r [status] %r" %
- (artifacts_updated_event.tenant_id, artifacts_updated_event.cluster_id, artifacts_updated_event.status))
+ (artifacts_updated_event.tenant_id, artifacts_updated_event.cluster_id,
+ artifacts_updated_event.status))
cluster_id_event = str(artifacts_updated_event.cluster_id).strip()
cluster_id_payload = CartridgeAgentConfiguration.cluster_id
@@ -58,10 +59,10 @@ class DefaultExtensionHandler:
repo_info = RepositoryInformation(repo_url, repo_username, repo_password, local_repo_path, tenant_id,
is_multitenant, commit_enabled)
- #checkout code
+ # checkout code
checkout_result = AgentGitHandler.checkout(repo_info)
- #repo_context = checkout_result["repo_context"]
- #execute artifact updated extension
+ # repo_context = checkout_result["repo_context"]
+ # execute artifact updated extension
env_params = {"STRATOS_ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id,
"STRATOS_ARTIFACT_UPDATED_TENANT_ID": artifacts_updated_event.tenant_id,
"STRATOS_ARTIFACT_UPDATED_REPO_URL": artifacts_updated_event.repo_url,
@@ -72,7 +73,7 @@ class DefaultExtensionHandler:
extensionutils.execute_artifacts_updated_extension(env_params)
if checkout_result["subscribe_run"]:
- #publish instanceActivated
+ # publish instanceActivated
cartridgeagentpublisher.publish_instance_activated_event()
update_artifacts = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.ENABLE_ARTIFACT_UPDATE)
@@ -82,7 +83,8 @@ class DefaultExtensionHandler:
auto_checkout = CartridgeAgentConfiguration.is_checkout_enabled()
try:
- update_interval = len(CartridgeAgentConfiguration.read_property(cartridgeagentconstants.ARTIFACT_UPDATE_INTERVAL))
+ update_interval = len(
+ CartridgeAgentConfiguration.read_property(cartridgeagentconstants.ARTIFACT_UPDATE_INTERVAL))
except ParameterNotFoundException:
self.log.exception("Invalid artifact sync interval specified ")
update_interval = 10
@@ -95,7 +97,8 @@ class DefaultExtensionHandler:
self.log.info("Auto Commit is turned %r " % "on" if auto_commit else "off")
self.log.info("Auto Checkout is turned %r " % "on" if auto_checkout else "off")
- AgentGitHandler.schedule_artifact_update_scheduled_task(repo_info, auto_checkout, auto_commit, update_interval)
+ AgentGitHandler.schedule_artifact_update_scheduled_task(repo_info, auto_checkout, auto_commit,
+ update_interval)
def on_artifact_update_scheduler_event(self, tenant_id):
env_params = {}
@@ -111,20 +114,24 @@ class DefaultExtensionHandler:
def on_member_activated_event(self, member_activated_event):
self.log.info("Member activated event received: [service] %r [cluster] %r [member] %r"
- % (member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id))
+ % (
+ member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id))
- consistant = extensionutils.check_topology_consistency(member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id)
+ consistant = extensionutils.check_topology_consistency(member_activated_event.service_name,
+ member_activated_event.cluster_id,
+ member_activated_event.member_id)
if not consistant:
self.log.error("Topology is inconsistent...failed to execute member activated event")
return
topology = TopologyContext.get_topology()
- service = topology.service_map[member_activated_event.service_name]
+ service = topology.get_service(member_activated_event.service_name)
cluster = service.get_cluster(member_activated_event.cluster_id)
member = cluster.get_member(member_activated_event.member_id)
lb_cluster_id = member.lb_cluster_id
- if extensionutils.is_relevant_member_event(member_activated_event.service_name, member_activated_event.cluster_id, lb_cluster_id)
+ if extensionutils.is_relevant_member_event(member_activated_event.service_name,
+ member_activated_event.cluster_id, lb_cluster_id):
env_params = {}
env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_IP"] = member_activated_event.member_ip
env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_ID"] = member_activated_event.member_id
@@ -144,7 +151,7 @@ class DefaultExtensionHandler:
member_list_json = ""
for member in members:
member_list_json += member.json_str + ","
- env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_LIST_JSON"] = member_list_json[:-1] # removing last comma
+ env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_LIST_JSON"] = member_list_json[:-1] # removing last comma
member_ips = extensionutils.get_lb_member_ip(lb_cluster_id)
if member_ips is not None and len(member_ips) > 1:
@@ -153,13 +160,14 @@ class DefaultExtensionHandler:
env_params["STRATOS_TOPOLOGY_JSON"] = topology.json_str
- extensionutils.add_properties(service.properties, env_params, "MEMBER_ACTIVATED_SERVICE_PROPERTY")
- extensionutils.add_properties(cluster.properties, env_params, "MEMBER_ACTIVATED_CLUSTER_PROPERTY")
- extensionutils.add_properties(member.properties, env_params, "MEMBER_ACTIVATED_MEMBER_PROPERTY")
+ extensionutils.add_properties(service.properties, env_params, "MEMBER_ACTIVATED_SERVICE_PROPERTY")
+ extensionutils.add_properties(cluster.properties, env_params, "MEMBER_ACTIVATED_CLUSTER_PROPERTY")
+ extensionutils.add_properties(member.properties, env_params, "MEMBER_ACTIVATED_MEMBER_PROPERTY")
clustered = CartridgeAgentConfiguration.is_clustered
- if member.properties is not None and member.properties[cartridgeagentconstants.CLUSTERING_PRIMARY_KEY] == "true" and clustered is not None and clustered:
+ if member.properties is not None and member.properties[
+ cartridgeagentconstants.CLUSTERING_PRIMARY_KEY] == "true" and clustered is not None and clustered:
self.log.debug(" If WK member is re-spawned, update axis2.xml ")
has_wk_ip_changed = True
@@ -211,15 +219,15 @@ class DefaultExtensionHandler:
# member_id_in_payload = CartridgeAgentConfiguration.member_id()
#
# try:
- # consistant = extensionutils.check_topology_consistency(service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
+ # consistant = extensionutils.check_topology_consistency(service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
#
- # if not consistant:
- # self.log.error("Topology is inconsistent...failed to execute start server event")
- # return
+ # if not consistant:
+ # self.log.error("Topology is inconsistent...failed to execute start server event")
+ # return
#
#
# except:
- # self.log.exception("Error processing start servers event")
+ # self.log.exception("Error processing start servers event")
# finally:
# pass
@@ -253,4 +261,240 @@ class DefaultExtensionHandler:
cartridgeagentpublisher.publish_instance_ready_to_shutdown_event()
def is_wk_member_group_ready(self, env_params, min_count):
- raise NotImplementedError
\ No newline at end of file
+ topology = TopologyContext.get_topology()
+ if topology is None or not topology.initialized:
+ return False
+
+ service_group_in_payload = CartridgeAgentConfiguration.service_group
+ if service_group_in_payload is not None:
+ env_params["STRATOS_SERVICE_GROUP"] = service_group_in_payload
+
+ # clustering logic for apimanager
+ if service_group_in_payload is not None and service_group_in_payload == "apim":
+ # handle apistore and publisher case
+ if CartridgeAgentConfiguration.service_name == "apistore" or \
+ CartridgeAgentConfiguration.service_name == "publisher":
+
+ apistore_cluster_collection = topology.get_service("apistore").get_clusters()
+ publisher_cluster_collection = topology.get_service("publisher").get_clusters()
+
+ apistore_member_list = []
+ for member in apistore_cluster_collection[0].get_members():
+ if member.status == MemberStatus.Starting or member.status == MemberStatus.Activated:
+ apistore_member_list.append(member)
+ self.wk_members.append(member)
+
+ if len(apistore_member_list) == 0:
+ self.log.debug("API Store members not yet created")
+ return False
+
+ apistore_member = apistore_member_list[0]
+ env_params["STRATOS_WK_APISTORE_MEMBER_IP"] = apistore_member.member_ip
+ self.log.debug("STRATOS_WK_APISTORE_MEMBER_IP: %r" % apistore_member.member_ip)
+
+ publisher_member_list = []
+ for member in publisher_cluster_collection[0].get_members():
+ if member.status == MemberStatus.Starting or member.status == MemberStatus.Activated:
+ publisher_member_list.append(member)
+ self.wk_members.append(member)
+
+ if len(publisher_member_list) == 0:
+ self.log.debug("API Publisher members not yet created")
+
+ publisher_member = publisher_member_list[0]
+ env_params["STRATOS_WK_PUBLISHER_MEMBER_IP"] = publisher_member.member_ip
+ self.log.debug("STRATOS_WK_PUBLISHER_MEMBER_IP: %r" % publisher_member.member_ip)
+
+ return True
+
+ elif CartridgeAgentConfiguration.service_name == "gatewaymgt" or \
+ CartridgeAgentConfiguration.service_name == "gateway":
+
+ if CartridgeAgentConfiguration.deployment is not None:
+ # check if deployment is Manager Worker separated
+ if CartridgeAgentConfiguration.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower() or \
+ CartridgeAgentConfiguration.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower():
+
+ self.log.info("Deployment pattern for the node: %r" % CartridgeAgentConfiguration.deployment)
+ env_params["DEPLOYMENT"] = CartridgeAgentConfiguration.deployment
+ # check if WKA members of Manager Worker separated deployment is ready
+ return self.is_manager_worker_WKA_group_ready(env_params)
+
+ elif CartridgeAgentConfiguration.service_name == "keymanager":
+ return True
+
+ else:
+ if CartridgeAgentConfiguration.deployment is not None:
+ # check if deployment is Manager Worker separated
+ if CartridgeAgentConfiguration.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower() or \
+ CartridgeAgentConfiguration.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower():
+
+ self.log.info("Deployment pattern for the node: %r" % CartridgeAgentConfiguration.deployment)
+ env_params["DEPLOYMENT"] = CartridgeAgentConfiguration.deployment
+ # check if WKA members of Manager Worker separated deployment is ready
+ return self.is_manager_worker_WKA_group_ready(env_params)
+
+ service_name_in_payload = CartridgeAgentConfiguration.service_name
+ cluster_id_in_payload = CartridgeAgentConfiguration.cluster_id
+ service = topology.get_service(service_name_in_payload)
+ cluster = service.get_cluster(cluster_id_in_payload)
+
+ wk_members = []
+ for member in cluster.get_members():
+ if member.properties is not None and \
+ "PRIMARY" in member.properties and member.properties["PRIMARY"].lower() == "true" and \
+ (member.status == MemberStatus.Starting or member.status == MemberStatus.Activated):
+
+ wk_members.append(member)
+ self.wk_members.append(member)
+ self.log.debug("Found WKA: STRATOS_WK_MEMBER_IP: " + member.member_ip)
+
+ if len(wk_members) >= min_count:
+ idx = 0
+ for member in wk_members:
+ env_params["STRATOS_WK_MEMBER_" + idx + "_IP"] = member.member_ip
+ self.log.debug("STRATOS_WK_MEMBER_" + idx + "_IP:" + member.member_ip)
+
+ idx += 1
+
+ return True
+
+ return False
+
+ # generic worker manager separated clustering logic
+ def is_manager_worker_WKA_group_ready(self, env_params):
+
+ # for this, we need both manager cluster service name and worker cluster service name
+ manager_service_name = CartridgeAgentConfiguration.manager_service_name
+ worker_service_name = CartridgeAgentConfiguration.worker_service_name
+
+ # managerServiceName and workerServiceName both should not be null /empty
+ if manager_service_name is None or manager_service_name.strip() == "":
+ self.log.error("Manager service name [ " + manager_service_name + " ] is invalid")
+ return False
+
+ if worker_service_name is None or worker_service_name.strip() == "":
+ self.log.error("Worker service name [ " + worker_service_name + " ] is invalid")
+ return False
+
+ min_manager_instances_available = False
+ min_worker_instances_available = False
+
+ topology = TopologyContext.get_topology()
+ manager_service = topology.get_service(manager_service_name)
+ worker_service = topology.get_service(worker_service_name)
+
+ if manager_service is None:
+ self.log.warn("Service [ " + manager_service_name + " ] is not found")
+ return False
+
+ if worker_service is None:
+ self.log.warn("Service [ " + worker_service_name + " ] is not found")
+ return False
+
+ # manager clusters
+ manager_clusters = manager_service.get_clusters()
+ if manager_clusters is None or len(manager_clusters) == 0:
+ self.log.warn("No clusters found for service [ " + manager_service_name + " ]")
+ return False
+
+ manager_min_instance_count = 1
+ manager_min_instance_count_found = False
+
+ manager_wka_members = []
+ for member in manager_clusters[0].get_members():
+ if member.properties is not None and \
+ "PRIMARY" in member.properties and member.properties["PRIMARY"].lower() == "true" and \
+ (member.status == MemberStatus.Starting or member.status == MemberStatus.Activated):
+
+ manager_wka_members.append(member)
+ self.wk_members.append(member)
+
+ # get the min instance count
+ if not manager_min_instance_count_found:
+ manager_min_instance_count = self.get_min_instance_count_from_member(member)
+ manager_min_instance_count_found = True
+ self.log.info("Manager min instance count: " + manager_min_instance_count)
+
+ if len(manager_wka_members) >= manager_min_instance_count:
+ min_manager_instances_available = True
+ idx = 0
+ for member in manager_wka_members:
+ env_params["STRATOS_WK_MANAGER_MEMBER_" + idx + "_IP"] = member.member_ip
+ self.log.debug("STRATOS_WK_MANAGER_MEMBER_" + idx + "_IP: " + member.member_ip)
+ idx += 1
+
+ env_params["STRATOS_WK_MANAGER_MEMBER_COUNT"] = int(manager_min_instance_count)
+
+ # If all the manager members are non primary and is greate than or equal to mincount,
+ # minManagerInstancesAvailable should be true
+ all_managers_non_primary = True
+ for member in manager_clusters[0].get_members():
+ # get the min instance count
+ if not manager_min_instance_count_found:
+ manager_min_instance_count = self.get_min_instance_count_from_member(member)
+ manager_min_instance_count_found = True
+ self.log.info(
+ "Manager min instance count when allManagersNonPrimary true : " + manager_min_instance_count)
+
+ if member.properties is not None and "PRIMARY" in member.properties and \
+ member.properties["PRIMARY"].lower() == "true":
+ all_managers_non_primary = False
+ break
+
+ self.log.debug(
+ " allManagerNonPrimary & managerMinInstanceCount [" + all_managers_non_primary +
+ "], [" + manager_min_instance_count + "] ")
+
+ if all_managers_non_primary and len(manager_clusters) >= manager_min_instance_count:
+ min_manager_instances_available = True
+
+ # worker cluster
+ worker_clusters = worker_service.get_clusters()
+ if worker_clusters is None or len(worker_clusters) == 0:
+ self.log.warn("No clusters found for service [ " + worker_service_name + " ]")
+ return False
+
+ worker_min_instance_count = 1
+ worker_min_instance_count_found = False
+
+ worker_wka_members = []
+ for member in worker_clusters[0].get_members():
+ self.log.debug("Checking member : " + member.member_id)
+
+ if member.properties is not None and "PRIMARY" in member.properties and \
+ member.properties["PRIMARY"].lower() == "true" and \
+ (member.status == MemberStatus.Starting or member.status == MemberStatus.Activated):
+
+ self.log.debug("Added worker member " + member.member_id)
+
+ worker_wka_members.append(member)
+ self.wk_members.append(member)
+
+ # get the min instance count
+ if not worker_min_instance_count_found:
+ worker_min_instance_count = self.get_min_instance_count_from_member(member)
+ worker_min_instance_count_found = True
+ self.log.debug("Worker min instance count: " + worker_min_instance_count)
+
+ if len(worker_wka_members) >= worker_min_instance_count:
+ min_worker_instances_available = True
+ idx = 0
+ for member in worker_wka_members:
+ env_params["STRATOS_WK_WORKER_MEMBER_" + idx + "_IP"] = member.member_ip
+ self.log.debug("STRATOS_WK_WORKER_MEMBER_" + idx + "_IP: " + member.member_ip)
+ idx += 1
+
+ env_params["STRATOS_WK_WORKER_MEMBER_COUNT"] = int(worker_min_instance_count)
+
+ self.log.debug(
+ " Returnning values minManagerInstancesAvailable && minWorkerInstancesAvailable [" +
+ min_manager_instances_available + "], [" + min_worker_instances_available + "] ")
+
+ return min_manager_instances_available and min_worker_instances_available
+
+ def get_min_instance_count_from_member(self, member):
+ if "MIN_COUNT" in member.properties:
+ return int(member.properties["MIN_COUNT"])
+
+ return 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/6b8fc6e4/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py b/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py
index 3ce4379..7fee50d 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py
@@ -31,6 +31,18 @@ class Topology:
def get_services(self):
return self.service_map.values()
+ def get_service(self, service_name):
+ """
+
+ :param str service_name: service name to be retrieved
+ :return: Service object of the service
+ :rtype: Service
+ """
+ if service_name in self.service_map:
+ return self.service_map[service_name]
+
+ return None
+
def add_service(self, service):
self.service_map[service.service_name] = service
http://git-wip-us.apache.org/repos/asf/stratos/blob/6b8fc6e4/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
index d1305b9..267c608 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
@@ -69,7 +69,7 @@ def wait_for_complete_topology():
def check_topology_consistency(service_name, cluster_id, member_id):
topology = TopologyContext.get_topology()
- service = topology.service_map[service_name]
+ service = topology.get_service(service_name)
if service is None:
log.error("Service not found in topology [service] %r" % service_name)
return False
@@ -104,7 +104,7 @@ def is_relevant_member_event(service_name, cluster_id, lb_cluster_id):
service_group_in_payload = CartridgeAgentConfiguration.service_group
if service_group_in_payload is not None:
- service_properties = topology.service_map[service_name].properties
+ service_properties = topology.get_service(service_name).properties
if service_properties is None:
return False
@@ -211,7 +211,7 @@ def add_payload_parameters(params):
topology = TopologyContext.get_topology()
if topology.initialized:
- service = topology.service_map[CartridgeAgentConfiguration.service_name]
+ service = topology.get_service(CartridgeAgentConfiguration.service_name)
cluster = service.get_cluster(CartridgeAgentConfiguration.cluster_id)
member_id_in_payload = CartridgeAgentConfiguration.member_id
add_properties(service.properties, params, "SERVICE_PROPERTY")