You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ni...@apache.org on 2014/10/11 11:09:46 UTC

[21/50] [abbrv] git commit: Added clustering information handling to the default extension handler; style changes to the default extension handler code; added get_service method to topology object

Added clustering information handling to the default extension handler
Style changes to the default extension handler code
Added get_service method to topology object


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/6b8fc6e4
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/6b8fc6e4
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/6b8fc6e4

Branch: refs/heads/master
Commit: 6b8fc6e4c36442bdf76b99a9b42e3ea57d453796
Parents: f535242
Author: Chamila de Alwis <ch...@wso2.com>
Authored: Fri Sep 26 16:19:15 2014 +0530
Committer: Nirmal Fernando <ni...@gmail.com>
Committed: Sat Oct 11 14:38:29 2014 +0530

----------------------------------------------------------------------
 .../extensions/defaultextensionhandler.py       | 302 +++++++++++++++++--
 .../modules/topology/topologycontext.py         |  12 +
 .../modules/util/extensionutils.py              |   6 +-
 3 files changed, 288 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/6b8fc6e4/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py b/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
index de841ce..9135a8a 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py
@@ -1,12 +1,12 @@
 import logging
 
-from .. artifactmgt.git.agentgithandler import AgentGitHandler
-from .. artifactmgt.repositoryinformation import RepositoryInformation
-from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration
-from .. util import extensionutils, cartridgeagentconstants, cartridgeagentutils
-from .. publisher import cartridgeagentpublisher
-from .. exception.parameternotfoundexception import ParameterNotFoundException
-from .. topology.topologycontext import *
+from ..artifactmgt.git.agentgithandler import AgentGitHandler
+from ..artifactmgt.repositoryinformation import RepositoryInformation
+from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration
+from ..util import extensionutils, cartridgeagentconstants, cartridgeagentutils
+from ..publisher import cartridgeagentpublisher
+from ..exception.parameternotfoundexception import ParameterNotFoundException
+from ..topology.topologycontext import *
 
 
 class DefaultExtensionHandler:
@@ -35,7 +35,8 @@ class DefaultExtensionHandler:
 
     def on_artifact_updated_event(self, artifacts_updated_event):
         self.log.info("Artifact update event received: [tenant] %r [cluster] %r [status] %r" %
-                      (artifacts_updated_event.tenant_id, artifacts_updated_event.cluster_id, artifacts_updated_event.status))
+                      (artifacts_updated_event.tenant_id, artifacts_updated_event.cluster_id,
+                       artifacts_updated_event.status))
 
         cluster_id_event = str(artifacts_updated_event.cluster_id).strip()
         cluster_id_payload = CartridgeAgentConfiguration.cluster_id
@@ -58,10 +59,10 @@ class DefaultExtensionHandler:
             repo_info = RepositoryInformation(repo_url, repo_username, repo_password, local_repo_path, tenant_id,
                                               is_multitenant, commit_enabled)
 
-            #checkout code
+            # checkout code
             checkout_result = AgentGitHandler.checkout(repo_info)
-            #repo_context = checkout_result["repo_context"]
-            #execute artifact updated extension
+            # repo_context = checkout_result["repo_context"]
+            # execute artifact updated extension
             env_params = {"STRATOS_ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id,
                           "STRATOS_ARTIFACT_UPDATED_TENANT_ID": artifacts_updated_event.tenant_id,
                           "STRATOS_ARTIFACT_UPDATED_REPO_URL": artifacts_updated_event.repo_url,
@@ -72,7 +73,7 @@ class DefaultExtensionHandler:
             extensionutils.execute_artifacts_updated_extension(env_params)
 
             if checkout_result["subscribe_run"]:
-                #publish instanceActivated
+                # publish instanceActivated
                 cartridgeagentpublisher.publish_instance_activated_event()
 
             update_artifacts = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.ENABLE_ARTIFACT_UPDATE)
@@ -82,7 +83,8 @@ class DefaultExtensionHandler:
                 auto_checkout = CartridgeAgentConfiguration.is_checkout_enabled()
 
                 try:
-                    update_interval = len(CartridgeAgentConfiguration.read_property(cartridgeagentconstants.ARTIFACT_UPDATE_INTERVAL))
+                    update_interval = len(
+                        CartridgeAgentConfiguration.read_property(cartridgeagentconstants.ARTIFACT_UPDATE_INTERVAL))
                 except ParameterNotFoundException:
                     self.log.exception("Invalid artifact sync interval specified ")
                     update_interval = 10
@@ -95,7 +97,8 @@ class DefaultExtensionHandler:
                 self.log.info("Auto Commit is turned %r " % "on" if auto_commit else "off")
                 self.log.info("Auto Checkout is turned %r " % "on" if auto_checkout else "off")
 
-                AgentGitHandler.schedule_artifact_update_scheduled_task(repo_info, auto_checkout, auto_commit, update_interval)
+                AgentGitHandler.schedule_artifact_update_scheduled_task(repo_info, auto_checkout, auto_commit,
+                                                                        update_interval)
 
     def on_artifact_update_scheduler_event(self, tenant_id):
         env_params = {}
@@ -111,20 +114,24 @@ class DefaultExtensionHandler:
 
     def on_member_activated_event(self, member_activated_event):
         self.log.info("Member activated event received: [service] %r [cluster] %r [member] %r"
-                      % (member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id))
+                      % (
+            member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id))
 
-        consistant = extensionutils.check_topology_consistency(member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id)
+        consistant = extensionutils.check_topology_consistency(member_activated_event.service_name,
+                                                               member_activated_event.cluster_id,
+                                                               member_activated_event.member_id)
         if not consistant:
             self.log.error("Topology is inconsistent...failed to execute member activated event")
             return
 
         topology = TopologyContext.get_topology()
-        service = topology.service_map[member_activated_event.service_name]
+        service = topology.get_service(member_activated_event.service_name)
         cluster = service.get_cluster(member_activated_event.cluster_id)
         member = cluster.get_member(member_activated_event.member_id)
         lb_cluster_id = member.lb_cluster_id
 
-        if extensionutils.is_relevant_member_event(member_activated_event.service_name, member_activated_event.cluster_id, lb_cluster_id)
+        if extensionutils.is_relevant_member_event(member_activated_event.service_name,
+                                                   member_activated_event.cluster_id, lb_cluster_id):
             env_params = {}
             env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_IP"] = member_activated_event.member_ip
             env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_ID"] = member_activated_event.member_id
@@ -144,7 +151,7 @@ class DefaultExtensionHandler:
             member_list_json = ""
             for member in members:
                 member_list_json += member.json_str + ","
-            env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_LIST_JSON"] = member_list_json[:-1] # removing last comma
+            env_params["STRATOS_MEMBER_ACTIVATED_MEMBER_LIST_JSON"] = member_list_json[:-1]  # removing last comma
 
             member_ips = extensionutils.get_lb_member_ip(lb_cluster_id)
             if member_ips is not None and len(member_ips) > 1:
@@ -153,13 +160,14 @@ class DefaultExtensionHandler:
 
             env_params["STRATOS_TOPOLOGY_JSON"] = topology.json_str
 
-            extensionutils.add_properties(service.properties, env_params,  "MEMBER_ACTIVATED_SERVICE_PROPERTY")
-            extensionutils.add_properties(cluster.properties, env_params,  "MEMBER_ACTIVATED_CLUSTER_PROPERTY")
-            extensionutils.add_properties(member.properties, env_params,  "MEMBER_ACTIVATED_MEMBER_PROPERTY")
+            extensionutils.add_properties(service.properties, env_params, "MEMBER_ACTIVATED_SERVICE_PROPERTY")
+            extensionutils.add_properties(cluster.properties, env_params, "MEMBER_ACTIVATED_CLUSTER_PROPERTY")
+            extensionutils.add_properties(member.properties, env_params, "MEMBER_ACTIVATED_MEMBER_PROPERTY")
 
             clustered = CartridgeAgentConfiguration.is_clustered
 
-            if member.properties is not None and member.properties[cartridgeagentconstants.CLUSTERING_PRIMARY_KEY] == "true" and clustered is not None and clustered:
+            if member.properties is not None and member.properties[
+                cartridgeagentconstants.CLUSTERING_PRIMARY_KEY] == "true" and clustered is not None and clustered:
                 self.log.debug(" If WK member is re-spawned, update axis2.xml ")
 
                 has_wk_ip_changed = True
@@ -211,15 +219,15 @@ class DefaultExtensionHandler:
         # member_id_in_payload = CartridgeAgentConfiguration.member_id()
         #
         # try:
-        #     consistant = extensionutils.check_topology_consistency(service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
+        # consistant = extensionutils.check_topology_consistency(service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
         #
-        #     if not consistant:
-        #         self.log.error("Topology is inconsistent...failed to execute start server event")
-        #         return
+        # if not consistant:
+        # self.log.error("Topology is inconsistent...failed to execute start server event")
+        # return
         #
         #
         # except:
-        #     self.log.exception("Error processing start servers event")
+        # self.log.exception("Error processing start servers event")
         # finally:
         #     pass
 
@@ -253,4 +261,240 @@ class DefaultExtensionHandler:
         cartridgeagentpublisher.publish_instance_ready_to_shutdown_event()
 
     def is_wk_member_group_ready(self, env_params, min_count):
-        raise NotImplementedError
\ No newline at end of file
+        topology = TopologyContext.get_topology()
+        if topology is None or not topology.initialized:
+            return False
+
+        service_group_in_payload = CartridgeAgentConfiguration.service_group
+        if service_group_in_payload is not None:
+            env_params["STRATOS_SERVICE_GROUP"] = service_group_in_payload
+
+        # clustering logic for apimanager
+        if service_group_in_payload is not None and service_group_in_payload == "apim":
+            # handle apistore and publisher case
+            if CartridgeAgentConfiguration.service_name == "apistore" or \
+                    CartridgeAgentConfiguration.service_name == "publisher":
+
+                apistore_cluster_collection = topology.get_service("apistore").get_clusters()
+                publisher_cluster_collection = topology.get_service("publisher").get_clusters()
+
+                apistore_member_list = []
+                for member in apistore_cluster_collection[0].get_members():
+                    if member.status == MemberStatus.Starting or member.status == MemberStatus.Activated:
+                        apistore_member_list.append(member)
+                        self.wk_members.append(member)
+
+                if len(apistore_member_list) == 0:
+                    self.log.debug("API Store members not yet created")
+                    return False
+
+                apistore_member = apistore_member_list[0]
+                env_params["STRATOS_WK_APISTORE_MEMBER_IP"] = apistore_member.member_ip
+                self.log.debug("STRATOS_WK_APISTORE_MEMBER_IP: %r" % apistore_member.member_ip)
+
+                publisher_member_list = []
+                for member in publisher_cluster_collection[0].get_members():
+                    if member.status == MemberStatus.Starting or member.status == MemberStatus.Activated:
+                        publisher_member_list.append(member)
+                        self.wk_members.append(member)
+
+                if len(publisher_member_list) == 0:
+                    self.log.debug("API Publisher members not yet created")
+
+                publisher_member = publisher_member_list[0]
+                env_params["STRATOS_WK_PUBLISHER_MEMBER_IP"] = publisher_member.member_ip
+                self.log.debug("STRATOS_WK_PUBLISHER_MEMBER_IP: %r" % publisher_member.member_ip)
+
+                return True
+
+            elif CartridgeAgentConfiguration.service_name == "gatewaymgt" or \
+                    CartridgeAgentConfiguration.service_name == "gateway":
+
+                if CartridgeAgentConfiguration.deployment is not None:
+                    # check if deployment is Manager Worker separated
+                    if CartridgeAgentConfiguration.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower() or \
+                            CartridgeAgentConfiguration.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower():
+
+                        self.log.info("Deployment pattern for the node: %r" % CartridgeAgentConfiguration.deployment)
+                        env_params["DEPLOYMENT"] = CartridgeAgentConfiguration.deployment
+                        # check if WKA members of Manager Worker separated deployment is ready
+                        return self.is_manager_worker_WKA_group_ready(env_params)
+
+            elif CartridgeAgentConfiguration.service_name == "keymanager":
+                return True
+
+        else:
+            if CartridgeAgentConfiguration.deployment is not None:
+                # check if deployment is Manager Worker separated
+                if CartridgeAgentConfiguration.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower() or \
+                        CartridgeAgentConfiguration.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower():
+
+                    self.log.info("Deployment pattern for the node: %r" % CartridgeAgentConfiguration.deployment)
+                    env_params["DEPLOYMENT"] = CartridgeAgentConfiguration.deployment
+                    # check if WKA members of Manager Worker separated deployment is ready
+                    return self.is_manager_worker_WKA_group_ready(env_params)
+
+            service_name_in_payload = CartridgeAgentConfiguration.service_name
+            cluster_id_in_payload = CartridgeAgentConfiguration.cluster_id
+            service = topology.get_service(service_name_in_payload)
+            cluster = service.get_cluster(cluster_id_in_payload)
+
+            wk_members = []
+            for member in cluster.get_members():
+                if member.properties is not None and \
+                        "PRIMARY" in member.properties and member.properties["PRIMARY"].lower() == "true" and \
+                        (member.status == MemberStatus.Starting or member.status == MemberStatus.Activated):
+
+                    wk_members.append(member)
+                    self.wk_members.append(member)
+                    self.log.debug("Found WKA: STRATOS_WK_MEMBER_IP: " + member.member_ip)
+
+            if len(wk_members) >= min_count:
+                idx = 0
+                for member in wk_members:
+                    env_params["STRATOS_WK_MEMBER_" + idx + "_IP"] = member.member_ip
+                    self.log.debug("STRATOS_WK_MEMBER_" + idx + "_IP:" + member.member_ip)
+
+                    idx += 1
+
+                return True
+
+        return False
+
+    # generic worker manager separated clustering logic
+    def is_manager_worker_WKA_group_ready(self, env_params):
+
+        # for this, we need both manager cluster service name and worker cluster service name
+        manager_service_name = CartridgeAgentConfiguration.manager_service_name
+        worker_service_name = CartridgeAgentConfiguration.worker_service_name
+
+        # managerServiceName and workerServiceName both should not be null /empty
+        if manager_service_name is None or manager_service_name.strip() == "":
+            self.log.error("Manager service name [ " + manager_service_name + " ] is invalid")
+            return False
+
+        if worker_service_name is None or worker_service_name.strip() == "":
+            self.log.error("Worker service name [ " + worker_service_name + " ] is invalid")
+            return False
+
+        min_manager_instances_available = False
+        min_worker_instances_available = False
+
+        topology = TopologyContext.get_topology()
+        manager_service = topology.get_service(manager_service_name)
+        worker_service = topology.get_service(worker_service_name)
+
+        if manager_service is None:
+            self.log.warn("Service [ " + manager_service_name + " ] is not found")
+            return False
+
+        if worker_service is None:
+            self.log.warn("Service [ " + worker_service_name + " ] is not found")
+            return False
+
+        # manager clusters
+        manager_clusters = manager_service.get_clusters()
+        if manager_clusters is None or len(manager_clusters) == 0:
+            self.log.warn("No clusters found for service [ " + manager_service_name + " ]")
+            return False
+
+        manager_min_instance_count = 1
+        manager_min_instance_count_found = False
+
+        manager_wka_members = []
+        for member in manager_clusters[0].get_members():
+            if member.properties is not None and \
+                    "PRIMARY" in member.properties and member.properties["PRIMARY"].lower() == "true" and \
+                    (member.status == MemberStatus.Starting or member.status == MemberStatus.Activated):
+
+                manager_wka_members.append(member)
+                self.wk_members.append(member)
+
+                # get the min instance count
+                if not manager_min_instance_count_found:
+                    manager_min_instance_count = self.get_min_instance_count_from_member(member)
+                    manager_min_instance_count_found = True
+                    self.log.info("Manager min instance count: " + manager_min_instance_count)
+
+        if len(manager_wka_members) >= manager_min_instance_count:
+            min_manager_instances_available = True
+            idx = 0
+            for member in manager_wka_members:
+                env_params["STRATOS_WK_MANAGER_MEMBER_" + idx + "_IP"] = member.member_ip
+                self.log.debug("STRATOS_WK_MANAGER_MEMBER_" + idx + "_IP: " + member.member_ip)
+                idx += 1
+
+            env_params["STRATOS_WK_MANAGER_MEMBER_COUNT"] = int(manager_min_instance_count)
+
+        # If all the manager members are non primary and is greate than or equal to mincount,
+        # minManagerInstancesAvailable should be true
+        all_managers_non_primary = True
+        for member in manager_clusters[0].get_members():
+            # get the min instance count
+            if not manager_min_instance_count_found:
+                manager_min_instance_count = self.get_min_instance_count_from_member(member)
+                manager_min_instance_count_found = True
+                self.log.info(
+                    "Manager min instance count when allManagersNonPrimary true : " + manager_min_instance_count)
+
+            if member.properties is not None and "PRIMARY" in member.properties and \
+                            member.properties["PRIMARY"].lower() == "true":
+                all_managers_non_primary = False
+                break
+
+        self.log.debug(
+            " allManagerNonPrimary & managerMinInstanceCount [" + all_managers_non_primary +
+            "], [" + manager_min_instance_count + "] ")
+
+        if all_managers_non_primary and len(manager_clusters) >= manager_min_instance_count:
+            min_manager_instances_available = True
+
+        # worker cluster
+        worker_clusters = worker_service.get_clusters()
+        if worker_clusters is None or len(worker_clusters) == 0:
+            self.log.warn("No clusters found for service [ " + worker_service_name + " ]")
+            return False
+
+        worker_min_instance_count = 1
+        worker_min_instance_count_found = False
+
+        worker_wka_members = []
+        for member in worker_clusters[0].get_members():
+            self.log.debug("Checking member : " + member.member_id)
+
+            if member.properties is not None and "PRIMARY" in member.properties and \
+                    member.properties["PRIMARY"].lower() == "true" and \
+                    (member.status == MemberStatus.Starting or member.status == MemberStatus.Activated):
+
+                self.log.debug("Added worker member " + member.member_id)
+
+                worker_wka_members.append(member)
+                self.wk_members.append(member)
+
+                # get the min instance count
+                if not worker_min_instance_count_found:
+                    worker_min_instance_count = self.get_min_instance_count_from_member(member)
+                    worker_min_instance_count_found = True
+                    self.log.debug("Worker min instance count: " + worker_min_instance_count)
+
+        if len(worker_wka_members) >= worker_min_instance_count:
+            min_worker_instances_available = True
+            idx = 0
+            for member in worker_wka_members:
+                env_params["STRATOS_WK_WORKER_MEMBER_" + idx + "_IP"] = member.member_ip
+                self.log.debug("STRATOS_WK_WORKER_MEMBER_" + idx + "_IP: " + member.member_ip)
+                idx += 1
+
+            env_params["STRATOS_WK_WORKER_MEMBER_COUNT"] = int(worker_min_instance_count)
+
+        self.log.debug(
+            " Returnning values minManagerInstancesAvailable && minWorkerInstancesAvailable [" +
+            min_manager_instances_available + "],  [" + min_worker_instances_available + "] ")
+
+        return min_manager_instances_available and min_worker_instances_available
+
+    def get_min_instance_count_from_member(self, member):
+        if "MIN_COUNT" in member.properties:
+            return int(member.properties["MIN_COUNT"])
+
+        return 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/6b8fc6e4/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py b/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py
index 3ce4379..7fee50d 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py
@@ -31,6 +31,18 @@ class Topology:
     def get_services(self):
         return self.service_map.values()
 
+    def get_service(self, service_name):
+        """
+
+        :param str service_name: service name to be retrieved
+        :return: Service object of the service
+        :rtype: Service
+        """
+        if service_name in self.service_map:
+            return self.service_map[service_name]
+
+        return None
+
     def add_service(self, service):
         self.service_map[service.service_name] = service
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/6b8fc6e4/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
index d1305b9..267c608 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
@@ -69,7 +69,7 @@ def wait_for_complete_topology():
 
 def check_topology_consistency(service_name, cluster_id, member_id):
     topology = TopologyContext.get_topology()
-    service = topology.service_map[service_name]
+    service = topology.get_service(service_name)
     if service is None:
         log.error("Service not found in topology [service] %r" % service_name)
         return False
@@ -104,7 +104,7 @@ def is_relevant_member_event(service_name, cluster_id, lb_cluster_id):
 
     service_group_in_payload = CartridgeAgentConfiguration.service_group
     if service_group_in_payload is not None:
-        service_properties = topology.service_map[service_name].properties
+        service_properties = topology.get_service(service_name).properties
         if service_properties is None:
             return False
 
@@ -211,7 +211,7 @@ def add_payload_parameters(params):
 
     topology = TopologyContext.get_topology()
     if topology.initialized:
-        service = topology.service_map[CartridgeAgentConfiguration.service_name]
+        service = topology.get_service(CartridgeAgentConfiguration.service_name)
         cluster = service.get_cluster(CartridgeAgentConfiguration.cluster_id)
         member_id_in_payload = CartridgeAgentConfiguration.member_id
         add_properties(service.properties, params, "SERVICE_PROPERTY")