You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ga...@apache.org on 2015/12/08 16:18:11 UTC
[28/50] [abbrv] stratos git commit: PCA - Validate agent.conf entries when loading Config, refactor handler methods for mb events, fix util methods, fix exception initialization
PCA - Validate agent.conf entries when loading Config, refactor handler methods for mb events, fix util methods, fix exception initialization
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/a242b52b
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/a242b52b
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/a242b52b
Branch: refs/heads/master
Commit: a242b52be5452ba03a2234333c62810d3ea67cfb
Parents: 64f05a5
Author: Chamila de Alwis <ch...@apache.org>
Authored: Tue Dec 1 17:35:25 2015 +0530
Committer: Chamila de Alwis <ch...@apache.org>
Committed: Tue Dec 1 17:35:34 2015 +0530
----------------------------------------------------------------------
.../cartridge.agent/cartridge.agent/agent.py | 144 +++++++++++--------
.../cartridge.agent/cartridge.agent/config.py | 83 ++++++++++-
.../cartridge.agent/exception.py | 46 +++---
.../cartridge.agent/healthstats.py | 34 +----
.../cartridge.agent/logpublisher.py | 2 +-
.../modules/artifactmgt/git/agentgithandler.py | 4 +-
.../modules/event/application/signup/events.py | 3 -
.../modules/event/instance/notifier/events.py | 2 +-
.../modules/event/tenant/events.py | 2 +-
.../modules/util/cartridgeagentutils.py | 75 ++++++----
.../cartridge.agent/publisher.py | 4 +-
11 files changed, 250 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
index 6b81dff..7f95f26 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
@@ -30,12 +30,10 @@ from subscriber import EventSubscriber
class CartridgeAgent(object):
def __init__(self):
Config.initialize_config()
- self.__tenant_context_initialized = False
- self.__log_publish_manager = None
self.__terminated = False
self.__log = LogFactory().get_log(__name__)
- mb_urls = Config.mb_urls
+ mb_urls = Config.mb_urls.split(",")
mb_uname = Config.mb_username
mb_pwd = Config.mb_password
@@ -87,8 +85,8 @@ class CartridgeAgent(object):
# Execute start servers extension
try:
event_handler.start_server_extension()
- except Exception as e:
- self.__log.exception("Error processing start servers event: %s" % e)
+ except Exception as ex:
+ self.__log.exception("Error processing start servers event: %s" % ex)
# check if artifact management is required before publishing instance activated event
repo_url = Config.repo_url
@@ -107,21 +105,22 @@ class CartridgeAgent(object):
event_handler.volume_mount_extension(persistence_mapping_payload)
# start log publishing thread
+ log_publish_manager = None
if DataPublisherConfiguration.get_instance().enabled:
log_file_paths = Config.log_file_paths
if log_file_paths is None:
self.__log.exception("No valid log file paths found, no logs will be published")
else:
self.__log.debug("Starting Log Publisher Manager: [Log file paths] %s" % ", ".join(log_file_paths))
- self.__log_publish_manager = LogPublisherManager(log_file_paths)
- self.__log_publish_manager.start()
+ log_publish_manager = LogPublisherManager(log_file_paths)
+ log_publish_manager.start()
# run until terminated
while not self.__terminated:
time.sleep(5)
if DataPublisherConfiguration.get_instance().enabled:
- self.__log_publish_manager.terminate_all_publishers()
+ log_publish_manager.terminate_all_publishers()
def terminate(self):
"""
@@ -134,9 +133,10 @@ class CartridgeAgent(object):
def register_instance_topic_listeners(self):
self.__log.debug("Starting instance notifier event message receiver thread")
- self.__inst_topic_subscriber.register_handler("ArtifactUpdatedEvent", self.on_artifact_updated)
- self.__inst_topic_subscriber.register_handler("InstanceCleanupMemberEvent", self.on_instance_cleanup_member)
- self.__inst_topic_subscriber.register_handler("InstanceCleanupClusterEvent", self.on_instance_cleanup_cluster)
+ self.__inst_topic_subscriber.register_handler("ArtifactUpdatedEvent", Handlers.on_artifact_updated)
+ self.__inst_topic_subscriber.register_handler("InstanceCleanupMemberEvent", Handlers.on_instance_cleanup_member)
+ self.__inst_topic_subscriber.register_handler(
+ "InstanceCleanupClusterEvent", Handlers.on_instance_cleanup_cluster)
self.__inst_topic_subscriber.start()
self.__log.info("Instance notifier event message receiver thread started")
@@ -148,13 +148,13 @@ class CartridgeAgent(object):
def register_topology_event_listeners(self):
self.__log.debug("Starting topology event message receiver thread")
- self.__topology_event_subscriber.register_handler("MemberActivatedEvent", self.on_member_activated)
- self.__topology_event_subscriber.register_handler("MemberTerminatedEvent", self.on_member_terminated)
- self.__topology_event_subscriber.register_handler("MemberSuspendedEvent", self.on_member_suspended)
- self.__topology_event_subscriber.register_handler("CompleteTopologyEvent", self.on_complete_topology)
- self.__topology_event_subscriber.register_handler("MemberStartedEvent", self.on_member_started)
- self.__topology_event_subscriber.register_handler("MemberCreatedEvent", self.on_member_created)
- self.__topology_event_subscriber.register_handler("MemberInitializedEvent", self.on_member_initialized)
+ self.__topology_event_subscriber.register_handler("MemberActivatedEvent", Handlers.on_member_activated)
+ self.__topology_event_subscriber.register_handler("MemberTerminatedEvent", Handlers.on_member_terminated)
+ self.__topology_event_subscriber.register_handler("MemberSuspendedEvent", Handlers.on_member_suspended)
+ self.__topology_event_subscriber.register_handler("CompleteTopologyEvent", Handlers.on_complete_topology)
+ self.__topology_event_subscriber.register_handler("MemberStartedEvent", Handlers.on_member_started)
+ self.__topology_event_subscriber.register_handler("MemberCreatedEvent", Handlers.on_member_created)
+ self.__topology_event_subscriber.register_handler("MemberInitializedEvent", Handlers.on_member_initialized)
self.__topology_event_subscriber.start()
self.__log.info("Cartridge agent topology receiver thread started")
@@ -166,11 +166,11 @@ class CartridgeAgent(object):
def register_tenant_event_listeners(self):
self.__log.debug("Starting tenant event message receiver thread")
self.__tenant_topic_subscriber.register_handler("DomainMappingAddedEvent",
- self.on_domain_mapping_added)
+ Handlers.on_domain_mapping_added)
self.__tenant_topic_subscriber.register_handler("DomainsMappingRemovedEvent",
- self.on_domain_mapping_removed)
- self.__tenant_topic_subscriber.register_handler("CompleteTenantEvent", self.on_complete_tenant)
- self.__tenant_topic_subscriber.register_handler("TenantSubscribedEvent", self.on_tenant_subscribed)
+ Handlers.on_domain_mapping_removed)
+ self.__tenant_topic_subscriber.register_handler("CompleteTenantEvent", Handlers.on_complete_tenant)
+ self.__tenant_topic_subscriber.register_handler("TenantSubscribedEvent", Handlers.on_tenant_subscribed)
self.__tenant_topic_subscriber.start()
self.__log.info("Tenant event message receiver thread started")
@@ -182,7 +182,7 @@ class CartridgeAgent(object):
def register_application_signup_event_listeners(self):
self.__log.debug("Starting application signup event message receiver thread")
self.__app_topic_subscriber.register_handler("ApplicationSignUpRemovedEvent",
- self.on_application_signup_removed)
+ Handlers.on_application_signup_removed)
self.__app_topic_subscriber.start()
self.__log.info("Application signup event message receiver thread started")
@@ -191,18 +191,36 @@ class CartridgeAgent(object):
while not self.__app_topic_subscriber.is_subscribed():
time.sleep(1)
- def on_artifact_updated(self, msg):
+ def wait_for_complete_topology(self):
+ while not TopologyContext.topology.initialized:
+ self.__log.info("Waiting for complete topology event...")
+ time.sleep(5)
+ self.__log.info("Complete topology event received")
+
+
+class Handlers(object):
+ """
+ Handler methods for message broker events
+ """
+
+ __log = LogFactory().get_log(__name__)
+ __tenant_context_initialized = False
+
+ @staticmethod
+ def on_artifact_updated(msg):
event_obj = ArtifactUpdatedEvent.create_from_json(msg.payload)
event_handler.on_artifact_updated_event(event_obj)
- def on_instance_cleanup_member(self, msg):
+ @staticmethod
+ def on_instance_cleanup_member(msg):
member_in_payload = Config.member_id
event_obj = InstanceCleanupMemberEvent.create_from_json(msg.payload)
member_in_event = event_obj.member_id
if member_in_payload == member_in_event:
event_handler.on_instance_cleanup_member_event()
- def on_instance_cleanup_cluster(self, msg):
+ @staticmethod
+ def on_instance_cleanup_cluster(msg):
event_obj = InstanceCleanupClusterEvent.create_from_json(msg.payload)
cluster_in_payload = Config.cluster_id
cluster_in_event = event_obj.cluster_id
@@ -212,11 +230,13 @@ class CartridgeAgent(object):
if cluster_in_event == cluster_in_payload and instance_in_payload == instance_in_event:
event_handler.on_instance_cleanup_cluster_event()
- def on_member_created(self, msg):
- self.__log.debug("Member created event received: %r" % msg.payload)
+ @staticmethod
+ def on_member_created(msg):
+ Handlers.__log.debug("Member created event received: %r" % msg.payload)
- def on_member_initialized(self, msg):
- self.__log.debug("Member initialized event received: %r" % msg.payload)
+ @staticmethod
+ def on_member_initialized(msg):
+ Handlers.__log.debug("Member initialized event received: %r" % msg.payload)
event_obj = MemberInitializedEvent.create_from_json(msg.payload)
if not TopologyContext.topology.initialized:
@@ -224,84 +244,88 @@ class CartridgeAgent(object):
event_handler.on_member_initialized_event(event_obj)
- def on_member_activated(self, msg):
- self.__log.debug("Member activated event received: %r" % msg.payload)
+ @staticmethod
+ def on_member_activated(msg):
+ Handlers.__log.debug("Member activated event received: %r" % msg.payload)
if not TopologyContext.topology.initialized:
return
event_obj = MemberActivatedEvent.create_from_json(msg.payload)
event_handler.on_member_activated_event(event_obj)
- def on_member_terminated(self, msg):
- self.__log.debug("Member terminated event received: %r" % msg.payload)
+ @staticmethod
+ def on_member_terminated(msg):
+ Handlers.__log.debug("Member terminated event received: %r" % msg.payload)
if not TopologyContext.topology.initialized:
return
event_obj = MemberTerminatedEvent.create_from_json(msg.payload)
event_handler.on_member_terminated_event(event_obj)
- def on_member_suspended(self, msg):
- self.__log.debug("Member suspended event received: %r" % msg.payload)
+ @staticmethod
+ def on_member_suspended(msg):
+ Handlers.__log.debug("Member suspended event received: %r" % msg.payload)
if not TopologyContext.topology.initialized:
return
event_obj = MemberSuspendedEvent.create_from_json(msg.payload)
event_handler.on_member_suspended_event(event_obj)
- def on_complete_topology(self, msg):
+ @staticmethod
+ def on_complete_topology(msg):
event_obj = CompleteTopologyEvent.create_from_json(msg.payload)
TopologyContext.update(event_obj.topology)
if not TopologyContext.topology.initialized:
- self.__log.info("Topology initialized from complete topology event")
+ Handlers.__log.info("Topology initialized from complete topology event")
TopologyContext.topology.initialized = True
event_handler.on_complete_topology_event(event_obj)
- self.__log.debug("Topology context updated with [topology] %r" % event_obj.topology.json_str)
+ Handlers.__log.debug("Topology context updated with [topology] %r" % event_obj.topology.json_str)
- def on_member_started(self, msg):
- self.__log.debug("Member started event received: %r" % msg.payload)
+ @staticmethod
+ def on_member_started(msg):
+ Handlers.__log.debug("Member started event received: %r" % msg.payload)
if not TopologyContext.topology.initialized:
return
event_obj = MemberStartedEvent.create_from_json(msg.payload)
event_handler.on_member_started_event(event_obj)
- def on_domain_mapping_added(self, msg):
- self.__log.debug("Subscription domain added event received : %r" % msg.payload)
+ @staticmethod
+ def on_domain_mapping_added(msg):
+ Handlers.__log.debug("Subscription domain added event received : %r" % msg.payload)
event_obj = DomainMappingAddedEvent.create_from_json(msg.payload)
event_handler.on_domain_mapping_added_event(event_obj)
- def on_domain_mapping_removed(self, msg):
- self.__log.debug("Subscription domain removed event received : %r" % msg.payload)
+ @staticmethod
+ def on_domain_mapping_removed(msg):
+ Handlers.__log.debug("Subscription domain removed event received : %r" % msg.payload)
event_obj = DomainMappingRemovedEvent.create_from_json(msg.payload)
event_handler.on_domain_mapping_removed_event(event_obj)
- def on_complete_tenant(self, msg):
+ @staticmethod
+ def on_complete_tenant(msg):
event_obj = CompleteTenantEvent.create_from_json(msg.payload)
TenantContext.update(event_obj.tenants)
- if not self.__tenant_context_initialized:
- self.__log.info("Tenant context initialized from complete tenant event")
- self.__tenant_context_initialized = True
+ if not Handlers.__tenant_context_initialized:
+ Handlers.__log.info("Tenant context initialized from complete tenant event")
+ Handlers.__tenant_context_initialized = True
event_handler.on_complete_tenant_event(event_obj)
- self.__log.debug("Tenant context updated with [tenant list] %r" % event_obj.tenant_list_json)
+ Handlers.__log.debug("Tenant context updated with [tenant list] %r" % event_obj.tenant_list_json)
- def on_tenant_subscribed(self, msg):
- self.__log.debug("Tenant subscribed event received: %r" % msg.payload)
+ @staticmethod
+ def on_tenant_subscribed(msg):
+ Handlers.__log.debug("Tenant subscribed event received: %r" % msg.payload)
event_obj = TenantSubscribedEvent.create_from_json(msg.payload)
event_handler.on_tenant_subscribed_event(event_obj)
- def on_application_signup_removed(self, msg):
- self.__log.debug("Application signup removed event received: %r" % msg.payload)
+ @staticmethod
+ def on_application_signup_removed(msg):
+ Handlers.__log.debug("Application signup removed event received: %r" % msg.payload)
event_obj = ApplicationSignUpRemovedEvent.create_from_json(msg.payload)
event_handler.on_application_signup_removed_event(event_obj)
- def wait_for_complete_topology(self):
- while not TopologyContext.topology.initialized:
- self.__log.info("Waiting for complete topology event...")
- time.sleep(5)
- self.__log.info("Complete topology event received")
-
if __name__ == "__main__":
log = LogFactory().get_log(__name__)
http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
index f1a70ec..72fc5e2 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
@@ -22,7 +22,7 @@ import sys
from yapsy.PluginManager import PluginManager
from modules.util.log import LogFactory
-from exception import ParameterNotFoundException
+from exception import ParameterNotFoundException, InvalidConfigValueException
import constants
from plugins.contracts import ICartridgeAgentPlugin, IArtifactCommitPlugin, IArtifactCheckoutPlugin, \
IHealthStatReaderPlugin
@@ -148,6 +148,18 @@ class Config:
""" :type : str """
mb_publisher_timeout = None
""" :type : int """
+ cep_username = None
+ """ :type : str """
+ cep_password = None
+ """ :type : str """
+ cep_urls = []
+ """ :type : list """
+ artifact_clone_retry_count = None
+ """ :type : str """
+ artifact_clone_retry_interval = None
+ """ :type : str """
+ port_check_timeout = None
+ """ :type : str """
@staticmethod
def read_conf_file():
@@ -351,8 +363,29 @@ class Config:
Config.mb_username = Config.read_property(constants.MB_USERNAME, False)
Config.mb_password = Config.read_property(constants.MB_PASSWORD, False)
- Config.mb_urls = Config.read_property(constants.MB_URLS).split(",")
+ Config.mb_urls = Config.read_property(constants.MB_URLS)
Config.mb_publisher_timeout = int(Config.read_property(constants.MB_PUBLISHER_TIMEOUT))
+
+ Config.cep_username = Config.read_property(constants.CEP_SERVER_ADMIN_USERNAME)
+ Config.cep_password = Config.read_property(constants.CEP_SERVER_ADMIN_PASSWORD)
+ Config.cep_urls = Config.read_property(constants.CEP_RECEIVER_URLS)
+
+ try:
+ Config.artifact_clone_retry_count = Config.read_property(constants.ARTIFACT_CLONE_RETRIES)
+ except ParameterNotFoundException:
+ Config.artifact_clone_retry_count = "5"
+
+ try:
+ Config.artifact_clone_retry_interval = Config.read_property(constants.ARTIFACT_CLONE_INTERVAL)
+ except ParameterNotFoundException:
+ Config.artifact_clone_retry_interval = "10"
+
+ try:
+ Config.port_check_timeout = Config.read_property(constants.PORT_CHECK_TIMEOUT)
+ except ParameterNotFoundException:
+ Config.port_check_timeout = "600000"
+
+ Config.validate_config()
except ParameterNotFoundException as ex:
raise RuntimeError(ex)
@@ -381,6 +414,52 @@ class Config:
Config.extension_executor = Config.initialize_extensions()
@staticmethod
+ def validate_config():
+ try:
+ Config.validate_url_list(Config.mb_urls, constants.MB_URLS)
+ Config.validate_int(Config.mb_publisher_timeout, constants.MB_PUBLISHER_TIMEOUT)
+ Config.validate_url_list(Config.cep_urls, constants.CEP_RECEIVER_URLS)
+ Config.validate_int(Config.artifact_update_interval, constants.ARTIFACT_UPDATE_INTERVAL)
+ Config.validate_int(Config.artifact_clone_retry_count, constants.ARTIFACT_CLONE_RETRIES)
+ Config.validate_int(Config.artifact_clone_retry_interval, constants.ARTIFACT_CLONE_INTERVAL)
+ Config.validate_int(Config.port_check_timeout, constants.PORT_CHECK_TIMEOUT)
+ except ValueError as err:
+ raise InvalidConfigValueException("Invalid configuration for Cartridge Agent", err)
+
+ @staticmethod
+ def validate_url_list(urls, field_name):
+ """
+ host1:port1,host2:port2
+
+ :param urls:
+ :param field_name:
+ :return:
+ """
+ url_list = str(urls).split(",")
+ if len(url_list) < 1:
+ raise ValueError("Invalid value [field] \"%s\"" % field_name)
+
+ for single_url in url_list:
+ try:
+ url_ip, url_port = single_url.split(":")
+ except ValueError:
+ raise ValueError("Invalid host or port number value for [field] %s", field_name)
+
+ @staticmethod
+ def validate_int(int_value, field_name):
+ """
+ valid integer value
+
+ :param int_value:
+ :param field_name:
+ :return:
+ """
+ try:
+ int(int_value)
+ except ValueError:
+ raise ValueError("Invalid int value for [field] %s " % field_name)
+
+ @staticmethod
def initialize_plugins():
""" Find, load, activate and group plugins for Python CA
:return: a tuple of (PluginManager, plugins, artifact management plugins)
http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
index 1586c4d..345efee 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
@@ -21,17 +21,8 @@ class CartridgeAgentException(Exception):
Exception super class to be used by specific exceptions thrown in the cartridge agent
"""
- def __init__(self, message):
- super(CartridgeAgentException, self).__init__(message)
- self.__message = message
-
- def get_message(self):
- """
- The message provided when the exception is raised
- :return: message
- :rtype: str
- """
- return self.__message
+ def __init__(self, *args, **kwargs):
+ Exception.__init__(self, *args, **kwargs)
class DataPublisherException(CartridgeAgentException):
@@ -39,8 +30,8 @@ class DataPublisherException(CartridgeAgentException):
Exception to be used during log publishing operations
"""
- def __init__(self, message):
- super(DataPublisherException, self).__init__(message)
+ def __init__(self, *args, **kwargs):
+ CartridgeAgentException.__init__(self, *args, **kwargs)
class PluginExecutionException(CartridgeAgentException):
@@ -48,8 +39,8 @@ class PluginExecutionException(CartridgeAgentException):
Exception raised when a runtime error is met while executing an plugin script
"""
- def __init__(self, message):
- super(PluginExecutionException, self).__init__(message)
+ def __init__(self, *args, **kwargs):
+ CartridgeAgentException.__init__(self, *args, **kwargs)
class GitRepositorySynchronizationException(CartridgeAgentException):
@@ -57,8 +48,8 @@ class GitRepositorySynchronizationException(CartridgeAgentException):
Exception raised during a git repository related task
"""
- def __init__(self, message):
- super(GitRepositorySynchronizationException, self).__init__(message)
+ def __init__(self, *args, **kwargs):
+ CartridgeAgentException.__init__(self, *args, **kwargs)
class ParameterNotFoundException(CartridgeAgentException):
@@ -67,8 +58,8 @@ class ParameterNotFoundException(CartridgeAgentException):
of the cartridge agent
"""
- def __init__(self, message):
- super(ParameterNotFoundException, self).__init__(message)
+ def __init__(self, *args, **kwargs):
+ CartridgeAgentException.__init__(self, *args, **kwargs)
class ThriftReceiverOfflineException(CartridgeAgentException):
@@ -76,8 +67,8 @@ class ThriftReceiverOfflineException(CartridgeAgentException):
Exception raised when the connection to the Thrift receiver is dropped when publishing events
"""
- def __init__(self, message):
- super(ThriftReceiverOfflineException, self).__init__(message)
+ def __init__(self, *args, **kwargs):
+ CartridgeAgentException.__init__(self, *args, **kwargs)
class CEPPublisherException(CartridgeAgentException):
@@ -85,5 +76,14 @@ class CEPPublisherException(CartridgeAgentException):
Exception to be used during CEP publishing operations
"""
- def __init__(self, message):
- super(CEPPublisherException, self).__init__(message)
+ def __init__(self, *args, **kwargs):
+ CartridgeAgentException.__init__(self, *args, **kwargs)
+
+
+class InvalidConfigValueException(CartridgeAgentException):
+ """
+ Exception to be used when validating agent configuration
+ """
+
+ def __init__(self, *args, **kwargs):
+ CartridgeAgentException.__init__(self, *args, **kwargs)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
index 71f2894..2005537 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
@@ -62,7 +62,7 @@ class HealthStatisticsPublisherManager(Thread):
self.log.debug("Publishing load average: %r" % cartridge_stats.load_avg)
self.publisher.publish_load_average(cartridge_stats.load_avg)
- except Exception as e:
+ except Exception:
self.log.exception(
"Couldn't publish health statistics to CEP. Thrift Receiver offline. Reconnecting...")
self.publisher = HealthStatisticsPublisher()
@@ -76,33 +76,17 @@ class HealthStatisticsPublisher:
"""
log = LogFactory().get_log(__name__)
- @staticmethod
- def read_config(conf_key):
- """
- Read a given key from the cartridge agent configuration
- :param conf_key: The key to look for in the CA config
- :return: The value for the key from the CA config
- :raise: RuntimeError if the given key is not found in the CA config
- """
- conf_value = Config.read_property(conf_key, False)
-
- if conf_value is None or conf_value.strip() == "":
- raise RuntimeError("System property not found: " + conf_key)
-
- return conf_value
-
def __init__(self):
self.publishers = []
self.deactive_publishers = []
- self.cep_admin_username = HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_USERNAME)
- self.cep_admin_password = HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_PASSWORD)
+ self.cep_admin_username = Config.cep_username
+ self.cep_admin_password = Config.cep_password
self.stream_definition = HealthStatisticsPublisher.create_stream_definition()
HealthStatisticsPublisher.log.debug("Stream definition created: %r" % str(self.stream_definition))
# 1.1.1.1:1883,2.2.2.2:1883
- cep_urls = HealthStatisticsPublisher.read_config(constants.CEP_RECEIVER_URLS)
- cep_urls = cep_urls.split(',')
+ cep_urls = Config.cep_urls.split(',')
for cep_url in cep_urls:
cep_active = self.is_cep_active(cep_url)
@@ -207,14 +191,10 @@ class HealthStatisticsPublisher:
return true if active
:param cep_url:
"""
- ports = []
cep_ip = cep_url.split(':')[0]
cep_port = cep_url.split(':')[1]
- ports.append(cep_port)
- cep_active = cartridgeagentutils.check_ports_active(
- cep_ip,
- ports)
+ cep_active = cartridgeagentutils.check_port_active(cep_ip, cep_port)
return cep_active
@@ -229,8 +209,8 @@ class HealthStatisticsPublisher:
except Exception as ex:
raise ThriftReceiverOfflineException(ex)
- deactive_ceps = self.deactive_publishers
- for cep_url in deactive_ceps:
+ inactive_ceps = self.deactive_publishers
+ for cep_url in inactive_ceps:
cep_active = self.is_cep_active(cep_url)
if cep_active:
self.add_publishers(cep_url)
http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
index e7ab1c7..b9f5d63 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
@@ -145,7 +145,7 @@ class LogPublisherManager(Thread):
ports_active = cartridgeagentutils.wait_until_ports_active(
DataPublisherConfiguration.get_instance().monitoring_server_ip,
self.ports,
- int(Config.read_property("port.check.timeout", critical=False)))
+ int(Config.port_check_timeout))
if not ports_active:
self.log.debug("Monitoring server is not active")
http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
index 2170a33..15d3733 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
@@ -168,8 +168,8 @@ class AgentGitHandler:
"""
git_clone_successful = False
# Read properties from agent.conf
- max_retry_attempts = int(Config.read_property(constants.ARTIFACT_CLONE_RETRIES, 5))
- retry_interval = int(Config.read_property(constants.ARTIFACT_CLONE_INTERVAL, 10))
+ max_retry_attempts = int(Config.artifact_clone_retry_count)
+ retry_interval = int(Config.artifact_clone_retry_interval)
retry_attempts = 0
# Iterate until git clone is successful or reaches max retry attempts
http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py
index f44dd04..13bb680 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py
@@ -27,7 +27,6 @@ class ApplicationSignUpAddedEvent:
self.clusterIds = None
""" :type : list[str] """
-
@staticmethod
def create_from_json(json_str):
json_obj = json.loads(json_str)
@@ -49,7 +48,6 @@ class ApplicationSignUpRemovedEvent:
self.clusterIds = None
""" :type : list[str] """
-
@staticmethod
def create_from_json(json_str):
json_obj = json.loads(json_str)
@@ -59,4 +57,3 @@ class ApplicationSignUpRemovedEvent:
app_signup_removed.tenantId = json_obj["tenantId"] if "tenantId" in json_obj else None
return app_signup_removed
-
http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py
index 41dc133..d19cd0c 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py
@@ -79,4 +79,4 @@ class InstanceCleanupMemberEvent:
json_obj = json.loads(json_str)
m_id = json_obj["memberId"] if "memberId" in json_obj else None
- return InstanceCleanupMemberEvent(m_id)
\ No newline at end of file
+ return InstanceCleanupMemberEvent(m_id)
http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py
index 2e287bd..8a49bc2 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py
@@ -84,4 +84,4 @@ class TenantUnsubscribedEvent:
instance.service_name = json_obj["serviceName"] if "serviceName" in json_obj else None
instance.cluster_ids = json_obj["clusterIds"] if "clusterIds" in json_obj else None
- return instance
\ No newline at end of file
+ return instance
http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py
index 79bc6c5..e5ad877 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py
@@ -24,12 +24,18 @@ import hashlib
from log import LogFactory
+BS = 16
+
log = LogFactory().get_log(__name__)
-unpad = lambda s: s[0:-ord(s[-1])]
-current_milli_time = lambda: int(round(time.time() * 1000))
-BS = 16
-pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
+
+def unpad(s): return s[0:-ord(s[-1])]
+
+
+def current_milli_time(): return int(round(time.time() * 1000))
+
+
+def pad(s): return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
def decrypt_password(pass_str, secret):
@@ -77,11 +83,26 @@ def wait_until_ports_active(ip_address, ports, ports_check_timeout=600000):
log.debug("Port check timeout: %s" % ports_check_timeout)
- active = False
+ ports_left = ports
start_time = current_milli_time()
- while not active:
+
+ # check ports until all are active or timeout exceeds
+ while True:
log.info("Waiting for ports to be active: [ip] %s [ports] %s" % (ip_address, ports))
- active = check_ports_active(ip_address, ports)
+
+ # check each port for activity
+ for checking_port in list(ports_left):
+ port_active = check_port_active(ip_address, checking_port)
+ if port_active:
+ log.debug("Port %s on host %s active" % (checking_port, ip_address))
+ ports_left.remove(checking_port)
+
+ # if no ports are left to check for activity, return
+ if len(ports_left) == 0:
+ log.info("Ports activated: [ip] %r [ports] %r" % (ip_address, ports))
+ return True
+
+ # active = check_ports_active(ip_address, ports)
end_time = current_milli_time()
duration = end_time - start_time
@@ -92,33 +113,33 @@ def wait_until_ports_active(ip_address, ports, ports_check_timeout=600000):
time.sleep(5)
- log.info("Ports activated: [ip] %r [ports] %r" % (ip_address, ports))
- return True
-
-def check_ports_active(ip_address, ports):
+def check_port_active(ip_address, port):
"""
- Checks the given list of port addresses for active state
+ Checks the given port on the given host for activity
:param str ip_address: Ip address of the member to be checked
- :param list[str] ports: The list of ports to be checked
+ :param str port: The port to be checked
:return: True if the port is active, False if it is not active
:rtype: bool
"""
- if len(ports) < 1:
- raise RuntimeError("No ports found")
-
- for port in ports:
- s = socket.socket()
- s.settimeout(5)
- try:
- s.connect((ip_address, int(port)))
- log.debug("Port %s is active" % port)
- s.close()
- except socket.error:
- log.debug("Port %s is not active" % port)
- return False
+ if port is None:
+ raise RuntimeError("Cannot check invalid port for activity")
+
+ try:
+ port_int = int(port)
+ except ValueError:
+ raise RuntimeError("Cannot check invalid port for activity %s" % port)
- return True
+ s = socket.socket()
+ s.settimeout(5)
+ try:
+ s.connect((ip_address, port_int))
+ log.debug("Port %s is active" % port)
+ s.close()
+ return True
+ except socket.error:
+ log.debug("Port %s is not active" % port)
+ return False
class IncrementalCeilingListIterator(object):
http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
index 5b6190e..229c354 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
@@ -71,7 +71,7 @@ def publish_instance_activated_event():
ports_active = cartridgeagentutils.wait_until_ports_active(
listen_address,
configuration_ports,
- int(Config.read_property("port.check.timeout", critical=False)))
+ int(Config.port_check_timeout))
if ports_active:
log.info("Publishing instance activated event...")
@@ -226,7 +226,7 @@ class EventPublisher(object):
while int(time.time()) - self.__start_time < (Config.mb_publisher_timeout * 1000):
retry_interval = retry_iterator.get_next_retry_interval()
- for mb_url in Config.mb_urls:
+ for mb_url in Config.mb_urls.split(","):
mb_ip, mb_port = mb_url.split(":")
# start a thread to execute publish event