You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ch...@apache.org on 2015/11/30 14:44:19 UTC
[2/5] stratos git commit: PCA - Publish MB events using a thread and
time out after 5 seconds. Improved PCA structure and removed unnecessary
threading. PCA Live Test - Improved logging, improved MB HA test case
PCA (Python Cartridge Agent) - Publish MB (message broker) events using a thread and time out after 5 seconds. Improved PCA structure and removed unnecessary threading.
PCA Live Test - Improved logging, improved MB HA (high availability) test case.
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d1912180
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d1912180
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d1912180
Branch: refs/heads/stratos-4.1.x
Commit: d19121809154d42998ddc0c85722600f03f7225f
Parents: e722ff3
Author: Chamila de Alwis <ch...@apache.org>
Authored: Fri Nov 27 14:36:42 2015 +0530
Committer: Chamila de Alwis <ch...@apache.org>
Committed: Mon Nov 30 19:13:55 2015 +0530
----------------------------------------------------------------------
.../cartridge.agent/cartridge.agent/agent.py | 62 +-
.../modules/event/eventhandler.py | 1181 +++++++++---------
.../cartridge.agent/publisher.py | 72 +-
.../cartridge.agent/subscriber.py | 8 +-
.../integration/tests/ADCExtensionTestCase.java | 5 +
.../tests/ADCMTAppTenantUserTestCase.java | 5 +
.../integration/tests/ADCMTAppTestCase.java | 5 +
.../agent/integration/tests/ADCTestCase.java | 5 +
.../integration/tests/AgentStartupTestCase.java | 5 +
.../integration/tests/CEPHAModeTestCase.java | 5 +
.../tests/MessageBrokerHATestCase.java | 30 +-
.../tests/PythonAgentIntegrationTest.java | 14 +-
.../MessageBrokerHATestCase/agent.conf | 2 +-
.../src/test/resources/log4j.properties | 2 +-
.../src/test/resources/test-suite-all.xml | 1 +
.../src/test/resources/test-suite-smoke.xml | 2 +
16 files changed, 734 insertions(+), 670 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
index 959568b..6b81dff 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
@@ -16,22 +16,19 @@
# specific language governing permissions and limitations
# under the License.
-import threading
-
import publisher
from logpublisher import *
from modules.event.application.signup.events import *
from modules.event.domain.mapping.events import *
-from modules.event.eventhandler import EventHandler
+import modules.event.eventhandler as event_handler
from modules.event.instance.notifier.events import *
from modules.event.tenant.events import *
from modules.event.topology.events import *
from subscriber import EventSubscriber
-class CartridgeAgent(threading.Thread):
+class CartridgeAgent(object):
def __init__(self):
- threading.Thread.__init__(self)
Config.initialize_config()
self.__tenant_context_initialized = False
self.__log_publish_manager = None
@@ -47,9 +44,7 @@ class CartridgeAgent(threading.Thread):
self.__app_topic_subscriber = EventSubscriber(constants.APPLICATION_SIGNUP, mb_urls, mb_uname, mb_pwd)
self.__topology_event_subscriber = EventSubscriber(constants.TOPOLOGY_TOPIC, mb_urls, mb_uname, mb_pwd)
- self.__event_handler = EventHandler()
-
- def run(self):
+ def run_agent(self):
self.__log.info("Starting Cartridge Agent...")
# Start topology event receiver thread
@@ -58,7 +53,7 @@ class CartridgeAgent(threading.Thread):
if Config.lvs_virtual_ip is None or str(Config.lvs_virtual_ip).strip() == "":
self.__log.debug("LVS Virtual IP is not defined")
else:
- self.__event_handler.create_dummy_interface()
+ event_handler.create_dummy_interface()
# request complete topology event from CC by publishing CompleteTopologyRequestEvent
publisher.publish_complete_topology_request_event()
@@ -84,14 +79,14 @@ class CartridgeAgent(threading.Thread):
publisher.publish_complete_tenant_request_event()
# Execute instance started shell script
- self.__event_handler.on_instance_started_event()
+ event_handler.on_instance_started_event()
# Publish instance started event
publisher.publish_instance_started_event()
# Execute start servers extension
try:
- self.__event_handler.start_server_extension()
+ event_handler.start_server_extension()
except Exception as e:
self.__log.exception("Error processing start servers event: %s" % e)
@@ -100,7 +95,7 @@ class CartridgeAgent(threading.Thread):
if repo_url is None or str(repo_url).strip() == "":
self.__log.info("No artifact repository found")
publisher.publish_instance_activated_event()
- self.__event_handler.on_instance_activated_event()
+ event_handler.on_instance_activated_event()
else:
# instance activated event will be published in artifact updated event handler
self.__log.info(
@@ -109,7 +104,7 @@ class CartridgeAgent(threading.Thread):
persistence_mapping_payload = Config.persistence_mappings
if persistence_mapping_payload is not None:
- self.__event_handler.volume_mount_extension(persistence_mapping_payload)
+ event_handler.volume_mount_extension(persistence_mapping_payload)
# start log publishing thread
if DataPublisherConfiguration.get_instance().enabled:
@@ -198,14 +193,14 @@ class CartridgeAgent(threading.Thread):
def on_artifact_updated(self, msg):
event_obj = ArtifactUpdatedEvent.create_from_json(msg.payload)
- self.__event_handler.on_artifact_updated_event(event_obj)
+ event_handler.on_artifact_updated_event(event_obj)
def on_instance_cleanup_member(self, msg):
member_in_payload = Config.member_id
event_obj = InstanceCleanupMemberEvent.create_from_json(msg.payload)
member_in_event = event_obj.member_id
if member_in_payload == member_in_event:
- self.__event_handler.on_instance_cleanup_member_event()
+ event_handler.on_instance_cleanup_member_event()
def on_instance_cleanup_cluster(self, msg):
event_obj = InstanceCleanupClusterEvent.create_from_json(msg.payload)
@@ -215,7 +210,7 @@ class CartridgeAgent(threading.Thread):
instance_in_event = event_obj.cluster_instance_id
if cluster_in_event == cluster_in_payload and instance_in_payload == instance_in_event:
- self.__event_handler.on_instance_cleanup_cluster_event()
+ event_handler.on_instance_cleanup_cluster_event()
def on_member_created(self, msg):
self.__log.debug("Member created event received: %r" % msg.payload)
@@ -227,7 +222,7 @@ class CartridgeAgent(threading.Thread):
if not TopologyContext.topology.initialized:
return
- self.__event_handler.on_member_initialized_event(event_obj)
+ event_handler.on_member_initialized_event(event_obj)
def on_member_activated(self, msg):
self.__log.debug("Member activated event received: %r" % msg.payload)
@@ -235,7 +230,7 @@ class CartridgeAgent(threading.Thread):
return
event_obj = MemberActivatedEvent.create_from_json(msg.payload)
- self.__event_handler.on_member_activated_event(event_obj)
+ event_handler.on_member_activated_event(event_obj)
def on_member_terminated(self, msg):
self.__log.debug("Member terminated event received: %r" % msg.payload)
@@ -243,7 +238,7 @@ class CartridgeAgent(threading.Thread):
return
event_obj = MemberTerminatedEvent.create_from_json(msg.payload)
- self.__event_handler.on_member_terminated_event(event_obj)
+ event_handler.on_member_terminated_event(event_obj)
def on_member_suspended(self, msg):
self.__log.debug("Member suspended event received: %r" % msg.payload)
@@ -251,7 +246,7 @@ class CartridgeAgent(threading.Thread):
return
event_obj = MemberSuspendedEvent.create_from_json(msg.payload)
- self.__event_handler.on_member_suspended_event(event_obj)
+ event_handler.on_member_suspended_event(event_obj)
def on_complete_topology(self, msg):
event_obj = CompleteTopologyEvent.create_from_json(msg.payload)
@@ -259,7 +254,7 @@ class CartridgeAgent(threading.Thread):
if not TopologyContext.topology.initialized:
self.__log.info("Topology initialized from complete topology event")
TopologyContext.topology.initialized = True
- self.__event_handler.on_complete_topology_event(event_obj)
+ event_handler.on_complete_topology_event(event_obj)
self.__log.debug("Topology context updated with [topology] %r" % event_obj.topology.json_str)
@@ -269,17 +264,17 @@ class CartridgeAgent(threading.Thread):
return
event_obj = MemberStartedEvent.create_from_json(msg.payload)
- self.__event_handler.on_member_started_event(event_obj)
+ event_handler.on_member_started_event(event_obj)
def on_domain_mapping_added(self, msg):
self.__log.debug("Subscription domain added event received : %r" % msg.payload)
event_obj = DomainMappingAddedEvent.create_from_json(msg.payload)
- self.__event_handler.on_domain_mapping_added_event(event_obj)
+ event_handler.on_domain_mapping_added_event(event_obj)
def on_domain_mapping_removed(self, msg):
self.__log.debug("Subscription domain removed event received : %r" % msg.payload)
event_obj = DomainMappingRemovedEvent.create_from_json(msg.payload)
- self.__event_handler.on_domain_mapping_removed_event(event_obj)
+ event_handler.on_domain_mapping_removed_event(event_obj)
def on_complete_tenant(self, msg):
event_obj = CompleteTenantEvent.create_from_json(msg.payload)
@@ -287,19 +282,19 @@ class CartridgeAgent(threading.Thread):
if not self.__tenant_context_initialized:
self.__log.info("Tenant context initialized from complete tenant event")
self.__tenant_context_initialized = True
- self.__event_handler.on_complete_tenant_event(event_obj)
+ event_handler.on_complete_tenant_event(event_obj)
self.__log.debug("Tenant context updated with [tenant list] %r" % event_obj.tenant_list_json)
def on_tenant_subscribed(self, msg):
self.__log.debug("Tenant subscribed event received: %r" % msg.payload)
event_obj = TenantSubscribedEvent.create_from_json(msg.payload)
- self.__event_handler.on_tenant_subscribed_event(event_obj)
+ event_handler.on_tenant_subscribed_event(event_obj)
def on_application_signup_removed(self, msg):
self.__log.debug("Application signup removed event received: %r" % msg.payload)
event_obj = ApplicationSignUpRemovedEvent.create_from_json(msg.payload)
- self.__event_handler.on_application_signup_removed_event(event_obj)
+ event_handler.on_application_signup_removed_event(event_obj)
def wait_for_complete_topology(self):
while not TopologyContext.topology.initialized:
@@ -308,17 +303,12 @@ class CartridgeAgent(threading.Thread):
self.__log.info("Complete topology event received")
-def main():
- cartridge_agent = CartridgeAgent()
+if __name__ == "__main__":
log = LogFactory().get_log(__name__)
-
try:
log.info("Starting Stratos cartridge agent...")
- cartridge_agent.start()
+ cartridge_agent = CartridgeAgent()
+ cartridge_agent.run_agent()
except Exception as e:
log.exception("Cartridge Agent Exception: %r" % e)
- cartridge_agent.terminate()
-
-
-if __name__ == "__main__":
- main()
+ # cartridge_agent.terminate()
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
index 6e2aa4f..f8b0c2b 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
@@ -21,8 +21,6 @@ from threading import Thread
import publisher
from entity import *
-import constants
-from config import Config
from ..artifactmgt.git.agentgithandler import *
from ..artifactmgt.repository import Repository
from ..util import cartridgeagentutils
@@ -31,485 +29,492 @@ from ..util.log import LogFactory
SUPER_TENANT_ID = "-1234"
SUPER_TENANT_REPO_PATH = "/repository/deployment/server/"
TENANT_REPO_PATH = "/repository/tenants/"
+log = LogFactory().get_log(__name__)
+
+"""
+Event execution related logic
+"""
+
+
+def on_instance_started_event():
+ log.debug("Processing instance started event...")
+ # TODO: copy artifacts extension
+ execute_event_extendables(constants.INSTANCE_STARTED_EVENT, {})
+
+
+def create_dummy_interface():
+ log.debug("Processing lvs dummy interface creation...")
+ lvs_vip = Config.lvs_virtual_ip.split("|")
+ log.debug("LVS dummy interface creation values %s %s " % (lvs_vip[0], lvs_vip[1]))
+ execute_event_extendables(
+ constants.CREATE_LVS_DUMMY_INTERFACE,
+ {"EVENT": constants.CREATE_LVS_DUMMY_INTERFACE,
+ "LVS_DUMMY_VIRTUAL_IP": lvs_vip[0],
+ "LVS_SUBNET_MASK": lvs_vip[1]}
+ )
+
+
+def on_instance_activated_event():
+ log.debug("Processing instance activated event...")
+ execute_event_extendables(constants.INSTANCE_ACTIVATED_EVENT, {})
+
+
+def on_artifact_updated_event(artifacts_updated_event):
+ log.debug(
+ "Processing artifact updated event for [tenant] %s [cluster] %s [status] %s"
+ % (str(artifacts_updated_event.tenant_id), artifacts_updated_event.cluster_id, artifacts_updated_event.status))
+
+ cluster_id_event = str(artifacts_updated_event.cluster_id).strip()
+ cluster_id_payload = Config.cluster_id
+ repo_url = str(artifacts_updated_event.repo_url).strip()
+
+ if repo_url == "":
+ log.error("Repository URL is empty. Failed to process artifact updated event.")
+ return
+
+ if cluster_id_payload is None or cluster_id_payload == "":
+ log.error("Cluster ID in payload is empty. Failed to process artifact updated event.")
+ return
+
+ if cluster_id_payload != cluster_id_event:
+ log.debug("Cluster ID in artifact updated event does not match. Skipping event handler.")
+ return
+
+ repo_password = None
+ if artifacts_updated_event.repo_password is not None:
+ secret = Config.cartridge_key
+ repo_password = cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, secret)
+
+ if Config.app_path is None:
+ log.error("Repository path is empty. Failed to process artifact updated event.")
+ return
+
+ repo_username = artifacts_updated_event.repo_username
+ tenant_id = artifacts_updated_event.tenant_id
+ is_multitenant = Config.is_multiTenant
+ commit_enabled = artifacts_updated_event.commit_enabled
+
+ # create repo object
+ local_repo_path = get_repo_path_for_tenant(str(tenant_id), Config.app_path, is_multitenant)
+ repo_info = Repository(repo_url, repo_username, repo_password, local_repo_path, tenant_id, commit_enabled)
+ log.info("Executing checkout job on artifact updated event...")
+
+ try:
+ Config.artifact_checkout_plugin.plugin_object.checkout(repo_info)
+ except Exception as e:
+ log.exception(
+ "Checkout job on artifact updated event failed for tenant: %s %s" % (repo_info.tenant_id, e))
+
+ # execute artifact updated extension
+ plugin_values = {"ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id,
+ "ARTIFACT_UPDATED_TENANT_ID": artifacts_updated_event.tenant_id,
+ "ARTIFACT_UPDATED_REPO_URL": artifacts_updated_event.repo_url,
+ "ARTIFACT_UPDATED_REPO_PASSWORD": artifacts_updated_event.repo_password,
+ "ARTIFACT_UPDATED_REPO_USERNAME": artifacts_updated_event.repo_username,
+ "ARTIFACT_UPDATED_STATUS": artifacts_updated_event.status}
+
+ try:
+ execute_event_extendables(constants.ARTIFACT_UPDATED_EVENT, plugin_values)
+ except Exception as e:
+ log.exception("Could not execute plugins for artifact updated event: %s" % e)
+
+ if not Config.activated:
+ # publish instance activated event if not yet activated
+ publisher.publish_instance_activated_event()
+ on_instance_activated_event()
+
+ update_artifacts = Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, True)
+ auto_commit = Config.is_commits_enabled
+ auto_checkout = Config.is_checkout_enabled
+ log.info("ADC configuration: [update_artifacts] %s, [auto-commit] %s, [auto-checkout] %s"
+ % (update_artifacts, auto_commit, auto_checkout))
+
+ if update_artifacts:
+ try:
+ update_interval = int(Config.artifact_update_interval)
+ except ValueError:
+ log.exception("Invalid artifact sync interval specified: %s" % ValueError)
+ update_interval = 10
+
+ log.info("Artifact updating task enabled, update interval: %s seconds" % update_interval)
+
+ log.info("Auto Commit is turned %s " % ("on" if auto_commit else "off"))
+ log.info("Auto Checkout is turned %s " % ("on" if auto_checkout else "off"))
+
+ AgentGitHandler.schedule_artifact_update_task(
+ repo_info,
+ auto_checkout,
+ auto_commit,
+ update_interval)
+
+
+def on_instance_cleanup_cluster_event():
+ log.debug("Processing instance cleanup cluster event...")
+ cleanup(constants.INSTANCE_CLEANUP_CLUSTER_EVENT)
+
+
+def on_instance_cleanup_member_event():
+ log.debug("Processing instance cleanup member event...")
+ cleanup(constants.INSTANCE_CLEANUP_MEMBER_EVENT)
+
+
+def on_member_activated_event(member_activated_event):
+ log.debug(
+ "Processing Member activated event: [service] %r [cluster] %r [member] %r"
+ % (member_activated_event.service_name,
+ member_activated_event.cluster_id,
+ member_activated_event.member_id))
+
+ member_initialized = is_member_initialized_in_topology(
+ member_activated_event.service_name,
+ member_activated_event.cluster_id,
+ member_activated_event.member_id)
+
+ if not member_initialized:
+ log.error("Member has not initialized, failed to execute member activated event")
+ return
+
+ execute_event_extendables(constants.MEMBER_ACTIVATED_EVENT, {})
+
+
+def on_complete_topology_event(complete_topology_event):
+ log.debug("Processing Complete topology event...")
+
+ service_name_in_payload = Config.service_name
+ cluster_id_in_payload = Config.cluster_id
+ member_id_in_payload = Config.member_id
+
+ if not Config.initialized:
+ member_initialized = is_member_initialized_in_topology(
+ service_name_in_payload,
+ cluster_id_in_payload,
+ member_id_in_payload)
+ if member_initialized:
+ # Set cartridge agent as initialized since member is available and it is in initialized state
+ Config.initialized = True
+ log.info(
+ "Member initialized [member id] %s, [cluster-id] %s, [service] %s"
+ % (member_id_in_payload, cluster_id_in_payload, service_name_in_payload))
-class EventHandler:
+ topology = complete_topology_event.get_topology()
+ service = topology.get_service(service_name_in_payload)
+ if service is None:
+ raise Exception("Service not found in topology [service] %s" % service_name_in_payload)
+
+ cluster = service.get_cluster(cluster_id_in_payload)
+ if cluster is None:
+ raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id_in_payload)
+
+ plugin_values = {"TOPOLOGY_JSON": json.dumps(topology.json_str),
+ "MEMBER_LIST_JSON": json.dumps(cluster.member_list_json)}
+
+ execute_event_extendables(constants.COMPLETE_TOPOLOGY_EVENT, plugin_values)
+
+
+def on_member_initialized_event(member_initialized_event):
"""
- Event execution related logic
+ Member initialized event is sent by cloud controller once volume attachment and
+ ip address allocation is completed successfully
+ :param member_initialized_event:
+ :return:
"""
+ log.debug("Processing Member initialized event...")
+ service_name_in_payload = Config.service_name
+ cluster_id_in_payload = Config.cluster_id
+ member_id_in_payload = Config.member_id
+
+ if not Config.initialized and member_id_in_payload == member_initialized_event.member_id:
+ member_exists = member_exists_in_topology(
+ service_name_in_payload,
+ cluster_id_in_payload,
+ member_id_in_payload)
+
+ log.debug("Member exists: %s" % member_exists)
+
+ if member_exists:
+ Config.initialized = True
+ mark_member_as_initialized(service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
+ log.info("Instance marked as initialized on member initialized event")
+ else:
+ raise Exception("Member [member-id] %s not found in topology while processing member initialized "
+ "event. [Topology] %s" % (member_id_in_payload, TopologyContext.get_topology()))
- def __init__(self):
- self.__log = LogFactory().get_log(__name__)
+ execute_event_extendables(constants.MEMBER_INITIALIZED_EVENT, {})
- def on_instance_started_event(self):
- self.__log.debug("Processing instance started event...")
- # TODO: copy artifacts extension
- self.execute_event_extendables(constants.INSTANCE_STARTED_EVENT, {})
-
- def create_dummy_interface(self):
- self.__log.debug("Processing lvs dummy interface creation...")
- lvs_vip = Config.lvs_virtual_ip.split("|")
- self.__log.debug("LVS dummy interface creation values %s %s " % (lvs_vip[0], lvs_vip[1]))
- self.execute_event_extendables(constants.CREATE_LVS_DUMMY_INTERFACE,
- {"EVENT": constants.CREATE_LVS_DUMMY_INTERFACE,
- "LVS_DUMMY_VIRTUAL_IP": lvs_vip[0],
- "LVS_SUBNET_MASK": lvs_vip[1]})
-
- def on_instance_activated_event(self):
- self.__log.debug("Processing instance activated event...")
- self.execute_event_extendables(constants.INSTANCE_ACTIVATED_EVENT, {})
-
- def on_artifact_updated_event(self, artifacts_updated_event):
- self.__log.debug("Processing artifact updated event for [tenant] %s [cluster] %s [status] %s" %
- (str(artifacts_updated_event.tenant_id),
- artifacts_updated_event.cluster_id,
- artifacts_updated_event.status))
-
- cluster_id_event = str(artifacts_updated_event.cluster_id).strip()
- cluster_id_payload = Config.cluster_id
- repo_url = str(artifacts_updated_event.repo_url).strip()
-
- if repo_url == "":
- self.__log.error("Repository URL is empty. Failed to process artifact updated event.")
- return
-
- if cluster_id_payload is None or cluster_id_payload == "":
- self.__log.error("Cluster ID in payload is empty. Failed to process artifact updated event.")
- return
-
- if cluster_id_payload != cluster_id_event:
- self.__log.debug("Cluster ID in artifact updated event does not match. Skipping event handler.")
- return
-
- repo_password = None
- if artifacts_updated_event.repo_password is not None:
- secret = Config.cartridge_key
- repo_password = cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, secret)
-
- if Config.app_path is None:
- self.__log.error("Repository path is empty. Failed to process artifact updated event.")
- return
-
- if not EventHandler.validate_repo_path(Config.app_path):
- self.__log.error(
- "Repository path cannot be accessed, or is invalid. Failed to process artifact updated event.")
- return
-
- repo_username = artifacts_updated_event.repo_username
- tenant_id = artifacts_updated_event.tenant_id
- is_multitenant = Config.is_multiTenant
- commit_enabled = artifacts_updated_event.commit_enabled
-
- # create repo object
- local_repo_path = self.get_repo_path_for_tenant(str(tenant_id), Config.app_path, is_multitenant)
- repo_info = Repository(repo_url, repo_username, repo_password, local_repo_path, tenant_id, commit_enabled)
- self.__log.info("Executing checkout job on artifact updated event...")
- try:
- Config.artifact_checkout_plugin.plugin_object.checkout(repo_info)
- except Exception as e:
- self.__log.exception(
- "Checkout job on artifact updated event failed for tenant: %s %s" % (repo_info.tenant_id, e))
+def on_complete_tenant_event(complete_tenant_event):
+ log.debug("Processing Complete tenant event...")
- # execute artifact updated extension
- plugin_values = {"ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id,
- "ARTIFACT_UPDATED_TENANT_ID": artifacts_updated_event.tenant_id,
- "ARTIFACT_UPDATED_REPO_URL": artifacts_updated_event.repo_url,
- "ARTIFACT_UPDATED_REPO_PASSWORD": artifacts_updated_event.repo_password,
- "ARTIFACT_UPDATED_REPO_USERNAME": artifacts_updated_event.repo_username,
- "ARTIFACT_UPDATED_STATUS": artifacts_updated_event.status}
+ tenant_list_json = complete_tenant_event.tenant_list_json
+ log.debug("Complete tenants:" + json.dumps(tenant_list_json))
- try:
- self.execute_event_extendables(constants.ARTIFACT_UPDATED_EVENT, plugin_values)
- except Exception as e:
- self.__log.exception("Could not execute plugins for artifact updated event: %s" % e)
-
- if not Config.activated:
- # publish instance activated event if not yet activated
- publisher.publish_instance_activated_event()
- self.on_instance_activated_event()
-
- update_artifacts = Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, True)
- auto_commit = Config.is_commits_enabled
- auto_checkout = Config.is_checkout_enabled
- self.__log.info("ADC configuration: [update_artifacts] %s, [auto-commit] %s, [auto-checkout] %s",
- update_artifacts, auto_commit, auto_checkout)
- if update_artifacts:
- try:
- update_interval = int(Config.artifact_update_interval)
- except ValueError:
- self.__log.exception("Invalid artifact sync interval specified: %s" % ValueError)
- update_interval = 10
-
- self.__log.info("Artifact updating task enabled, update interval: %s seconds" % update_interval)
-
- self.__log.info("Auto Commit is turned %s " % ("on" if auto_commit else "off"))
- self.__log.info("Auto Checkout is turned %s " % ("on" if auto_checkout else "off"))
-
- AgentGitHandler.schedule_artifact_update_task(
- repo_info,
- auto_checkout,
- auto_commit,
- update_interval)
-
- def on_instance_cleanup_cluster_event(self):
- self.__log.debug("Processing instance cleanup cluster event...")
- self.cleanup(constants.INSTANCE_CLEANUP_CLUSTER_EVENT)
-
- def on_instance_cleanup_member_event(self):
- self.__log.debug("Processing instance cleanup member event...")
- self.cleanup(constants.INSTANCE_CLEANUP_MEMBER_EVENT)
-
- def on_member_activated_event(self, member_activated_event):
- self.__log.debug("Processing Member activated event: [service] %r [cluster] %r [member] %r"
- % (member_activated_event.service_name,
- member_activated_event.cluster_id,
- member_activated_event.member_id))
-
- member_initialized = self.is_member_initialized_in_topology(
- member_activated_event.service_name,
- member_activated_event.cluster_id,
- member_activated_event.member_id)
-
- if not member_initialized:
- self.__log.error("Member has not initialized, failed to execute member activated event")
- return
-
- self.execute_event_extendables(constants.MEMBER_ACTIVATED_EVENT, {})
-
- def on_complete_topology_event(self, complete_topology_event):
- self.__log.debug("Processing Complete topology event...")
-
- service_name_in_payload = Config.service_name
- cluster_id_in_payload = Config.cluster_id
- member_id_in_payload = Config.member_id
-
- if not Config.initialized:
- member_initialized = self.is_member_initialized_in_topology(
- service_name_in_payload,
- cluster_id_in_payload,
- member_id_in_payload)
-
- if member_initialized:
- # Set cartridge agent as initialized since member is available and it is in initialized state
- Config.initialized = True
- self.__log.info("Member initialized [member id] %s, [cluster-id] %s, [service] %s" %
- (member_id_in_payload, cluster_id_in_payload, service_name_in_payload))
-
- topology = complete_topology_event.get_topology()
- service = topology.get_service(service_name_in_payload)
- if service is None:
- raise Exception("Service not found in topology [service] %s" % service_name_in_payload)
+ plugin_values = {"TENANT_LIST_JSON": json.dumps(tenant_list_json)}
- cluster = service.get_cluster(cluster_id_in_payload)
- if cluster is None:
- raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id_in_payload)
-
- plugin_values = {"TOPOLOGY_JSON": json.dumps(topology.json_str),
- "MEMBER_LIST_JSON": json.dumps(cluster.member_list_json)}
-
- self.execute_event_extendables(constants.COMPLETE_TOPOLOGY_EVENT, plugin_values)
-
- def on_member_initialized_event(self, member_initialized_event):
- """
- Member initialized event is sent by cloud controller once volume attachment and
- ip address allocation is completed successfully
- :return:
- """
- self.__log.debug("Processing Member initialized event...")
- service_name_in_payload = Config.service_name
- cluster_id_in_payload = Config.cluster_id
- member_id_in_payload = Config.member_id
-
- if not Config.initialized and member_id_in_payload == member_initialized_event.member_id:
- member_exists = self.member_exists_in_topology(service_name_in_payload, cluster_id_in_payload,
- member_id_in_payload)
- self.__log.debug("Member exists: %s" % member_exists)
- if member_exists:
- Config.initialized = True
- self.mark_member_as_initialized(service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
- self.__log.info("Instance marked as initialized on member initialized event")
- else:
- raise Exception("Member [member-id] %s not found in topology while processing member initialized "
- "event. [Topology] %s" % (member_id_in_payload, TopologyContext.get_topology()))
+ execute_event_extendables(constants.COMPLETE_TENANT_EVENT, plugin_values)
+
+
+def on_member_terminated_event(member_terminated_event):
+ log.debug(
+ "Processing Member terminated event: [service] %s [cluster] %s [member] %s"
+ % (member_terminated_event.service_name, member_terminated_event.cluster_id, member_terminated_event.member_id))
+
+ member_initialized = is_member_initialized_in_topology(
+ member_terminated_event.service_name,
+ member_terminated_event.cluster_id,
+ member_terminated_event.member_id
+ )
+
+ if not member_initialized:
+ log.error("Member has not initialized, failed to execute member terminated event")
+ return
+
+ execute_event_extendables(constants.MEMBER_TERMINATED_EVENT, {})
+
+
+def on_member_suspended_event(member_suspended_event):
+ log.debug(
+ "Processing Member suspended event: [service] %s [cluster] %s [member] %s"
+ % (member_suspended_event.service_name, member_suspended_event.cluster_id, member_suspended_event.member_id))
+
+ member_initialized = is_member_initialized_in_topology(
+ member_suspended_event.service_name,
+ member_suspended_event.cluster_id,
+ member_suspended_event.member_id
+ )
+
+ if not member_initialized:
+ log.error("Member has not initialized, failed to execute member suspended event")
+ return
+
+ execute_event_extendables(constants.MEMBER_SUSPENDED_EVENT, {})
+
+
+def on_member_started_event(member_started_event):
+ log.debug(
+ "Processing Member started event: [service] %s [cluster] %s [member] %s"
+ % (member_started_event.service_name, member_started_event.cluster_id, member_started_event.member_id))
+
+ member_initialized = is_member_initialized_in_topology(
+ member_started_event.service_name,
+ member_started_event.cluster_id,
+ member_started_event.member_id
+ )
+
+ if not member_initialized:
+ log.error("Member has not initialized, failed to execute member started event")
+ return
+
+ execute_event_extendables(constants.MEMBER_STARTED_EVENT, {})
+
+
+def start_server_extension():
+ log.debug("Processing start server extension...")
+ service_name_in_payload = Config.service_name
+ cluster_id_in_payload = Config.cluster_id
+ member_id_in_payload = Config.member_id
+ member_initialized = is_member_initialized_in_topology(
+ service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
+
+ if not member_initialized:
+ log.error("Member has not initialized, failed to execute start server event")
+ return
+
+ execute_event_extendables("StartServers", {})
+
+
+def volume_mount_extension(persistence_mappings_payload):
+ log.debug("Processing volume mount extension...")
+ execute_event_extendables("VolumeMount", persistence_mappings_payload)
+
+
def on_domain_mapping_added_event(domain_mapping_added_event):
    """
    Handles a domain mapping added event by passing the subscription details to the
    extensions and plugins registered for constants.DOMAIN_MAPPING_ADDED_EVENT.

    :param domain_mapping_added_event: event exposing tenant_id, application_id, service_name,
        domain_name, cluster_id, context_path and application_context
    :return: None
    :raises RuntimeError: from find_tenant_domain() when the tenant is unknown
    """
    tenant_domain = find_tenant_domain(domain_mapping_added_event.tenant_id)

    # %-formatting instead of "+" concatenation: a None or non-string attribute must not
    # raise TypeError inside a debug statement and abort the handler.
    # NOTE(review): the log reads application_context while the plugin values below read
    # context_path - confirm that both attributes exist on the event.
    log.debug(
        "Processing Domain mapping added event: [tenant-id] %s [tenant-domain] %s [domain-name] %s"
        " [application-context] %s"
        % (domain_mapping_added_event.tenant_id, tenant_domain, domain_mapping_added_event.domain_name,
           domain_mapping_added_event.application_context)
    )

    plugin_values = {
        "SUBSCRIPTION_APPLICATION_ID": domain_mapping_added_event.application_id,
        "SUBSCRIPTION_SERVICE_NAME": domain_mapping_added_event.service_name,
        "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_added_event.domain_name,
        "SUBSCRIPTION_CLUSTER_ID": domain_mapping_added_event.cluster_id,
        "SUBSCRIPTION_TENANT_ID": int(domain_mapping_added_event.tenant_id),
        "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain,
        "SUBSCRIPTION_CONTEXT_PATH": domain_mapping_added_event.context_path
    }

    execute_event_extendables(constants.DOMAIN_MAPPING_ADDED_EVENT, plugin_values)
+
+
def on_domain_mapping_removed_event(domain_mapping_removed_event):
    """
    Handles a domain mapping removed event by passing the subscription details to the
    extensions and plugins registered for constants.DOMAIN_MAPPING_REMOVED_EVENT.

    :param domain_mapping_removed_event: event exposing tenant_id, application_id, service_name,
        domain_name and cluster_id
    :return: None
    :raises RuntimeError: from find_tenant_domain() when the tenant is unknown
    """
    tenant_domain = find_tenant_domain(domain_mapping_removed_event.tenant_id)

    # %-formatting instead of "+" concatenation: a None or non-string attribute must not
    # raise TypeError inside a log statement and abort the handler.
    log.info(
        "Domain mapping removed event received: [tenant-id] %s [tenant-domain] %s [domain-name] %s"
        % (domain_mapping_removed_event.tenant_id, tenant_domain, domain_mapping_removed_event.domain_name)
    )

    plugin_values = {
        "SUBSCRIPTION_APPLICATION_ID": domain_mapping_removed_event.application_id,
        "SUBSCRIPTION_SERVICE_NAME": domain_mapping_removed_event.service_name,
        "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_removed_event.domain_name,
        "SUBSCRIPTION_CLUSTER_ID": domain_mapping_removed_event.cluster_id,
        "SUBSCRIPTION_TENANT_ID": int(domain_mapping_removed_event.tenant_id),
        "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain
    }

    execute_event_extendables(constants.DOMAIN_MAPPING_REMOVED_EVENT, plugin_values)
+
+
def on_copy_artifacts_extension(src, dest):
    """
    Runs the "CopyArtifacts" extensions and plugins.

    :param src: passed to the extendables as the "SOURCE" value
    :param dest: passed to the extendables as the "DEST" value
    :return: None
    """
    log.debug("Processing Copy artifacts extension...")
    execute_event_extendables("CopyArtifacts", {"SOURCE": src, "DEST": dest})
- self.execute_event_extendables(constants.MEMBER_INITIALIZED_EVENT, {})
-
- def on_complete_tenant_event(self, complete_tenant_event):
- self.__log.debug("Processing Complete tenant event...")
-
- tenant_list_json = complete_tenant_event.tenant_list_json
- self.__log.debug("Complete tenants:" + json.dumps(tenant_list_json))
-
- plugin_values = {"TENANT_LIST_JSON": json.dumps(tenant_list_json)}
-
- self.execute_event_extendables(constants.COMPLETE_TENANT_EVENT, plugin_values)
-
- def on_member_terminated_event(self, member_terminated_event):
- self.__log.debug("Processing Member terminated event: [service] %s [cluster] %s [member] %s" %
- (member_terminated_event.service_name, member_terminated_event.cluster_id,
- member_terminated_event.member_id))
-
- member_initialized = self.is_member_initialized_in_topology(
- member_terminated_event.service_name,
- member_terminated_event.cluster_id,
- member_terminated_event.member_id
- )
-
- if not member_initialized:
- self.__log.error("Member has not initialized, failed to execute member terminated event")
- return
-
- self.execute_event_extendables(constants.MEMBER_TERMINATED_EVENT, {})
-
- def on_member_suspended_event(self, member_suspended_event):
- self.__log.debug("Processing Member suspended event: [service] %s [cluster] %s [member] %s" %
- (member_suspended_event.service_name, member_suspended_event.cluster_id,
- member_suspended_event.member_id))
-
- member_initialized = self.is_member_initialized_in_topology(
- member_suspended_event.service_name,
- member_suspended_event.cluster_id,
- member_suspended_event.member_id
- )
-
- if not member_initialized:
- self.__log.error("Member has not initialized, failed to execute member suspended event")
- return
-
- self.execute_event_extendables(constants.MEMBER_SUSPENDED_EVENT, {})
-
- def on_member_started_event(self, member_started_event):
- self.__log.debug("Processing Member started event: [service] %s [cluster] %s [member] %s" %
- (member_started_event.service_name, member_started_event.cluster_id,
- member_started_event.member_id))
-
- member_initialized = self.is_member_initialized_in_topology(
- member_started_event.service_name,
- member_started_event.cluster_id,
- member_started_event.member_id
- )
-
- if not member_initialized:
- self.__log.error("Member has not initialized, failed to execute member started event")
- return
-
- self.execute_event_extendables(constants.MEMBER_STARTED_EVENT, {})
-
- def start_server_extension(self):
- self.__log.debug("Processing start server extension...")
- service_name_in_payload = Config.service_name
- cluster_id_in_payload = Config.cluster_id
- member_id_in_payload = Config.member_id
- member_initialized = self.is_member_initialized_in_topology(service_name_in_payload, cluster_id_in_payload,
- member_id_in_payload)
-
- if not member_initialized:
- self.__log.error("Member has not initialized, failed to execute start server event")
- return
-
- self.execute_event_extendables("StartServers", {})
-
- def volume_mount_extension(self, persistence_mappings_payload):
- self.__log.debug("Processing volume mount extension...")
- self.execute_event_extendables("VolumeMount", persistence_mappings_payload)
-
- def on_domain_mapping_added_event(self, domain_mapping_added_event):
- tenant_domain = EventHandler.find_tenant_domain(domain_mapping_added_event.tenant_id)
- self.__log.debug(
- "Processing Domain mapping added event: [tenant-id] " + str(domain_mapping_added_event.tenant_id) +
- " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_added_event.domain_name +
- " [application-context] " + domain_mapping_added_event.application_context
- )
-
- plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_added_event.application_id,
- "SUBSCRIPTION_SERVICE_NAME": domain_mapping_added_event.service_name,
- "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_added_event.domain_name,
- "SUBSCRIPTION_CLUSTER_ID": domain_mapping_added_event.cluster_id,
- "SUBSCRIPTION_TENANT_ID": int(domain_mapping_added_event.tenant_id),
- "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain,
- "SUBSCRIPTION_CONTEXT_PATH":
- domain_mapping_added_event.context_path}
-
- self.execute_event_extendables(constants.DOMAIN_MAPPING_ADDED_EVENT, plugin_values)
-
- def on_domain_mapping_removed_event(self, domain_mapping_removed_event):
- tenant_domain = EventHandler.find_tenant_domain(domain_mapping_removed_event.tenant_id)
- self.__log.info(
- "Domain mapping removed event received: [tenant-id] " + str(domain_mapping_removed_event.tenant_id) +
- " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_removed_event.domain_name
- )
-
- plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_removed_event.application_id,
- "SUBSCRIPTION_SERVICE_NAME": domain_mapping_removed_event.service_name,
- "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_removed_event.domain_name,
- "SUBSCRIPTION_CLUSTER_ID": domain_mapping_removed_event.cluster_id,
- "SUBSCRIPTION_TENANT_ID": int(domain_mapping_removed_event.tenant_id),
- "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain}
-
- self.execute_event_extendables(constants.DOMAIN_MAPPING_REMOVED_EVENT, plugin_values)
-
- def on_copy_artifacts_extension(self, src, dest):
- self.__log.debug("Processing Copy artifacts extension...")
- plugin_values = {"SOURCE": src, "DEST": dest}
- self.execute_event_extendables("CopyArtifacts", plugin_values)
-
- def on_tenant_subscribed_event(self, tenant_subscribed_event):
- self.__log.debug(
- "Processing Tenant subscribed event: [tenant] " + str(tenant_subscribed_event.tenant_id) +
- " [service] " + tenant_subscribed_event.service_name + " [cluster] " + tenant_subscribed_event.cluster_ids
- )
-
- self.execute_event_extendables(constants.TENANT_SUBSCRIBED_EVENT, {})
-
- def on_application_signup_removed_event(self, application_signup_removal_event):
- self.__log.debug(
- "Processing Tenant unsubscribed event: [tenant] " + str(application_signup_removal_event.tenantId) +
- " [application ID] " + str(application_signup_removal_event.applicationId)
- )
-
- if Config.application_id == application_signup_removal_event.applicationId:
- AgentGitHandler.remove_repo(application_signup_removal_event.tenantId)
-
- self.execute_event_extendables(constants.APPLICATION_SIGNUP_REMOVAL_EVENT, {})
-
- def cleanup(self, event):
- self.__log.debug("Executing cleanup extension for event %s..." % event)
- publisher.publish_maintenance_mode_event()
- self.execute_event_extendables("clean", {})
- publisher.publish_instance_ready_to_shutdown_event()
-
- def execute_event_extendables(self, event, input_values):
- """ Execute the extensions and plugins related to the event
- :param event: The event name string
- :param input_values: the values to be passed to the plugin
- :return:
- """
- try:
- input_values = EventHandler.add_common_input_values(input_values)
- except Exception as e:
- self.__log.error("Error while adding common input values for event extendables: %s" % e)
- input_values["EVENT"] = event
- self.__log.debug("Executing extensions for [event] %s with [input values] %s" % (event, input_values))
- # Execute the extension
- self.execute_extension_for_event(event, input_values)
- # Execute the plugins
- self.execute_plugins_for_event(event, input_values)
-
- def execute_plugins_for_event(self, event, input_values):
- """ For each plugin registered for the specified event, start a plugin execution thread
- :param str event: The event name string
- :param dict input_values: the values to be passed to the plugin
- :return:
- """
- try:
- plugins_for_event = Config.plugins.get(event)
- if plugins_for_event is not None:
- for plugin_info in plugins_for_event:
- self.__log.debug("Executing plugin %s for event %s" % (plugin_info.name, event))
- plugin_thread = PluginExecutor(plugin_info, input_values)
- plugin_thread.start()
-
- # block till plugin run completes.
- plugin_thread.join()
- else:
- self.__log.debug("No plugins registered for event %s" % event)
- except Exception as e:
- self.__log.exception("Error while executing plugin for event %s: %s" % (event, e))
-
- def execute_extension_for_event(self, event, extension_values):
- """ Execute the extension related to the event
- :param event: The event name string
- :param extension_values: the values to be passed to the plugin
- :return:
- """
- try:
- if Config.extension_executor is not None:
- self.__log.debug("Executing extension for event [%s]" % event)
- extension_thread = PluginExecutor(Config.extension_executor, extension_values)
- extension_thread.start()
+
def on_tenant_subscribed_event(tenant_subscribed_event):
    """
    Handles a tenant subscribed event by running the extensions and plugins registered
    for constants.TENANT_SUBSCRIBED_EVENT (no event specific values are passed on).

    :param tenant_subscribed_event: event exposing tenant_id, service_name and cluster_ids
    :return: None
    """
    # %-formatting instead of "+" concatenation: cluster_ids (or any attribute that is
    # None or not a string) would otherwise raise TypeError while building the debug
    # message and the extendables would never be executed.
    log.debug(
        "Processing Tenant subscribed event: [tenant] %s [service] %s [cluster] %s"
        % (tenant_subscribed_event.tenant_id, tenant_subscribed_event.service_name,
           tenant_subscribed_event.cluster_ids)
    )

    execute_event_extendables(constants.TENANT_SUBSCRIBED_EVENT, {})
+
+
def on_application_signup_removed_event(application_signup_removal_event):
    """
    Handles an application signup removal event.

    When the event's applicationId equals Config.application_id the tenant's repository is
    removed through AgentGitHandler.remove_repo(). The extensions and plugins registered for
    constants.APPLICATION_SIGNUP_REMOVAL_EVENT are executed in either case.

    :param application_signup_removal_event: event exposing tenantId and applicationId
    :return: None
    """
    tenant_id = application_signup_removal_event.tenantId
    application_id = application_signup_removal_event.applicationId

    log.debug("Processing Tenant unsubscribed event: [tenant] %s [application ID] %s"
              % (str(tenant_id), str(application_id)))

    if Config.application_id == application_id:
        AgentGitHandler.remove_repo(tenant_id)

    execute_event_extendables(constants.APPLICATION_SIGNUP_REMOVAL_EVENT, {})
+
+
def cleanup(event):
    """
    Runs the "clean" extensions and plugins, bracketed by two published events: a
    maintenance mode event before and an instance-ready-to-shutdown event after.

    :param event: only used in the debug message to name what triggered the cleanup
    :return: None
    """
    message = "Executing cleanup extension for event %s..." % event
    log.debug(message)

    # announce maintenance mode, clean up, then announce readiness to shut down
    publisher.publish_maintenance_mode_event()
    execute_event_extendables("clean", {})
    publisher.publish_instance_ready_to_shutdown_event()
+
+
def execute_event_extendables(event, input_values):
    """ Execute the extensions and plugins related to the event

    The input values are first enriched through add_common_input_values(). If that fails the
    error is logged and execution continues with the values available so far, plus the
    "EVENT" entry.

    :param event: The event name string
    :param input_values: the values to be passed to the plugin; a dict, None, or any other
        value (which is then wrapped as {"VALUE1": str(value)})
    :return: None
    """
    try:
        input_values = add_common_input_values(input_values)
    except Exception as e:
        log.error("Error while adding common input values for event extendables: %s" % e)
        # add_common_input_values() did not return, so input_values may still be None or a
        # non-dict. Normalise it the same way that function does, otherwise the "EVENT"
        # assignment below raises TypeError and neither extension nor plugins run.
        if input_values is None:
            input_values = {}
        elif not isinstance(input_values, dict):
            input_values = {"VALUE1": str(input_values)}

    input_values["EVENT"] = event
    log.debug("Executing extensions for [event] %s with [input values] %s" % (event, input_values))
    # Execute the extension
    execute_extension_for_event(event, input_values)
    # Execute the plugins
    execute_plugins_for_event(event, input_values)
+
+
+def execute_plugins_for_event(event, input_values):
+ """ For each plugin registered for the specified event, start a plugin execution thread
+ :param str event: The event name string
+ :param dict input_values: the values to be passed to the plugin
+ :return:
+ """
+ try:
+ plugins_for_event = Config.plugins.get(event)
+ if plugins_for_event is not None:
+ for plugin_info in plugins_for_event:
+ log.debug("Executing plugin %s for event %s" % (plugin_info.name, event))
+ plugin_thread = PluginExecutor(plugin_info, input_values)
+ plugin_thread.start()
# block till plugin run completes.
- extension_thread.join()
- else:
- self.__log.debug("No extensions registered for event %s" % event)
- except OSError as e:
- self.__log.warn("No extension was found for event %s: %s" % (event, e))
- except Exception as e:
- self.__log.exception("Error while executing extension for event %s: %s" % (event, e))
-
- def get_repo_path_for_tenant(self, tenant_id, git_local_repo_path, is_multitenant):
- """ Finds the repository path for tenant to clone from the remote repository
- :param tenant_id:
- :param git_local_repo_path:
- :param is_multitenant:
- :return:
- """
- repo_path = ""
-
- if is_multitenant:
- if tenant_id == SUPER_TENANT_ID:
- # super tenant, /repository/deploy/server/
- super_tenant_repo_path = Config.super_tenant_repository_path
- # "app_path"
- repo_path += git_local_repo_path
-
- if super_tenant_repo_path is not None and super_tenant_repo_path != "":
- super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.startswith("/") \
- else "/" + super_tenant_repo_path
- super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.endswith("/") \
- else super_tenant_repo_path + "/"
- # "app_path/repository/deploy/server/"
- repo_path += super_tenant_repo_path
- else:
- # "app_path/repository/deploy/server/"
- repo_path += SUPER_TENANT_REPO_PATH
+ plugin_thread.join()
+ else:
+ log.debug("No plugins registered for event %s" % event)
+ except Exception as e:
+ log.exception("Error while executing plugin for event %s: %s" % (event, e))
+
def execute_extension_for_event(event, extension_values):
    """ Run the configured extension executor for the event and wait for it to finish

    An OSError is logged as a warning ("no extension found"); any other exception is logged
    with its traceback. Neither is propagated to the caller.

    :param event: The event name string
    :param extension_values: the values to be passed to the extension
    :return: None
    """
    try:
        executor_info = Config.extension_executor
        if executor_info is None:
            log.debug("No extensions registered for event %s" % event)
            return

        log.debug("Executing extension for event [%s]" % event)
        worker = PluginExecutor(executor_info, extension_values)
        worker.start()

        # block till the extension run completes.
        worker.join()
    except OSError as e:
        log.warn("No extension was found for event %s: %s" % (event, e))
    except Exception as e:
        log.exception("Error while executing extension for event %s: %s" % (event, e))
+
+
+def get_repo_path_for_tenant(tenant_id, git_local_repo_path, is_multitenant):
+ """ Finds the repository path for tenant to clone from the remote repository
+ :param tenant_id:
+ :param git_local_repo_path:
+ :param is_multitenant:
+ :return:
+ """
+ repo_path = ""
+
+ if is_multitenant:
+ if tenant_id == SUPER_TENANT_ID:
+ # super tenant, /repository/deploy/server/
+ super_tenant_repo_path = Config.super_tenant_repository_path
+ # "app_path"
+ repo_path += git_local_repo_path
+
+ if super_tenant_repo_path is not None and super_tenant_repo_path != "":
+ super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.startswith("/") \
+ else "/" + super_tenant_repo_path
+ super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.endswith("/") \
+ else super_tenant_repo_path + "/"
+ # "app_path/repository/deploy/server/"
+ repo_path += super_tenant_repo_path
else:
- # normal tenant, /repository/tenants/tenant_id
- tenant_repo_path = Config.tenant_repository_path
- # "app_path"
- repo_path += git_local_repo_path
-
- if tenant_repo_path is not None and tenant_repo_path != "":
- tenant_repo_path = tenant_repo_path if tenant_repo_path.startswith("/") else "/" + tenant_repo_path
- tenant_repo_path = tenant_repo_path if tenant_repo_path.endswith("/") else tenant_repo_path + "/"
- # "app_path/repository/tenants/244653444"
- repo_path += tenant_repo_path + tenant_id
- else:
- # "app_path/repository/tenants/244653444"
- repo_path += TENANT_REPO_PATH + tenant_id
-
- # tenant_dir_path = git_local_repo_path + AgentGitHandler.TENANT_REPO_PATH + tenant_id
- # GitUtils.create_dir(repo_path)
+ # "app_path/repository/deploy/server/"
+ repo_path += SUPER_TENANT_REPO_PATH
+
else:
- # not multi tenant, app_path
- repo_path = git_local_repo_path
-
- self.__log.debug("Repo path returned : %r" % repo_path)
- return repo_path
-
- def is_member_initialized_in_topology(self, service_name, cluster_id, member_id):
- if self.member_exists_in_topology(service_name, cluster_id, member_id):
- topology = TopologyContext.get_topology()
- service = topology.get_service(service_name)
- if service is None:
- raise Exception("Service not found in topology [service] %s" % service_name)
-
- cluster = service.get_cluster(cluster_id)
- if cluster is None:
- raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
-
- member = cluster.get_member(member_id)
- if member is None:
- raise Exception("Member id not found in topology [member] %s" % member_id)
-
- self.__log.info("Found member: " + member.to_json())
- if member.status == MemberStatus.Initialized:
- return True
- return False
+ # normal tenant, /repository/tenants/tenant_id
+ tenant_repo_path = Config.tenant_repository_path
+ # "app_path"
+ repo_path += git_local_repo_path
+
+ if tenant_repo_path is not None and tenant_repo_path != "":
+ tenant_repo_path = tenant_repo_path if tenant_repo_path.startswith("/") else "/" + tenant_repo_path
+ tenant_repo_path = tenant_repo_path if tenant_repo_path.endswith("/") else tenant_repo_path + "/"
+ # "app_path/repository/tenants/244653444"
+ repo_path += tenant_repo_path + tenant_id
+ else:
+ # "app_path/repository/tenants/244653444"
+ repo_path += TENANT_REPO_PATH + tenant_id
+
+ # tenant_dir_path = git_local_repo_path + AgentGitHandler.TENANT_REPO_PATH + tenant_id
+ # GitUtils.create_dir(repo_path)
+ else:
+ # not multi tenant, app_path
+ repo_path = git_local_repo_path
- def member_exists_in_topology(self, service_name, cluster_id, member_id):
+ log.debug("Repo path returned : %r" % repo_path)
+ return repo_path
+
+
+def is_member_initialized_in_topology(service_name, cluster_id, member_id):
+ if member_exists_in_topology(service_name, cluster_id, member_id):
topology = TopologyContext.get_topology()
service = topology.get_service(service_name)
if service is None:
@@ -519,131 +524,149 @@ class EventHandler:
if cluster is None:
raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
- activated_member = cluster.get_member(member_id)
- if activated_member is None:
- self.__log.error("Member id not found in topology [member] %s" % member_id)
- return False
+ member = cluster.get_member(member_id)
+ if member is None:
+ raise Exception("Member id not found in topology [member] %s" % member_id)
- return True
+ log.info("Found member: " + member.to_json())
+ if member.status == MemberStatus.Initialized:
+ return True
+ return False
- @staticmethod
- def mark_member_as_initialized(service_name, cluster_id, member_id):
- topology = TopologyContext.get_topology()
- service = topology.get_service(service_name)
+
def member_exists_in_topology(service_name, cluster_id, member_id):
    """
    Checks whether the given member is present in the current topology.

    :param service_name: service to look up in the topology
    :param cluster_id: cluster to look up within that service
    :param member_id: member to look up within that cluster
    :return: True if the member is found, False (after logging an error) if it is not
    :raises Exception: when the service or the cluster is missing from the topology
    """
    service = TopologyContext.get_topology().get_service(service_name)
    if service is None:
        raise Exception("Service not found in topology [service] %s" % service_name)

    cluster = service.get_cluster(cluster_id)
    if cluster is None:
        raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)

    if cluster.get_member(member_id) is not None:
        return True

    log.error("Member id not found in topology [member] %s" % member_id)
    return False
+
+
def mark_member_as_initialized(service_name, cluster_id, member_id):
    """
    Sets the status of the given member in the current topology to MemberStatus.Initialized.

    :param service_name: service to look up in the topology
    :param cluster_id: cluster to look up within that service
    :param member_id: member to look up within that cluster
    :return: None
    :raises Exception: when the service, the cluster or the member is missing from the topology
    """
    service = TopologyContext.get_topology().get_service(service_name)
    if service is None:
        raise Exception("Service not found in topology [service] %s" % service_name)

    cluster = service.get_cluster(cluster_id)
    if cluster is None:
        raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)

    target_member = cluster.get_member(member_id)
    if target_member is None:
        raise Exception("Member id not found in topology [member] %s" % member_id)

    target_member.status = MemberStatus.Initialized
+
+
+def add_common_input_values(plugin_values):
+ """
+ Adds the common parameters to be used by the extension scripts
+ :param dict[str, str] plugin_values: Dictionary to be added
+ :return: Dictionary with updated parameters
+ :rtype: dict[str, str]
+ """
+ if plugin_values is None:
+ plugin_values = {}
+ elif type(plugin_values) != dict:
+ plugin_values = {"VALUE1": str(plugin_values)}
+
+ plugin_values["APPLICATION_PATH"] = Config.app_path
+ plugin_values["PARAM_FILE_PATH"] = Config.read_property(constants.PARAM_FILE_PATH, False)
+ plugin_values["PERSISTENCE_MAPPINGS"] = Config.persistence_mappings
+
+ lb_cluster_id_in_payload = Config.lb_cluster_id
+ lb_private_ip, lb_public_ip = get_lb_member_ip(lb_cluster_id_in_payload)
+ plugin_values["LB_IP"] = lb_private_ip if lb_private_ip is not None else Config.lb_private_ip
+ plugin_values["LB_PUBLIC_IP"] = lb_public_ip if lb_public_ip is not None else Config.lb_public_ip
+
+ topology = TopologyContext.get_topology()
+ if topology.initialized:
+ service = topology.get_service(Config.service_name)
if service is None:
- raise Exception("Service not found in topology [service] %s" % service_name)
+ raise Exception("Service not found in topology [service] %s" % Config.service_name)
- cluster = service.get_cluster(cluster_id)
+ cluster = service.get_cluster(Config.cluster_id)
if cluster is None:
- raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
+ raise Exception("Cluster id not found in topology [cluster] %s" % Config.cluster_id)
- member = cluster.get_member(member_id)
+ member = cluster.get_member(Config.member_id)
if member is None:
- raise Exception("Member id not found in topology [member] %s" % member_id)
- member.status = MemberStatus.Initialized
-
- @staticmethod
- def add_common_input_values(plugin_values):
- """
- Adds the common parameters to be used by the extension scripts
- :param dict[str, str] plugin_values: Dictionary to be added
- :return: Dictionary with updated parameters
- :rtype: dict[str, str]
- """
- if plugin_values is None:
- plugin_values = {}
- elif type(plugin_values) != dict:
- plugin_values = {"VALUE1": str(plugin_values)}
-
- plugin_values["APPLICATION_PATH"] = Config.app_path
- plugin_values["PARAM_FILE_PATH"] = Config.read_property(constants.PARAM_FILE_PATH, False)
- plugin_values["PERSISTENCE_MAPPINGS"] = Config.persistence_mappings
-
- lb_cluster_id_in_payload = Config.lb_cluster_id
- lb_private_ip, lb_public_ip = EventHandler.get_lb_member_ip(lb_cluster_id_in_payload)
- plugin_values["LB_IP"] = lb_private_ip if lb_private_ip is not None else Config.lb_private_ip
- plugin_values["LB_PUBLIC_IP"] = lb_public_ip if lb_public_ip is not None else Config.lb_public_ip
+ raise Exception("Member id not found in topology [member] %s" % Config.member_id)
- topology = TopologyContext.get_topology()
- if topology.initialized:
- service = topology.get_service(Config.service_name)
- if service is None:
- raise Exception("Service not found in topology [service] %s" % Config.service_name)
-
- cluster = service.get_cluster(Config.cluster_id)
- if cluster is None:
- raise Exception("Cluster id not found in topology [cluster] %s" % Config.cluster_id)
-
- member = cluster.get_member(Config.member_id)
- if member is None:
- raise Exception("Member id not found in topology [member] %s" % Config.member_id)
-
- EventHandler.add_properties(service.properties, plugin_values, "SERVICE_PROPERTY")
- EventHandler.add_properties(cluster.properties, plugin_values, "CLUSTER_PROPERTY")
- EventHandler.add_properties(member.properties, plugin_values, "MEMBER_PROPERTY")
-
- plugin_values.update(Config.get_payload_params())
-
- return EventHandler.clean_process_parameters(plugin_values)
-
- @staticmethod
- def add_properties(properties, params, prefix):
- """
- Adds the given property list to the parameters list with given prefix in the parameter name
- :param dict[str, str] properties: service properties
- :param dict[str, str] params:
- :param str prefix:
- :return: dict[str, str]
- """
- if properties is None or properties.items() is None:
- return
-
- for key in properties:
- params[prefix + "_" + key] = str(properties[key])
-
- @staticmethod
- def get_lb_member_ip(lb_cluster_id):
- topology = TopologyContext.get_topology()
- services = topology.get_services()
-
- for service in services:
- clusters = service.get_clusters()
- for cluster in clusters:
- members = cluster.get_members()
- for member in members:
- if member.cluster_id == lb_cluster_id:
- return member.member_default_private_ip, member.member_default_public_ip
-
- return None, None
-
- @staticmethod
- def clean_process_parameters(params):
- """
- Removes any null valued parameters before passing them to the extension scripts
- :param dict params:
- :return: cleaned parameters
- :rtype: dict
- """
- for key, value in params.items():
- if value is None:
- del params[key]
-
- return params
-
- @staticmethod
- def find_tenant_domain(tenant_id):
- tenant = TenantContext.get_tenant(tenant_id)
- if tenant is None:
- raise RuntimeError("Tenant could not be found: [tenant-id] %s" % str(tenant_id))
-
- return tenant.tenant_domain
-
- @staticmethod
- def validate_repo_path(app_path):
- # app path would be ex: /var/www, or /opt/server/data
- return os.access(app_path, os.W_OK)
+ add_properties(service.properties, plugin_values, "SERVICE_PROPERTY")
+ add_properties(cluster.properties, plugin_values, "CLUSTER_PROPERTY")
+ add_properties(member.properties, plugin_values, "MEMBER_PROPERTY")
+
+ plugin_values.update(Config.get_payload_params())
+
+ return clean_process_parameters(plugin_values)
+
+
def add_properties(properties, params, prefix):
    """
    Copies every entry of properties into params under the key "<prefix>_<key>", with the
    value converted to a string. params is modified in place.

    :param dict[str, str] properties: properties to copy; None is accepted and ignored
    :param dict[str, str] params: dictionary that receives the prefixed entries
    :param str prefix: text placed, followed by an underscore, in front of every key
    :return: None
    """
    # the former "properties.items() is None" test could never be true for a dict
    if properties is None:
        return

    for key, value in properties.items():
        params[prefix + "_" + key] = str(value)
+
+
def get_lb_member_ip(lb_cluster_id):
    """
    Searches every service and cluster of the current topology for the first member whose
    cluster_id equals lb_cluster_id.

    :param lb_cluster_id: cluster id to match against each member's cluster_id
    :return: (member_default_private_ip, member_default_public_ip) of the first match,
        or (None, None) when no member matches
    :rtype: tuple
    """
    topology = TopologyContext.get_topology()

    for service in topology.get_services():
        for cluster in service.get_clusters():
            for member in cluster.get_members():
                if member.cluster_id != lb_cluster_id:
                    continue
                return member.member_default_private_ip, member.member_default_public_ip

    return None, None
+
+
def clean_process_parameters(params):
    """
    Removes any null valued parameters before passing them to the extension scripts.
    The dictionary is cleaned in place and the same object is returned.

    :param dict params: parameters to clean
    :return: cleaned parameters
    :rtype: dict
    """
    # Collect the keys first: deleting while iterating over params.items() only works
    # where items() returns a list copy and raises RuntimeError on a dictionary view.
    null_keys = [key for key, value in params.items() if value is None]
    for key in null_keys:
        del params[key]

    return params
+
+
def find_tenant_domain(tenant_id):
    """
    Looks up the tenant in TenantContext and returns its domain.

    :param tenant_id: id handed to TenantContext.get_tenant()
    :return: the tenant_domain attribute of the tenant found
    :raises RuntimeError: when TenantContext has no tenant for the id
    """
    tenant = TenantContext.get_tenant(tenant_id)
    if tenant is not None:
        return tenant.tenant_domain

    raise RuntimeError("Tenant could not be found: [tenant-id] %s" % str(tenant_id))
def validate_repo_path(app_path):
    """
    Reports whether the current process has write access to the application path.

    :param app_path: path to check, for example /var/www or /opt/server/data
    :return: result of os.access(app_path, os.W_OK)
    :rtype: bool
    """
    is_writable = os.access(app_path, os.W_OK)
    return is_writable
class PluginExecutor(Thread):
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
index a24650a..5b6190e 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
@@ -15,10 +15,10 @@
# specific language governing permissions and limitations
# under the License.
-import threading
-
+from threading import Thread
import paho.mqtt.publish as publish
import time
+from Queue import Queue, Empty
import constants
import healthstats
@@ -201,7 +201,7 @@ def get_publisher(topic):
return publishers[topic]
-class EventPublisher:
+class EventPublisher(object):
"""
Handles publishing events to topics to the provided message broker
"""
@@ -210,24 +210,9 @@ class EventPublisher:
self.__topic = topic
self.__log = LogFactory().get_log(__name__)
self.__start_time = int(time.time())
+ self.__msg_queue = Queue()
def publish(self, event):
- publisher_thread = threading.Thread(target=self.__publish_event, args=(event,))
- publisher_thread.start()
-
- def __publish_event(self, event):
- """
- Publishes the given event to the message broker.
-
- When a list of message brokers are given the event is published to the first message broker
- available. Therefore the message brokers should share the data (ex: Sharing the KahaDB in ActiveMQ).
-
- When the event cannot be published, it will be retried until the mb_publisher_timeout is exceeded.
- This value is set in the agent.conf.
-
- :param event:
- :return: True if the event was published.
- """
if Config.mb_username is None:
auth = None
else:
@@ -244,20 +229,59 @@ class EventPublisher:
for mb_url in Config.mb_urls:
mb_ip, mb_port = mb_url.split(":")
+ # start a thread to execute publish event
+ publisher_thread = Thread(target=self.__publish_event, args=(event, mb_ip, mb_port, auth, payload))
+ publisher_thread.start()
+
+ # give some time for the thread to complete
+ time.sleep(5)
+
+ # check if thread is still running and notify
+ if publisher_thread.isAlive():
+ self.__log.debug(
+ "Event publishing timed out before succeeding. The message broker could be offline.")
+
+ # check if publish.single() succeeded
try:
- publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port, auth=auth)
- self.__log.debug("Event type: %s published to MB: %s:%s" % (str(event.__class__), mb_ip, mb_port))
+ published = self.__msg_queue.get(block=False)
+ except Empty:
+ published = False
+
+ if published:
return True
- except:
- self.__log.debug(
- "Could not publish event to message broker %s:%s." % (mb_ip, mb_port))
+ # All the brokers on the list were offline
self.__log.debug(
"Could not publish event to any of the provided message brokers. Retrying in %s seconds."
% retry_interval)
time.sleep(retry_interval)
+ # Event publisher timeout exceeded
self.__log.warn("Could not publish event to any of the provided message brokers before "
"the timeout [%s] exceeded. The event will be dropped." % Config.mb_publisher_timeout)
return False
+
def __publish_event(self, event, mb_ip, mb_port, auth, payload):
    """
    Makes a single attempt to publish the payload to one message broker.

    This method (of EventPublisher) is started as the target of a publisher thread. It does
    not return a result and does not retry: the outcome is reported by putting True (published)
    or False (an exception was raised) on the publisher's message queue. Exceptions are logged
    at debug level and not propagated.

    :param event: the event being published; only its class name is used, for logging
    :param mb_ip: host name or IP address of the message broker
    :param mb_port: port of the message broker
    :param auth: value passed to publish.single() as its auth argument (may be None)
    :param payload: message body to publish to this publisher's topic
    :return: None
    """
    event_name = event.__class__.__name__
    try:
        self.__log.debug("Publishing [event] %s to %s:%s" % (event_name, mb_ip, mb_port))
        publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port, auth=auth)
        self.__log.debug("[Event] %s published to MB: %s:%s" % (event_name, mb_ip, mb_port))
        self.__msg_queue.put(True)
    except Exception as err:
        self.__log.debug(
            "Could not publish [event] %s to message broker %s:%s. : %s"
            % (event_name, mb_ip, mb_port, err))
        self.__msg_queue.put(False)
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
index ff5cef9..c5a6d2d 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
@@ -178,7 +178,6 @@ class EventExecutor(threading.Thread):
def __init__(self, event_queue):
threading.Thread.__init__(self)
self.__event_queue = event_queue
- # TODO: several handlers for one event
self.__event_handlers = {}
EventSubscriber.log = LogFactory().get_log(__name__)
@@ -191,10 +190,9 @@ class EventExecutor(threading.Thread):
try:
EventSubscriber.log.debug("Executing handler for event %r" % event)
handler(event_msg)
- except:
- EventSubscriber.log.exception("Error processing %r event" % event)
+ except Exception as err:
+ EventSubscriber.log.exception("Error processing %r event: %s" % (event, err))
else:
-
EventSubscriber.log.debug("Event handler not found for event : %r" % event)
def register_event_handler(self, event, handler):
@@ -226,7 +224,7 @@ class MessageBrokerHeartBeatChecker(AbstractAsyncScheduledTask):
try:
self.__mb_client.connect(self.__mb_ip, self.__mb_port, 60)
self.__mb_client.disconnect()
- except:
+ except Exception:
self.__log.info(
"Message broker %s:%s cannot be reached. Disconnecting client..." % (self.__mb_ip, self.__mb_port))
self.__connected_client.disconnect()
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
index 2398099..ab4975a 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
@@ -67,6 +67,11 @@ public class ADCExtensionTestCase extends PythonAgentIntegrationTest {
startServerSocket(8080);
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
/**
* TearDown method for test method testPythonCartridgeAgent
*/
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
index 6e40dd6..05d5ba2 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
@@ -70,6 +70,11 @@ public class ADCMTAppTenantUserTestCase extends PythonAgentIntegrationTest {
startServerSocket(8080);
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
/**
* TearDown method for test method testPythonCartridgeAgent
*/
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
index 6f0b070..444a5e0 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
@@ -72,6 +72,11 @@ public class ADCMTAppTestCase extends PythonAgentIntegrationTest {
startServerSocket(8080);
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
/**
* TearDown method for test method testPythonCartridgeAgent
*/
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
index 0dc92be..dba6197 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
@@ -63,6 +63,11 @@ public class ADCTestCase extends PythonAgentIntegrationTest {
public ADCTestCase() throws IOException {
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
@BeforeMethod(alwaysRun = true)
public void setupADCTest() throws Exception {
log.info("Setting up ADCTestCase");
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
index ea156b6..db21359 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
@@ -79,6 +79,11 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
startServerSocket(8080);
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
/**
* TearDown method for test method testPythonCartridgeAgent
*/
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
index 44d295b..ce13d3f 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
@@ -76,6 +76,11 @@ public class CEPHAModeTestCase extends PythonAgentIntegrationTest {
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
@BeforeMethod(alwaysRun = true)
public void setupCEPHAModeTest() throws Exception {
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
index b1f4d8b..8c72f2d 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
@@ -42,6 +42,11 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
public MessageBrokerHATestCase() throws IOException {
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
private static final Log log = LogFactory.getLog(MessageBrokerHATestCase.class);
private static final int HA_TEST_TIMEOUT = 300000;
private static final String CLUSTER_ID = "php.php.domain";
@@ -169,10 +174,11 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
MEMBER_ID);
publishEvent(instanceCleanupMemberEvent);
publishCleanupEvent = true;
- waitUntilCleanupEventIsReceivedAndStopDefaultMB();
+
+ stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + mqttBindPorts[0]);
}
- if (line.contains("Could not publish event to message broker localhost:1885.")) {
+ if (line.contains("Could not publish [event] ")) {
log.info("Event publishing to default message broker failed and the next option is tried.");
exit = true;
}
@@ -186,26 +192,6 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
log.info("MessageBrokerHATestCase publisher test completed successfully.");
}
- private void waitUntilCleanupEventIsReceivedAndStopDefaultMB() {
- boolean eventReceived = false;
- List<String> outputLines = new ArrayList<>();
-
- while (!eventReceived) {
- List<String> newLines = getNewLines(outputLines, outputStream.toString());
- if (newLines.size() > 0) {
- for (String line : newLines) {
- if (line.contains("Message received: instance/notifier/InstanceCleanupMemberEvent")) {
- // take down the default broker
- stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + mqttBindPorts[0]);
- eventReceived = true;
- }
- }
- }
- log.info("Waiting until cleanup event is received by PCA...");
- }
- log.info("Cleanup event is received by PCA.");
- }
-
private void assertAgentActivation() {
pcaActivated = false;
instanceActivated = false;