You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ni...@apache.org on 2014/10/11 20:46:46 UTC
[22/50] [abbrv] Added new properties to the agent.conf; refactored
modules and dependencies to avoid circular dependencies; added singleton to
CartridgeAgentConfiguration by overriding __new__; refactored hard-coded
strings to constants file; cleaned up file
http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py
index ea85d44..64021a3 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py
@@ -14,6 +14,11 @@ class HealthStatisticsPublisherManager(Thread):
Read from an implementation of AbstractHealthStatisticsPublisher the value for memory usage and
load average and publishes them as ThriftEvents to a CEP server
"""
+ STREAM_NAME = "cartridge_agent_health_stats"
+ STREAM_VERSION = "1.0.0"
+ STREAM_NICKNAME = "agent health stats"
+ STREAM_DESCRIPTION = "agent health stats"
+
def __init__(self, publish_interval):
"""
Initializes a new HealthStatistsPublisherManager with a given number of seconds as the interval
@@ -54,7 +59,13 @@ class HealthStatisticsPublisher:
self.log = LogFactory().get_log(__name__)
self.ports = []
self.ports.append(CEPPublisherConfiguration.get_instance().server_port)
- cartridgeagentutils.wait_until_ports_active(CEPPublisherConfiguration.get_instance().server_ip, self.ports)
+
+ self.cartridge_agent_config = CartridgeAgentConfiguration()
+
+ cartridgeagentutils.wait_until_ports_active(
+ CEPPublisherConfiguration.get_instance().server_ip,
+ self.ports,
+ int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False)))
cep_active = cartridgeagentutils.check_ports_active(CEPPublisherConfiguration.get_instance().server_ip, self.ports)
if not cep_active:
raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.")
@@ -74,17 +85,17 @@ class HealthStatisticsPublisher:
Create a StreamDefinition for publishing to CEP
"""
stream_def = StreamDefinition()
- stream_def.name = "cartridge_agent_health_stats"
- stream_def.version = "1.0.0"
- stream_def.nickname = "agent health stats"
- stream_def.description = "agent health stats"
-
- stream_def.add_payloaddata_attribute("cluster_id", "STRING")
- stream_def.add_payloaddata_attribute("network_partition_id", "STRING")
- stream_def.add_payloaddata_attribute("member_id", "STRING")
- stream_def.add_payloaddata_attribute("partition_id", "STRING")
- stream_def.add_payloaddata_attribute("health_description", "STRING")
- stream_def.add_payloaddata_attribute("value", "DOUBLE")
+ stream_def.name = HealthStatisticsPublisherManager.STREAM_NAME
+ stream_def.version = HealthStatisticsPublisherManager.STREAM_VERSION
+ stream_def.nickname = HealthStatisticsPublisherManager.STREAM_NICKNAME
+ stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION
+
+ stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING)
+ stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING)
+ stream_def.add_payloaddata_attribute("member_id", StreamDefinition.STRING)
+ stream_def.add_payloaddata_attribute("partition_id", StreamDefinition.STRING)
+ stream_def.add_payloaddata_attribute("health_description", StreamDefinition.STRING)
+ stream_def.add_payloaddata_attribute("value", StreamDefinition.DOUBLE)
return stream_def
@@ -95,10 +106,10 @@ class HealthStatisticsPublisher:
"""
event = ThriftEvent()
- event.payloadData.append(CartridgeAgentConfiguration.cluster_id)
- event.payloadData.append(CartridgeAgentConfiguration.network_partition_id)
- event.payloadData.append(CartridgeAgentConfiguration.member_id)
- event.payloadData.append(CartridgeAgentConfiguration.partition_id)
+ event.payloadData.append(self.cartridge_agent_config.cluster_id)
+ event.payloadData.append(self.cartridge_agent_config.network_partition_id)
+ event.payloadData.append(self.cartridge_agent_config.member_id)
+ event.payloadData.append(self.cartridge_agent_config.partition_id)
event.payloadData.append(cartridgeagentconstants.MEMORY_CONSUMPTION)
event.payloadData.append(memory_usage)
@@ -112,10 +123,10 @@ class HealthStatisticsPublisher:
"""
event = ThriftEvent()
- event.payloadData.append(CartridgeAgentConfiguration.cluster_id)
- event.payloadData.append(CartridgeAgentConfiguration.network_partition_id)
- event.payloadData.append(CartridgeAgentConfiguration.member_id)
- event.payloadData.append(CartridgeAgentConfiguration.partition_id)
+ event.payloadData.append(self.cartridge_agent_config.cluster_id)
+ event.payloadData.append(self.cartridge_agent_config.network_partition_id)
+ event.payloadData.append(self.cartridge_agent_config.member_id)
+ event.payloadData.append(self.cartridge_agent_config.partition_id)
event.payloadData.append(cartridgeagentconstants.LOAD_AVERAGE)
event.payloadData.append(load_avg)
@@ -173,28 +184,35 @@ class CEPPublisherConfiguration:
self.admin_password = None
self.read_config()
+ self.cartridge_agent_config = CartridgeAgentConfiguration()
+
def read_config(self):
- self.enabled = True if CartridgeAgentConfiguration.read_property("cep.stats.publisher.enabled", False).strip().lower() == "true" else False
+ self.enabled = True if self.cartridge_agent_config.read_property(
+ cartridgeagentconstants.CEP_PUBLISHER_ENABLED, False).strip().lower() == "true" else False
if not self.enabled:
CEPPublisherConfiguration.log.info("CEP Publisher disabled")
return
CEPPublisherConfiguration.log.info("CEP Publisher enabled")
- self.server_ip = CartridgeAgentConfiguration.read_property("thrift.receiver.ip", False)
+ self.server_ip = self.cartridge_agent_config.read_property(
+ cartridgeagentconstants.CEP_RECEIVER_IP, False)
if self.server_ip.strip() == "":
- raise RuntimeError("System property not found: thrift.receiver.ip")
+ raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_RECEIVER_IP)
- self.server_port = CartridgeAgentConfiguration.read_property("thrift.receiver.port", False)
+ self.server_port = self.cartridge_agent_config.read_property(
+ cartridgeagentconstants.CEP_RECEIVER_PORT, False)
if self.server_port.strip() == "":
- raise RuntimeError("System property not found: thrift.receiver.port")
+ raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_RECEIVER_PORT)
- self.admin_username = CartridgeAgentConfiguration.read_property("thrift.server.admin.username", False)
+ self.admin_username = self.cartridge_agent_config.read_property(
+ cartridgeagentconstants.CEP_SERVER_ADMIN_USERNAME, False)
if self.admin_username.strip() == "":
- raise RuntimeError("System property not found: thrift.server.admin.username")
+ raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_SERVER_ADMIN_USERNAME)
- self.admin_password = CartridgeAgentConfiguration.read_property("thrift.server.admin.password", False)
+ self.admin_password = self.cartridge_agent_config.read_property(
+ cartridgeagentconstants.CEP_SERVER_ADMIN_PASSWORD, False)
if self.admin_password.strip() == "":
- raise RuntimeError("System property not found: thrift.server.admin.password")
+ raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_SERVER_ADMIN_PASSWORD)
CEPPublisherConfiguration.log.info("CEP Publisher configuration initialized")
http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py b/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
index 260d67d..9b4d819 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
@@ -24,15 +24,15 @@ def publish_instance_started_event():
global started, log
if not started:
log.info("Publishing instance started event")
- service_name = CartridgeAgentConfiguration.service_name
- cluster_id = CartridgeAgentConfiguration.cluster_id
- network_partition_id = CartridgeAgentConfiguration.network_partition_id
- parition_id = CartridgeAgentConfiguration.partition_id
- member_id = CartridgeAgentConfiguration.member_id
+ service_name = CartridgeAgentConfiguration().service_name
+ cluster_id = CartridgeAgentConfiguration().cluster_id
+ network_partition_id = CartridgeAgentConfiguration().network_partition_id
+ parition_id = CartridgeAgentConfiguration().partition_id
+ member_id = CartridgeAgentConfiguration().member_id
instance_started_event = InstanceStartedEvent(service_name, cluster_id, network_partition_id, parition_id,
member_id)
- publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + "InstanceStartedEvent")
+ publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_STARTED_EVENT)
publisher.publish(instance_started_event)
started = True
log.info("Instance started event published")
@@ -44,15 +44,15 @@ def publish_instance_activated_event():
global activated, log
if not activated:
log.info("Publishing instance activated event")
- service_name = CartridgeAgentConfiguration.service_name
- cluster_id = CartridgeAgentConfiguration.cluster_id
- network_partition_id = CartridgeAgentConfiguration.network_partition_id
- parition_id = CartridgeAgentConfiguration.partition_id
- member_id = CartridgeAgentConfiguration.member_id
+ service_name = CartridgeAgentConfiguration().service_name
+ cluster_id = CartridgeAgentConfiguration().cluster_id
+ network_partition_id = CartridgeAgentConfiguration().network_partition_id
+ parition_id = CartridgeAgentConfiguration().partition_id
+ member_id = CartridgeAgentConfiguration().member_id
instance_activated_event = InstanceActivatedEvent(service_name, cluster_id, network_partition_id, parition_id,
member_id)
- publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + "InstanceActivatedEvent")
+ publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_ACTIVATED_EVENT)
publisher.publish(instance_activated_event)
log.info("Instance activated event published")
@@ -60,7 +60,7 @@ def publish_instance_activated_event():
if CEPPublisherConfiguration.get_instance().enabled:
interval_default = 15 # seconds
- interval = CartridgeAgentConfiguration.read_property("stats.notifier.interval")
+ interval = CartridgeAgentConfiguration().read_property("stats.notifier.interval")
if interval is not None and len(interval) > 0:
try:
interval = int(interval)
@@ -85,16 +85,16 @@ def publish_maintenance_mode_event():
if not maintenance:
log.info("Publishing instance maintenance mode event")
- service_name = CartridgeAgentConfiguration.service_name
- cluster_id = CartridgeAgentConfiguration.cluster_id
- network_partition_id = CartridgeAgentConfiguration.network_partition_id
- parition_id = CartridgeAgentConfiguration.partition_id
- member_id = CartridgeAgentConfiguration.member_id
+ service_name = CartridgeAgentConfiguration().service_name
+ cluster_id = CartridgeAgentConfiguration().cluster_id
+ network_partition_id = CartridgeAgentConfiguration().network_partition_id
+ parition_id = CartridgeAgentConfiguration().partition_id
+ member_id = CartridgeAgentConfiguration().member_id
instance_maintenance_mode_event = InstanceMaintenanceModeEvent(service_name, cluster_id, network_partition_id, parition_id,
member_id)
- publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + "InstanceMaintenanceModeEvent")
+ publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_MAINTENANCE_MODE_EVENT)
publisher.publish(instance_maintenance_mode_event)
maintenance = True
@@ -108,16 +108,16 @@ def publish_instance_ready_to_shutdown_event():
if not ready_to_shutdown:
log.info("Publishing instance activated event")
- service_name = CartridgeAgentConfiguration.service_name
- cluster_id = CartridgeAgentConfiguration.cluster_id
- network_partition_id = CartridgeAgentConfiguration.network_partition_id
- parition_id = CartridgeAgentConfiguration.partition_id
- member_id = CartridgeAgentConfiguration.member_id
+ service_name = CartridgeAgentConfiguration().service_name
+ cluster_id = CartridgeAgentConfiguration().cluster_id
+ network_partition_id = CartridgeAgentConfiguration().network_partition_id
+ parition_id = CartridgeAgentConfiguration().partition_id
+ member_id = CartridgeAgentConfiguration().member_id
instance_shutdown_event = InstanceReadyToShutdownEvent(service_name, cluster_id, network_partition_id, parition_id,
member_id)
- publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + "InstanceReadyToShutdownEvent")
+ publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_READY_TO_SHUTDOWN_EVENT)
publisher.publish(instance_shutdown_event)
ready_to_shutdown = True
@@ -140,14 +140,8 @@ class EventPublisher:
def __init__(self, topic):
self.__topic = topic
- """
- msgs = [{'topic': "instance/status/InstanceStartedEvent", 'payload': instance_started_event.to_JSON()}]
- #publish.single("instance", instance_started_event.to_JSON(), hostname="localhost", port=1883)
- publish.multiple(msgs, "localhost", 1883)
- """
-
def publish(self, event):
- mb_ip = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MB_IP)
- mb_port = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MB_PORT)
+ mb_ip = CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_IP)
+ mb_port = CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_PORT)
payload = event.to_json()
publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/subscriber/eventsubscriber.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/subscriber/eventsubscriber.py b/tools/python-cartridge-agent/cartridge-agent/modules/subscriber/eventsubscriber.py
index 46fe18a..da60d7d 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/subscriber/eventsubscriber.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/subscriber/eventsubscriber.py
@@ -1,11 +1,6 @@
-import logging
import threading
import paho.mqtt.client as mqtt
-from .. util import cartridgeagentconstants
-from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration
-from .. util.log import LogFactory
-
class EventSubscriber(threading.Thread):
"""
@@ -13,7 +8,7 @@ class EventSubscriber(threading.Thread):
register event handlers for various events.
"""
- def __init__(self, topic):
+ def __init__(self, topic, ip, port):
threading.Thread.__init__(self)
#{"ArtifactUpdateEvent" : onArtifactUpdateEvent()}
@@ -27,16 +22,16 @@ class EventSubscriber(threading.Thread):
self.__subscribed = False
+ self.__ip = ip
+ self.__port = port
+
def run(self):
self.__mb_client = mqtt.Client()
self.__mb_client.on_connect = self.on_connect
self.__mb_client.on_message = self.on_message
- mb_ip = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MB_IP)
- mb_port = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MB_PORT)
-
- self.log.debug("Connecting to the message broker with address %r:%r" % (mb_ip, mb_port))
- self.__mb_client.connect(mb_ip, mb_port, 60)
+ self.log.debug("Connecting to the message broker with address %r:%r" % (self.__ip, self.__port))
+ self.__mb_client.connect(self.__ip, self.__port, 60)
self.__subscribed = True
self.__mb_client.loop_forever()
@@ -79,3 +74,6 @@ class EventSubscriber(threading.Thread):
:rtype: bool
"""
return self.__subscribed
+
+
+from .. util.log import LogFactory
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py b/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py
index 5d92306..4a13765 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py
@@ -247,8 +247,10 @@ class Cluster:
return member_id in self.member_map
def __str__(self):
- return "Cluster [serviceName=%r, clusterId=%r, autoscalePolicyName=%r, deploymentPolicyName=%r, hostNames=%r, tenantRange=%r, isLbCluster=%r, properties=%r]" % \
- (self.service_name, self.cluster_id, self.autoscale_policy_name, self.deployment_policy_name, self.hostnames, self.tenant_range, self.is_lb_cluster, self.properties)
+ return "Cluster [serviceName=" + self.service_name + ", clusterId=" + self.cluster_id \
+ + ", autoscalePolicyName=" + self.autoscale_policy_name + ", deploymentPolicyName=" \
+ + self.deployment_policy_name + ", hostNames=" + self.hostnames + ", tenantRange=" + self.tenant_range \
+ + ", isLbCluster=" + self.is_lb_cluster + ", properties=" + self.properties + "]"
def tenant_id_in_range(self, tenant_id):
"""
http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentconstants.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentconstants.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentconstants.py
index 3d6dea1..7265c15 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentconstants.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentconstants.py
@@ -5,15 +5,6 @@ EXTENSIONS_DIR = "extensions.dir"
MB_IP = "mb.ip"
MB_PORT = "mb.port"
-INSTANCE_STARTED_SH = "instance-started.sh"
-START_SERVERS_SH = "start-servers.sh"
-INSTANCE_ACTIVATED_SH = "instance-activated.sh"
-ARTIFACTS_UPDATED_SH = "artifacts-updated.sh"
-CLEAN_UP_SH = "clean.sh"
-MOUNT_VOLUMES_SH = "mount_volumes.sh"
-SUBSCRIPTION_DOMAIN_ADDED_SH = "subscription-domain-added.sh"
-SUBSCRIPTION_DOMAIN_REMOVED_SH = "subscription-domain-removed.sh"
-
CARTRIDGE_KEY = "CARTRIDGE_KEY"
APP_PATH = "APP_PATH"
SERVICE_GROUP = "SERIVCE_GROUP"
@@ -29,6 +20,7 @@ PORTS = "PORTS"
DEPLOYMENT = "DEPLOYMENT"
MANAGER_SERVICE_TYPE = "MANAGER_SERVICE_TYPE"
WORKER_SERVICE_TYPE = "WORKER_SERVICE_TYPE"
+PERSISTENCE_MAPPING = "PERSISTENCE_MAPPING"
# stratos.sh environment variables keys
LOG_FILE_PATHS = "LOG_FILE_PATHS"
@@ -87,6 +79,41 @@ TOPOLOGY_TOPIC = "topology/#"
TENANT_TOPIC = "tenant/#"
INSTANCE_STATUS_TOPIC = "instance/status/"
-
#Messaging Model
-TENANT_RANGE_DELIMITER = "-"
\ No newline at end of file
+TENANT_RANGE_DELIMITER = "-"
+
+INSTANCE_STARTED_EVENT = "InstanceStartedEvent"
+INSTANCE_ACTIVATED_EVENT = "InstanceActivatedEvent"
+INSTANCE_MAINTENANCE_MODE_EVENT = "InstanceMaintenanceModeEvent"
+INSTANCE_READY_TO_SHUTDOWN_EVENT = "InstanceReadyToShutdownEvent"
+
+PUBLISHER_SERVICE_NAME = "publisher"
+APISTORE_SERVICE_NAME = "apistore"
+APIMANAGER_SERVICE_NAME = "apim"
+GATEWAY_SERVICE_NAME = "gatewaymgt"
+GATEWAY_MGT_SERVICE_NAME = "gateway"
+KEY_MANAGER_SERVICE_NAME = "keymanager"
+
+PRIMARY = "PRIMARY"
+MIN_COUNT = "MIN_COUNT"
+
+#multi tenant constants
+INVALID_TENANT_ID = "-1"
+SUPER_TENANT_ID = "-1234"
+
+DATE_FORMAT = "%Y.%m.%d"
+
+PORT_CHECK_TIMEOUT = "port.check.timeout"
+
+CEP_PUBLISHER_ENABLED = "cep.stats.publisher.enabled"
+CEP_RECEIVER_IP = "thrift.receiver.ip"
+CEP_RECEIVER_PORT = "thrift.receiver.port"
+CEP_SERVER_ADMIN_USERNAME = "thrift.server.admin.username"
+CEP_SERVER_ADMIN_PASSWORD = "thrift.server.admin.password"
+
+MONITORING_PUBLISHER_ENABLED = "enable.data.publisher"
+MONITORING_RECEIVER_IP = "monitoring.server.ip"
+MONITORING_RECEIVER_PORT = "monitoring.server.port"
+MONITORING_RECEIVER_SECURE_PORT = "monitoring.server.secure.port"
+MONITORING_SERVER_ADMIN_USERNAME = "monitoring.server.admin.username"
+MONITORING_SERVER_ADMIN_PASSWORD = "monitoring.server.admin.password"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py
index d9916da..81b2b92 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py
@@ -6,7 +6,6 @@ import time
import socket
import shutil
-from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration
import cartridgeagentconstants
from log import LogFactory
@@ -16,6 +15,16 @@ log = LogFactory().get_log(__name__)
current_milli_time = lambda: int(round(time.time() * 1000))
+extension_handler = None
+
+
+def get_extension_handler():
+ global extension_handler
+ if extension_handler is None:
+ extension_handler = defaultextensionhandler.DefaultExtensionHandler()
+
+ return extension_handler
+
def decrypt_password(pass_str, secret):
"""
@@ -75,14 +84,14 @@ def delete_folder_tree(path):
log.exception("Deletion of folder path %r failed." % path)
-def wait_until_ports_active(ip_address, ports):
+def wait_until_ports_active(ip_address, ports, ports_check_timeout=600000):
"""
Blocks until the given list of ports become active
:param str ip_address: Ip address of the member to be checked
:param list[str] ports: List of ports to be checked
+ :param int ports_check_timeout: The timeout in milliseconds, defaults to 1000*60*10
:return: void
"""
- ports_check_timeout = CartridgeAgentConfiguration.read_property("port.check.timeout", critical=False)
if ports_check_timeout is None:
ports_check_timeout = 1000 * 60 * 10
@@ -160,4 +169,17 @@ def get_carbon_server_property(property_key):
:rtype : str
"""
- raise NotImplementedError
\ No newline at end of file
+ raise NotImplementedError
+
+
+def get_working_dir():
+ """
+ Returns the base directory of the cartridge agent.
+ :return: Base working dir path
+ :rtype : str
+ """
+ #"/path/to/cartridge-agent/modules/util/".split("modules") returns ["/path/to/cartridge-agent/", "/util"]
+ return os.path.abspath(os.path.dirname(__file__)).split("modules")[0]
+
+
+from ..extensions import defaultextensionhandler
http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
index a694ff1..487def4 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py
@@ -3,17 +3,18 @@ import os
import subprocess
import time
-from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration
-from .. topology.topologycontext import *
from log import LogFactory
log = LogFactory().get_log(__name__)
+cartridge_agent_config = cartridgeagentconfiguration.CartridgeAgentConfiguration()
+
def execute_copy_artifact_extension(source, destination):
try:
log.debug("Executing artifacts copy extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.ARTIFACTS_COPY_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.ARTIFACTS_COPY_SCRIPT, False)
command = prepare_command(script_name)
output, errors = execute_command(command + " " + source + " " + destination)
@@ -26,7 +27,8 @@ def execute_instance_started_extension(env_params):
try:
log.debug("Executing instance started extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.INSTANCE_STARTED_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.INSTANCE_STARTED_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -40,7 +42,8 @@ def execute_instance_started_extension(env_params):
def execute_instance_activated_extension():
try:
log.debug("Executing instance activated extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.INSTANCE_ACTIVATED_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.INSTANCE_ACTIVATED_SCRIPT, False)
command = prepare_command(script_name)
output, errors = execute_command(command)
@@ -53,7 +56,8 @@ def execute_artifacts_updated_extension(env_params):
try:
log.debug("Executing artifacts updated extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.ARTIFACTS_UPDATED_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.ARTIFACTS_UPDATED_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -68,7 +72,8 @@ def execute_subscription_domain_added_extension(env_params):
try:
log.debug("Executing subscription domain added extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.SUBSCRIPTION_DOMAIN_ADDED_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.SUBSCRIPTION_DOMAIN_ADDED_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -83,7 +88,8 @@ def execute_subscription_domain_removed_extension(env_params):
try:
log.debug("Executing subscription domain removed extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.SUBSCRIPTION_DOMAIN_REMOVED_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.SUBSCRIPTION_DOMAIN_REMOVED_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -98,7 +104,8 @@ def execute_start_servers_extension(env_params):
try:
log.debug("Executing start servers extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.START_SERVERS_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.START_SERVERS_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -113,7 +120,8 @@ def execute_complete_topology_extension(env_params):
try:
log.debug("Executing complete topology extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.COMPLETE_TOPOLOGY_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.COMPLETE_TOPOLOGY_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -128,7 +136,8 @@ def execute_complete_tenant_extension(env_params):
try:
log.debug("Executing complete tenant extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.COMPLETE_TENANT_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.COMPLETE_TENANT_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -143,7 +152,8 @@ def execute_tenant_subscribed_extension(env_params):
try:
log.debug("Executing tenant subscribed extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.TENANT_SUBSCRIBED_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.TENANT_SUBSCRIBED_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -158,7 +168,8 @@ def execute_tenant_unsubscribed_extension(env_params):
try:
log.debug("Executing tenant unsubscribed extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.TENANT_UNSUBSCRIBED_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.TENANT_UNSUBSCRIBED_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -173,7 +184,8 @@ def execute_member_terminated_extension(env_params):
try:
log.debug("Executing member terminated extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MEMBER_TERMINATED_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.MEMBER_TERMINATED_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -188,7 +200,8 @@ def execute_member_suspended_extension(env_params):
try:
log.debug("Executing member suspended extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MEMBER_SUSPENDED_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.MEMBER_SUSPENDED_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -198,11 +211,13 @@ def execute_member_suspended_extension(env_params):
except:
log.exception("Could not execute member suspended extension")
+
def execute_member_started_extension(env_params):
try:
log.debug("Executing member started extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MEMBER_STARTED_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.MEMBER_STARTED_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -240,7 +255,7 @@ def check_topology_consistency(service_name, cluster_id, member_id):
def is_relevant_member_event(service_name, cluster_id, lb_cluster_id):
- cluster_id_in_payload = CartridgeAgentConfiguration.cluster_id
+ cluster_id_in_payload = cartridge_agent_config.cluster_id
if cluster_id_in_payload is None:
return False
@@ -254,7 +269,7 @@ def is_relevant_member_event(service_name, cluster_id, lb_cluster_id):
if cluster_id_in_payload == lb_cluster_id:
return True
- service_group_in_payload = CartridgeAgentConfiguration.service_group
+ service_group_in_payload = cartridge_agent_config.service_group
if service_group_in_payload is not None:
service_properties = topology.get_service(service_name).properties
if service_properties is None:
@@ -262,19 +277,27 @@ def is_relevant_member_event(service_name, cluster_id, lb_cluster_id):
member_service_group = service_properties[cartridgeagentconstants.SERVICE_GROUP_TOPOLOGY_KEY]
if member_service_group is not None and member_service_group == service_group_in_payload:
- if service_name == CartridgeAgentConfiguration.service_name:
+ if service_name == cartridge_agent_config.service_name:
log.debug("Service names are same")
return True
- elif "apistore" == CartridgeAgentConfiguration.service_name and service_name == "publisher":
+ elif cartridgeagentconstants.APISTORE_SERVICE_NAME == \
+ cartridge_agent_config.service_name \
+ and service_name == cartridgeagentconstants.PUBLISHER_SERVICE_NAME:
log.debug("Service name in payload is [store]. Serivce name in event is [%r] " % service_name)
return True
- elif "publisher" == CartridgeAgentConfiguration.service_name and service_name == "apistore":
+ elif cartridgeagentconstants.PUBLISHER_SERVICE_NAME == \
+ cartridge_agent_config.service_name \
+ and service_name == cartridgeagentconstants.APISTORE_SERVICE_NAME:
log.debug("Service name in payload is [publisher]. Serivce name in event is [%r] " % service_name)
return True
- elif cartridgeagentconstants.DEPLOYMENT_WORKER == CartridgeAgentConfiguration.deployment and service_name == CartridgeAgentConfiguration.manager_service_name:
+ elif cartridgeagentconstants.DEPLOYMENT_WORKER == \
+ cartridge_agent_config.deployment \
+ and service_name == cartridge_agent_config.manager_service_name:
log.debug("Deployment is worker. Worker's manager service name & service name in event are same")
return True
- elif cartridgeagentconstants.DEPLOYMENT_MANAGER == CartridgeAgentConfiguration.deployment and service_name == CartridgeAgentConfiguration.worker_service_name:
+ elif cartridgeagentconstants.DEPLOYMENT_MANAGER == \
+ cartridge_agent_config.deployment \
+ and service_name == cartridge_agent_config.worker_service_name:
log.debug("Deployment is manager. Manager's worker service name & service name in event are same")
return True
@@ -284,7 +307,8 @@ def is_relevant_member_event(service_name, cluster_id, lb_cluster_id):
def execute_volume_mount_extension(persistance_mappings_payload):
try:
log.debug("Executing volume mounting extension: [payload] %r" % persistance_mappings_payload)
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MOUNT_VOLUMES_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.MOUNT_VOLUMES_SCRIPT, False)
command = prepare_command(script_name)
output, errors = execute_command(command + " " + persistance_mappings_payload)
@@ -296,7 +320,8 @@ def execute_volume_mount_extension(persistance_mappings_payload):
def execute_cleanup_extension():
try:
log.debug("Executing cleanup extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.CLEAN_UP_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.CLEAN_UP_SCRIPT, False)
command = prepare_command(script_name)
output, errors = execute_command(command)
@@ -309,7 +334,8 @@ def execute_member_activated_extension(env_params):
try:
log.debug("Executing member activated extension")
- script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MEMBER_ACTIVATED_SCRIPT, False)
+ script_name = cartridge_agent_config.read_property(
+ cartridgeagentconstants.MEMBER_ACTIVATED_SCRIPT, False)
command = prepare_command(script_name)
env_params = add_payload_parameters(env_params)
env_params = clean_process_parameters(env_params)
@@ -321,11 +347,13 @@ def execute_member_activated_extension(env_params):
def prepare_command(script_name):
- extensions_dir = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.EXTENSIONS_DIR, False)
+ extensions_dir = cartridge_agent_config.read_property(
+ cartridgeagentconstants.EXTENSIONS_DIR, False)
if extensions_dir.strip() == "":
raise RuntimeError("System property not found: %r" % cartridgeagentconstants.EXTENSIONS_DIR)
- file_path = extensions_dir + script_name if str(extensions_dir).endswith("/") else extensions_dir + "/" + script_name
+ file_path = extensions_dir + script_name if str(extensions_dir).endswith("/") \
+ else extensions_dir + "/" + script_name
if os.path.isfile(file_path):
return file_path
@@ -354,32 +382,35 @@ def add_payload_parameters(env_params):
:return: Dictionary with updated parameters
:rtype: dict[str, str]
"""
- env_params["STRATOS_APP_PATH"] = CartridgeAgentConfiguration.app_path
- env_params["STRATOS_PARAM_FILE_PATH"] = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.PARAM_FILE_PATH, False)
- env_params["STRATOS_SERVICE_NAME"] = CartridgeAgentConfiguration.service_name
- env_params["STRATOS_TENANT_ID"] = CartridgeAgentConfiguration.tenant_id
- env_params["STRATOS_CARTRIDGE_KEY"] = CartridgeAgentConfiguration.cartridge_key
- env_params["STRATOS_LB_CLUSTER_ID"] = CartridgeAgentConfiguration.lb_cluster_id
- env_params["STRATOS_CLUSTER_ID"] = CartridgeAgentConfiguration.cluster_id
- env_params["STRATOS_NETWORK_PARTITION_ID"] = CartridgeAgentConfiguration.network_partition_id
- env_params["STRATOS_PARTITION_ID"] = CartridgeAgentConfiguration.partition_id
- env_params["STRATOS_PERSISTENCE_MAPPINGS"] = CartridgeAgentConfiguration.persistence_mappings
- env_params["STRATOS_REPO_URL"] = CartridgeAgentConfiguration.repo_url
-
- lb_cluster_id_in_payload = CartridgeAgentConfiguration.lb_cluster_id
+ env_params["STRATOS_APP_PATH"] = cartridge_agent_config.app_path
+ env_params["STRATOS_PARAM_FILE_PATH"] = cartridge_agent_config.read_property(
+ cartridgeagentconstants.PARAM_FILE_PATH, False)
+ env_params["STRATOS_SERVICE_NAME"] = cartridge_agent_config.service_name
+ env_params["STRATOS_TENANT_ID"] = cartridge_agent_config.tenant_id
+ env_params["STRATOS_CARTRIDGE_KEY"] = cartridge_agent_config.cartridge_key
+ env_params["STRATOS_LB_CLUSTER_ID"] = cartridge_agent_config.lb_cluster_id
+ env_params["STRATOS_CLUSTER_ID"] = cartridge_agent_config.cluster_id
+ env_params["STRATOS_NETWORK_PARTITION_ID"] = \
+ cartridge_agent_config.network_partition_id
+ env_params["STRATOS_PARTITION_ID"] = cartridge_agent_config.partition_id
+ env_params["STRATOS_PERSISTENCE_MAPPINGS"] = \
+ cartridge_agent_config.persistence_mappings
+ env_params["STRATOS_REPO_URL"] = cartridge_agent_config.repo_url
+
+ lb_cluster_id_in_payload = cartridge_agent_config.lb_cluster_id
member_ips = get_lb_member_ip(lb_cluster_id_in_payload)
if member_ips is not None:
env_params["STRATOS_LB_IP"] = member_ips[0]
env_params["STRATOS_LB_PUBLIC_IP"] = member_ips[1]
else:
- env_params["STRATOS_LB_IP"] = CartridgeAgentConfiguration.lb_private_ip
- env_params["STRATOS_LB_PUBLIC_IP"] = CartridgeAgentConfiguration.lb_public_ip
+ env_params["STRATOS_LB_IP"] = cartridge_agent_config.lb_private_ip
+ env_params["STRATOS_LB_PUBLIC_IP"] = cartridge_agent_config.lb_public_ip
topology = TopologyContext.get_topology()
if topology.initialized:
- service = topology.get_service(CartridgeAgentConfiguration.service_name)
- cluster = service.get_cluster(CartridgeAgentConfiguration.cluster_id)
- member_id_in_payload = CartridgeAgentConfiguration.member_id
+ service = topology.get_service(cartridge_agent_config.service_name)
+ cluster = service.get_cluster(cartridge_agent_config.cluster_id)
+ member_id_in_payload = cartridge_agent_config.member_id
add_properties(service.properties, env_params, "SERVICE_PROPERTY")
add_properties(cluster.properties, env_params, "CLUSTER_PROPERTY")
add_properties(cluster.get_member(member_id_in_payload).properties, env_params, "MEMBER_PROPERTY")
@@ -439,3 +470,7 @@ def execute_command(command, env_params=None):
raise RuntimeError("Command execution failed: \n %r" % errors)
return output, errors
+
+
+from .. config import cartridgeagentconfiguration
+from .. topology.topologycontext import *
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/util/log.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/log.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/log.py
index 83b1f50..b6fec95 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/util/log.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/log.py
@@ -10,7 +10,7 @@ class LogFactory(object):
class __LogFactory:
def __init__(self):
self.logs = {}
- logging_conf = os.path.join(os.path.dirname(__file__), "logging.ini")
+ logging_conf = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "logging.ini"
logging.config.fileConfig(logging_conf)
def get_log(self, name):