You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ch...@apache.org on 2015/11/30 14:44:18 UTC
[1/5] stratos git commit: PCA - Publish MB events using a thread and
timeout after 5 seconds. Improved PCA structure and removed unnecessary
threading PCA Live Test - Improved logging, improved MB HA test case
Repository: stratos
Updated Branches:
refs/heads/stratos-4.1.x e722ff304 -> 81b72de83
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index 6e25b6b..08042b7 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -48,7 +48,7 @@ import java.util.concurrent.ExecutorService;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
-public class PythonAgentIntegrationTest {
+public abstract class PythonAgentIntegrationTest {
public static final String PATH_SEP = File.separator;
public static final String NEW_LINE = System.getProperty("line.separator");
@@ -219,6 +219,10 @@ public class PythonAgentIntegrationTest {
this.topologyEventReceiver.terminate();
this.initializerEventReceiver.terminate();
+ this.instanceStatusEventReceiver = null;
+ this.topologyEventReceiver = null;
+ this.initializerEventReceiver = null;
+
this.instanceActivated = false;
this.instanceStarted = false;
@@ -319,7 +323,7 @@ public class PythonAgentIntegrationTest {
log.error("ERROR found in PCA log", e);
}
}
- log.debug("[PCA] " + line);
+ log.debug("[" + getClassName() + "] [PCA] " + line);
}
}
sleep(100);
@@ -330,6 +334,12 @@ public class PythonAgentIntegrationTest {
}
/**
+ * Return concrete class name
+ * @return
+ */
+ protected abstract String getClassName();
+
+ /**
* Start server socket
*
* @param port Port number of server socket to be started
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf
index c6c55f3..770a548 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf
@@ -19,7 +19,7 @@
mb.urls =localhost:1885,localhost:1886,localhost:1887
mb.username =system
mb.password =manager
-mb.publisher.timeout =20
+mb.publisher.timeout =200
listen.address =localhost
thrift.receiver.urls =localhost:7712
thrift.server.admin.username =admin
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/log4j.properties b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/log4j.properties
index 759bbb4..74b61aa 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/log4j.properties
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/log4j.properties
@@ -31,7 +31,7 @@ log4j.additivity.org.apache.stratos.python.cartridge.agent.integration.tests.Pyt
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
#log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p {%c}:%L - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p {%c{1}}:%L - %m%n
log4j.appender.stdout.threshold=DEBUG
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-all.xml
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-all.xml b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-all.xml
index d8e342f..0e87501 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-all.xml
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-all.xml
@@ -24,6 +24,7 @@
<test name="all" preserve-order="true" parallel="false">
<groups>
<run>
+ <exclude name="ha"/>
<exclude name="failed"/>
<exclude name="disabled"/>
</run>
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-smoke.xml
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-smoke.xml b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-smoke.xml
index bcac412..77aeb2d 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-smoke.xml
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-smoke.xml
@@ -25,6 +25,8 @@
<groups>
<run>
<include name="smoke"/>
+ <!--<include name="ha"/>-->
+ <exclude name="ha"/>
<exclude name="failed"/>
<exclude name="disabled"/>
</run>
[5/5] stratos git commit: PCA Live Test - Minor test change
Posted by ch...@apache.org.
PCA Live Test - Minor test change
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/e40f2544
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/e40f2544
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/e40f2544
Branch: refs/heads/stratos-4.1.x
Commit: e40f25443b10d40700fb50e02cf07c36c68f276e
Parents: 7d35754
Author: Chamila de Alwis <ch...@apache.org>
Authored: Mon Nov 30 12:53:10 2015 +0530
Committer: Chamila de Alwis <ch...@apache.org>
Committed: Mon Nov 30 19:13:56 2015 +0530
----------------------------------------------------------------------
.../agent/integration/tests/ADCValidationTestCase.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/e40f2544/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCValidationTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCValidationTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCValidationTestCase.java
index 5148992..45834e5 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCValidationTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCValidationTestCase.java
@@ -59,6 +59,11 @@ public class ADCValidationTestCase extends PythonAgentIntegrationTest {
public ADCValidationTestCase() throws IOException {
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
log.info("Setting up ADCTestCase");
@@ -75,7 +80,7 @@ public class ADCValidationTestCase extends PythonAgentIntegrationTest {
tearDown();
}
- @Test(timeOut = ADC_TEST_TIMEOUT, groups = {"adddddd"})
+ @Test(timeOut = ADC_TEST_TIMEOUT, groups = {"smoke"})
public void testAppPathValidation(){
log.info("Testing app path validation for ADC");
startCommunicatorThread();
[2/5] stratos git commit: PCA - Publish MB events using a thread and
timeout after 5 seconds. Improved PCA structure and removed unnecessary
threading PCA Live Test - Improved logging, improved MB HA test case
Posted by ch...@apache.org.
PCA - Publish MB events using a thread and timeout after 5 seconds. Improved PCA structure and removed unnecessary threading
PCA Live Test - Improved logging, improved MB HA test case
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d1912180
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d1912180
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d1912180
Branch: refs/heads/stratos-4.1.x
Commit: d19121809154d42998ddc0c85722600f03f7225f
Parents: e722ff3
Author: Chamila de Alwis <ch...@apache.org>
Authored: Fri Nov 27 14:36:42 2015 +0530
Committer: Chamila de Alwis <ch...@apache.org>
Committed: Mon Nov 30 19:13:55 2015 +0530
----------------------------------------------------------------------
.../cartridge.agent/cartridge.agent/agent.py | 62 +-
.../modules/event/eventhandler.py | 1181 +++++++++---------
.../cartridge.agent/publisher.py | 72 +-
.../cartridge.agent/subscriber.py | 8 +-
.../integration/tests/ADCExtensionTestCase.java | 5 +
.../tests/ADCMTAppTenantUserTestCase.java | 5 +
.../integration/tests/ADCMTAppTestCase.java | 5 +
.../agent/integration/tests/ADCTestCase.java | 5 +
.../integration/tests/AgentStartupTestCase.java | 5 +
.../integration/tests/CEPHAModeTestCase.java | 5 +
.../tests/MessageBrokerHATestCase.java | 30 +-
.../tests/PythonAgentIntegrationTest.java | 14 +-
.../MessageBrokerHATestCase/agent.conf | 2 +-
.../src/test/resources/log4j.properties | 2 +-
.../src/test/resources/test-suite-all.xml | 1 +
.../src/test/resources/test-suite-smoke.xml | 2 +
16 files changed, 734 insertions(+), 670 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
index 959568b..6b81dff 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
@@ -16,22 +16,19 @@
# specific language governing permissions and limitations
# under the License.
-import threading
-
import publisher
from logpublisher import *
from modules.event.application.signup.events import *
from modules.event.domain.mapping.events import *
-from modules.event.eventhandler import EventHandler
+import modules.event.eventhandler as event_handler
from modules.event.instance.notifier.events import *
from modules.event.tenant.events import *
from modules.event.topology.events import *
from subscriber import EventSubscriber
-class CartridgeAgent(threading.Thread):
+class CartridgeAgent(object):
def __init__(self):
- threading.Thread.__init__(self)
Config.initialize_config()
self.__tenant_context_initialized = False
self.__log_publish_manager = None
@@ -47,9 +44,7 @@ class CartridgeAgent(threading.Thread):
self.__app_topic_subscriber = EventSubscriber(constants.APPLICATION_SIGNUP, mb_urls, mb_uname, mb_pwd)
self.__topology_event_subscriber = EventSubscriber(constants.TOPOLOGY_TOPIC, mb_urls, mb_uname, mb_pwd)
- self.__event_handler = EventHandler()
-
- def run(self):
+ def run_agent(self):
self.__log.info("Starting Cartridge Agent...")
# Start topology event receiver thread
@@ -58,7 +53,7 @@ class CartridgeAgent(threading.Thread):
if Config.lvs_virtual_ip is None or str(Config.lvs_virtual_ip).strip() == "":
self.__log.debug("LVS Virtual IP is not defined")
else:
- self.__event_handler.create_dummy_interface()
+ event_handler.create_dummy_interface()
# request complete topology event from CC by publishing CompleteTopologyRequestEvent
publisher.publish_complete_topology_request_event()
@@ -84,14 +79,14 @@ class CartridgeAgent(threading.Thread):
publisher.publish_complete_tenant_request_event()
# Execute instance started shell script
- self.__event_handler.on_instance_started_event()
+ event_handler.on_instance_started_event()
# Publish instance started event
publisher.publish_instance_started_event()
# Execute start servers extension
try:
- self.__event_handler.start_server_extension()
+ event_handler.start_server_extension()
except Exception as e:
self.__log.exception("Error processing start servers event: %s" % e)
@@ -100,7 +95,7 @@ class CartridgeAgent(threading.Thread):
if repo_url is None or str(repo_url).strip() == "":
self.__log.info("No artifact repository found")
publisher.publish_instance_activated_event()
- self.__event_handler.on_instance_activated_event()
+ event_handler.on_instance_activated_event()
else:
# instance activated event will be published in artifact updated event handler
self.__log.info(
@@ -109,7 +104,7 @@ class CartridgeAgent(threading.Thread):
persistence_mapping_payload = Config.persistence_mappings
if persistence_mapping_payload is not None:
- self.__event_handler.volume_mount_extension(persistence_mapping_payload)
+ event_handler.volume_mount_extension(persistence_mapping_payload)
# start log publishing thread
if DataPublisherConfiguration.get_instance().enabled:
@@ -198,14 +193,14 @@ class CartridgeAgent(threading.Thread):
def on_artifact_updated(self, msg):
event_obj = ArtifactUpdatedEvent.create_from_json(msg.payload)
- self.__event_handler.on_artifact_updated_event(event_obj)
+ event_handler.on_artifact_updated_event(event_obj)
def on_instance_cleanup_member(self, msg):
member_in_payload = Config.member_id
event_obj = InstanceCleanupMemberEvent.create_from_json(msg.payload)
member_in_event = event_obj.member_id
if member_in_payload == member_in_event:
- self.__event_handler.on_instance_cleanup_member_event()
+ event_handler.on_instance_cleanup_member_event()
def on_instance_cleanup_cluster(self, msg):
event_obj = InstanceCleanupClusterEvent.create_from_json(msg.payload)
@@ -215,7 +210,7 @@ class CartridgeAgent(threading.Thread):
instance_in_event = event_obj.cluster_instance_id
if cluster_in_event == cluster_in_payload and instance_in_payload == instance_in_event:
- self.__event_handler.on_instance_cleanup_cluster_event()
+ event_handler.on_instance_cleanup_cluster_event()
def on_member_created(self, msg):
self.__log.debug("Member created event received: %r" % msg.payload)
@@ -227,7 +222,7 @@ class CartridgeAgent(threading.Thread):
if not TopologyContext.topology.initialized:
return
- self.__event_handler.on_member_initialized_event(event_obj)
+ event_handler.on_member_initialized_event(event_obj)
def on_member_activated(self, msg):
self.__log.debug("Member activated event received: %r" % msg.payload)
@@ -235,7 +230,7 @@ class CartridgeAgent(threading.Thread):
return
event_obj = MemberActivatedEvent.create_from_json(msg.payload)
- self.__event_handler.on_member_activated_event(event_obj)
+ event_handler.on_member_activated_event(event_obj)
def on_member_terminated(self, msg):
self.__log.debug("Member terminated event received: %r" % msg.payload)
@@ -243,7 +238,7 @@ class CartridgeAgent(threading.Thread):
return
event_obj = MemberTerminatedEvent.create_from_json(msg.payload)
- self.__event_handler.on_member_terminated_event(event_obj)
+ event_handler.on_member_terminated_event(event_obj)
def on_member_suspended(self, msg):
self.__log.debug("Member suspended event received: %r" % msg.payload)
@@ -251,7 +246,7 @@ class CartridgeAgent(threading.Thread):
return
event_obj = MemberSuspendedEvent.create_from_json(msg.payload)
- self.__event_handler.on_member_suspended_event(event_obj)
+ event_handler.on_member_suspended_event(event_obj)
def on_complete_topology(self, msg):
event_obj = CompleteTopologyEvent.create_from_json(msg.payload)
@@ -259,7 +254,7 @@ class CartridgeAgent(threading.Thread):
if not TopologyContext.topology.initialized:
self.__log.info("Topology initialized from complete topology event")
TopologyContext.topology.initialized = True
- self.__event_handler.on_complete_topology_event(event_obj)
+ event_handler.on_complete_topology_event(event_obj)
self.__log.debug("Topology context updated with [topology] %r" % event_obj.topology.json_str)
@@ -269,17 +264,17 @@ class CartridgeAgent(threading.Thread):
return
event_obj = MemberStartedEvent.create_from_json(msg.payload)
- self.__event_handler.on_member_started_event(event_obj)
+ event_handler.on_member_started_event(event_obj)
def on_domain_mapping_added(self, msg):
self.__log.debug("Subscription domain added event received : %r" % msg.payload)
event_obj = DomainMappingAddedEvent.create_from_json(msg.payload)
- self.__event_handler.on_domain_mapping_added_event(event_obj)
+ event_handler.on_domain_mapping_added_event(event_obj)
def on_domain_mapping_removed(self, msg):
self.__log.debug("Subscription domain removed event received : %r" % msg.payload)
event_obj = DomainMappingRemovedEvent.create_from_json(msg.payload)
- self.__event_handler.on_domain_mapping_removed_event(event_obj)
+ event_handler.on_domain_mapping_removed_event(event_obj)
def on_complete_tenant(self, msg):
event_obj = CompleteTenantEvent.create_from_json(msg.payload)
@@ -287,19 +282,19 @@ class CartridgeAgent(threading.Thread):
if not self.__tenant_context_initialized:
self.__log.info("Tenant context initialized from complete tenant event")
self.__tenant_context_initialized = True
- self.__event_handler.on_complete_tenant_event(event_obj)
+ event_handler.on_complete_tenant_event(event_obj)
self.__log.debug("Tenant context updated with [tenant list] %r" % event_obj.tenant_list_json)
def on_tenant_subscribed(self, msg):
self.__log.debug("Tenant subscribed event received: %r" % msg.payload)
event_obj = TenantSubscribedEvent.create_from_json(msg.payload)
- self.__event_handler.on_tenant_subscribed_event(event_obj)
+ event_handler.on_tenant_subscribed_event(event_obj)
def on_application_signup_removed(self, msg):
self.__log.debug("Application signup removed event received: %r" % msg.payload)
event_obj = ApplicationSignUpRemovedEvent.create_from_json(msg.payload)
- self.__event_handler.on_application_signup_removed_event(event_obj)
+ event_handler.on_application_signup_removed_event(event_obj)
def wait_for_complete_topology(self):
while not TopologyContext.topology.initialized:
@@ -308,17 +303,12 @@ class CartridgeAgent(threading.Thread):
self.__log.info("Complete topology event received")
-def main():
- cartridge_agent = CartridgeAgent()
+if __name__ == "__main__":
log = LogFactory().get_log(__name__)
-
try:
log.info("Starting Stratos cartridge agent...")
- cartridge_agent.start()
+ cartridge_agent = CartridgeAgent()
+ cartridge_agent.run_agent()
except Exception as e:
log.exception("Cartridge Agent Exception: %r" % e)
- cartridge_agent.terminate()
-
-
-if __name__ == "__main__":
- main()
+ # cartridge_agent.terminate()
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
index 6e2aa4f..f8b0c2b 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
@@ -21,8 +21,6 @@ from threading import Thread
import publisher
from entity import *
-import constants
-from config import Config
from ..artifactmgt.git.agentgithandler import *
from ..artifactmgt.repository import Repository
from ..util import cartridgeagentutils
@@ -31,485 +29,492 @@ from ..util.log import LogFactory
SUPER_TENANT_ID = "-1234"
SUPER_TENANT_REPO_PATH = "/repository/deployment/server/"
TENANT_REPO_PATH = "/repository/tenants/"
+log = LogFactory().get_log(__name__)
+
+"""
+Event execution related logic
+"""
+
+
+def on_instance_started_event():
+ log.debug("Processing instance started event...")
+ # TODO: copy artifacts extension
+ execute_event_extendables(constants.INSTANCE_STARTED_EVENT, {})
+
+
+def create_dummy_interface():
+ log.debug("Processing lvs dummy interface creation...")
+ lvs_vip = Config.lvs_virtual_ip.split("|")
+ log.debug("LVS dummy interface creation values %s %s " % (lvs_vip[0], lvs_vip[1]))
+ execute_event_extendables(
+ constants.CREATE_LVS_DUMMY_INTERFACE,
+ {"EVENT": constants.CREATE_LVS_DUMMY_INTERFACE,
+ "LVS_DUMMY_VIRTUAL_IP": lvs_vip[0],
+ "LVS_SUBNET_MASK": lvs_vip[1]}
+ )
+
+
+def on_instance_activated_event():
+ log.debug("Processing instance activated event...")
+ execute_event_extendables(constants.INSTANCE_ACTIVATED_EVENT, {})
+
+
+def on_artifact_updated_event(artifacts_updated_event):
+ log.debug(
+ "Processing artifact updated event for [tenant] %s [cluster] %s [status] %s"
+ % (str(artifacts_updated_event.tenant_id), artifacts_updated_event.cluster_id, artifacts_updated_event.status))
+
+ cluster_id_event = str(artifacts_updated_event.cluster_id).strip()
+ cluster_id_payload = Config.cluster_id
+ repo_url = str(artifacts_updated_event.repo_url).strip()
+
+ if repo_url == "":
+ log.error("Repository URL is empty. Failed to process artifact updated event.")
+ return
+
+ if cluster_id_payload is None or cluster_id_payload == "":
+ log.error("Cluster ID in payload is empty. Failed to process artifact updated event.")
+ return
+
+ if cluster_id_payload != cluster_id_event:
+ log.debug("Cluster ID in artifact updated event does not match. Skipping event handler.")
+ return
+
+ repo_password = None
+ if artifacts_updated_event.repo_password is not None:
+ secret = Config.cartridge_key
+ repo_password = cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, secret)
+
+ if Config.app_path is None:
+ log.error("Repository path is empty. Failed to process artifact updated event.")
+ return
+
+ repo_username = artifacts_updated_event.repo_username
+ tenant_id = artifacts_updated_event.tenant_id
+ is_multitenant = Config.is_multiTenant
+ commit_enabled = artifacts_updated_event.commit_enabled
+
+ # create repo object
+ local_repo_path = get_repo_path_for_tenant(str(tenant_id), Config.app_path, is_multitenant)
+ repo_info = Repository(repo_url, repo_username, repo_password, local_repo_path, tenant_id, commit_enabled)
+ log.info("Executing checkout job on artifact updated event...")
+
+ try:
+ Config.artifact_checkout_plugin.plugin_object.checkout(repo_info)
+ except Exception as e:
+ log.exception(
+ "Checkout job on artifact updated event failed for tenant: %s %s" % (repo_info.tenant_id, e))
+
+ # execute artifact updated extension
+ plugin_values = {"ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id,
+ "ARTIFACT_UPDATED_TENANT_ID": artifacts_updated_event.tenant_id,
+ "ARTIFACT_UPDATED_REPO_URL": artifacts_updated_event.repo_url,
+ "ARTIFACT_UPDATED_REPO_PASSWORD": artifacts_updated_event.repo_password,
+ "ARTIFACT_UPDATED_REPO_USERNAME": artifacts_updated_event.repo_username,
+ "ARTIFACT_UPDATED_STATUS": artifacts_updated_event.status}
+
+ try:
+ execute_event_extendables(constants.ARTIFACT_UPDATED_EVENT, plugin_values)
+ except Exception as e:
+ log.exception("Could not execute plugins for artifact updated event: %s" % e)
+
+ if not Config.activated:
+ # publish instance activated event if not yet activated
+ publisher.publish_instance_activated_event()
+ on_instance_activated_event()
+
+ update_artifacts = Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, True)
+ auto_commit = Config.is_commits_enabled
+ auto_checkout = Config.is_checkout_enabled
+ log.info("ADC configuration: [update_artifacts] %s, [auto-commit] %s, [auto-checkout] %s"
+ % (update_artifacts, auto_commit, auto_checkout))
+
+ if update_artifacts:
+ try:
+ update_interval = int(Config.artifact_update_interval)
+ except ValueError:
+ log.exception("Invalid artifact sync interval specified: %s" % ValueError)
+ update_interval = 10
+
+ log.info("Artifact updating task enabled, update interval: %s seconds" % update_interval)
+
+ log.info("Auto Commit is turned %s " % ("on" if auto_commit else "off"))
+ log.info("Auto Checkout is turned %s " % ("on" if auto_checkout else "off"))
+
+ AgentGitHandler.schedule_artifact_update_task(
+ repo_info,
+ auto_checkout,
+ auto_commit,
+ update_interval)
+
+
+def on_instance_cleanup_cluster_event():
+ log.debug("Processing instance cleanup cluster event...")
+ cleanup(constants.INSTANCE_CLEANUP_CLUSTER_EVENT)
+
+
+def on_instance_cleanup_member_event():
+ log.debug("Processing instance cleanup member event...")
+ cleanup(constants.INSTANCE_CLEANUP_MEMBER_EVENT)
+
+
+def on_member_activated_event(member_activated_event):
+ log.debug(
+ "Processing Member activated event: [service] %r [cluster] %r [member] %r"
+ % (member_activated_event.service_name,
+ member_activated_event.cluster_id,
+ member_activated_event.member_id))
+
+ member_initialized = is_member_initialized_in_topology(
+ member_activated_event.service_name,
+ member_activated_event.cluster_id,
+ member_activated_event.member_id)
+
+ if not member_initialized:
+ log.error("Member has not initialized, failed to execute member activated event")
+ return
+
+ execute_event_extendables(constants.MEMBER_ACTIVATED_EVENT, {})
+
+
+def on_complete_topology_event(complete_topology_event):
+ log.debug("Processing Complete topology event...")
+
+ service_name_in_payload = Config.service_name
+ cluster_id_in_payload = Config.cluster_id
+ member_id_in_payload = Config.member_id
+
+ if not Config.initialized:
+ member_initialized = is_member_initialized_in_topology(
+ service_name_in_payload,
+ cluster_id_in_payload,
+ member_id_in_payload)
+ if member_initialized:
+ # Set cartridge agent as initialized since member is available and it is in initialized state
+ Config.initialized = True
+ log.info(
+ "Member initialized [member id] %s, [cluster-id] %s, [service] %s"
+ % (member_id_in_payload, cluster_id_in_payload, service_name_in_payload))
-class EventHandler:
+ topology = complete_topology_event.get_topology()
+ service = topology.get_service(service_name_in_payload)
+ if service is None:
+ raise Exception("Service not found in topology [service] %s" % service_name_in_payload)
+
+ cluster = service.get_cluster(cluster_id_in_payload)
+ if cluster is None:
+ raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id_in_payload)
+
+ plugin_values = {"TOPOLOGY_JSON": json.dumps(topology.json_str),
+ "MEMBER_LIST_JSON": json.dumps(cluster.member_list_json)}
+
+ execute_event_extendables(constants.COMPLETE_TOPOLOGY_EVENT, plugin_values)
+
+
+def on_member_initialized_event(member_initialized_event):
"""
- Event execution related logic
+ Member initialized event is sent by cloud controller once volume attachment and
+ ip address allocation is completed successfully
+ :param member_initialized_event:
+ :return:
"""
+ log.debug("Processing Member initialized event...")
+ service_name_in_payload = Config.service_name
+ cluster_id_in_payload = Config.cluster_id
+ member_id_in_payload = Config.member_id
+
+ if not Config.initialized and member_id_in_payload == member_initialized_event.member_id:
+ member_exists = member_exists_in_topology(
+ service_name_in_payload,
+ cluster_id_in_payload,
+ member_id_in_payload)
+
+ log.debug("Member exists: %s" % member_exists)
+
+ if member_exists:
+ Config.initialized = True
+ mark_member_as_initialized(service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
+ log.info("Instance marked as initialized on member initialized event")
+ else:
+ raise Exception("Member [member-id] %s not found in topology while processing member initialized "
+ "event. [Topology] %s" % (member_id_in_payload, TopologyContext.get_topology()))
- def __init__(self):
- self.__log = LogFactory().get_log(__name__)
+ execute_event_extendables(constants.MEMBER_INITIALIZED_EVENT, {})
- def on_instance_started_event(self):
- self.__log.debug("Processing instance started event...")
- # TODO: copy artifacts extension
- self.execute_event_extendables(constants.INSTANCE_STARTED_EVENT, {})
-
- def create_dummy_interface(self):
- self.__log.debug("Processing lvs dummy interface creation...")
- lvs_vip = Config.lvs_virtual_ip.split("|")
- self.__log.debug("LVS dummy interface creation values %s %s " % (lvs_vip[0], lvs_vip[1]))
- self.execute_event_extendables(constants.CREATE_LVS_DUMMY_INTERFACE,
- {"EVENT": constants.CREATE_LVS_DUMMY_INTERFACE,
- "LVS_DUMMY_VIRTUAL_IP": lvs_vip[0],
- "LVS_SUBNET_MASK": lvs_vip[1]})
-
- def on_instance_activated_event(self):
- self.__log.debug("Processing instance activated event...")
- self.execute_event_extendables(constants.INSTANCE_ACTIVATED_EVENT, {})
-
- def on_artifact_updated_event(self, artifacts_updated_event):
- self.__log.debug("Processing artifact updated event for [tenant] %s [cluster] %s [status] %s" %
- (str(artifacts_updated_event.tenant_id),
- artifacts_updated_event.cluster_id,
- artifacts_updated_event.status))
-
- cluster_id_event = str(artifacts_updated_event.cluster_id).strip()
- cluster_id_payload = Config.cluster_id
- repo_url = str(artifacts_updated_event.repo_url).strip()
-
- if repo_url == "":
- self.__log.error("Repository URL is empty. Failed to process artifact updated event.")
- return
-
- if cluster_id_payload is None or cluster_id_payload == "":
- self.__log.error("Cluster ID in payload is empty. Failed to process artifact updated event.")
- return
-
- if cluster_id_payload != cluster_id_event:
- self.__log.debug("Cluster ID in artifact updated event does not match. Skipping event handler.")
- return
-
- repo_password = None
- if artifacts_updated_event.repo_password is not None:
- secret = Config.cartridge_key
- repo_password = cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, secret)
-
- if Config.app_path is None:
- self.__log.error("Repository path is empty. Failed to process artifact updated event.")
- return
-
- if not EventHandler.validate_repo_path(Config.app_path):
- self.__log.error(
- "Repository path cannot be accessed, or is invalid. Failed to process artifact updated event.")
- return
-
- repo_username = artifacts_updated_event.repo_username
- tenant_id = artifacts_updated_event.tenant_id
- is_multitenant = Config.is_multiTenant
- commit_enabled = artifacts_updated_event.commit_enabled
-
- # create repo object
- local_repo_path = self.get_repo_path_for_tenant(str(tenant_id), Config.app_path, is_multitenant)
- repo_info = Repository(repo_url, repo_username, repo_password, local_repo_path, tenant_id, commit_enabled)
- self.__log.info("Executing checkout job on artifact updated event...")
- try:
- Config.artifact_checkout_plugin.plugin_object.checkout(repo_info)
- except Exception as e:
- self.__log.exception(
- "Checkout job on artifact updated event failed for tenant: %s %s" % (repo_info.tenant_id, e))
+def on_complete_tenant_event(complete_tenant_event):
+ log.debug("Processing Complete tenant event...")
- # execute artifact updated extension
- plugin_values = {"ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id,
- "ARTIFACT_UPDATED_TENANT_ID": artifacts_updated_event.tenant_id,
- "ARTIFACT_UPDATED_REPO_URL": artifacts_updated_event.repo_url,
- "ARTIFACT_UPDATED_REPO_PASSWORD": artifacts_updated_event.repo_password,
- "ARTIFACT_UPDATED_REPO_USERNAME": artifacts_updated_event.repo_username,
- "ARTIFACT_UPDATED_STATUS": artifacts_updated_event.status}
+ tenant_list_json = complete_tenant_event.tenant_list_json
+ log.debug("Complete tenants:" + json.dumps(tenant_list_json))
- try:
- self.execute_event_extendables(constants.ARTIFACT_UPDATED_EVENT, plugin_values)
- except Exception as e:
- self.__log.exception("Could not execute plugins for artifact updated event: %s" % e)
-
- if not Config.activated:
- # publish instance activated event if not yet activated
- publisher.publish_instance_activated_event()
- self.on_instance_activated_event()
-
- update_artifacts = Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, True)
- auto_commit = Config.is_commits_enabled
- auto_checkout = Config.is_checkout_enabled
- self.__log.info("ADC configuration: [update_artifacts] %s, [auto-commit] %s, [auto-checkout] %s",
- update_artifacts, auto_commit, auto_checkout)
- if update_artifacts:
- try:
- update_interval = int(Config.artifact_update_interval)
- except ValueError:
- self.__log.exception("Invalid artifact sync interval specified: %s" % ValueError)
- update_interval = 10
-
- self.__log.info("Artifact updating task enabled, update interval: %s seconds" % update_interval)
-
- self.__log.info("Auto Commit is turned %s " % ("on" if auto_commit else "off"))
- self.__log.info("Auto Checkout is turned %s " % ("on" if auto_checkout else "off"))
-
- AgentGitHandler.schedule_artifact_update_task(
- repo_info,
- auto_checkout,
- auto_commit,
- update_interval)
-
- def on_instance_cleanup_cluster_event(self):
- self.__log.debug("Processing instance cleanup cluster event...")
- self.cleanup(constants.INSTANCE_CLEANUP_CLUSTER_EVENT)
-
- def on_instance_cleanup_member_event(self):
- self.__log.debug("Processing instance cleanup member event...")
- self.cleanup(constants.INSTANCE_CLEANUP_MEMBER_EVENT)
-
- def on_member_activated_event(self, member_activated_event):
- self.__log.debug("Processing Member activated event: [service] %r [cluster] %r [member] %r"
- % (member_activated_event.service_name,
- member_activated_event.cluster_id,
- member_activated_event.member_id))
-
- member_initialized = self.is_member_initialized_in_topology(
- member_activated_event.service_name,
- member_activated_event.cluster_id,
- member_activated_event.member_id)
-
- if not member_initialized:
- self.__log.error("Member has not initialized, failed to execute member activated event")
- return
-
- self.execute_event_extendables(constants.MEMBER_ACTIVATED_EVENT, {})
-
- def on_complete_topology_event(self, complete_topology_event):
- self.__log.debug("Processing Complete topology event...")
-
- service_name_in_payload = Config.service_name
- cluster_id_in_payload = Config.cluster_id
- member_id_in_payload = Config.member_id
-
- if not Config.initialized:
- member_initialized = self.is_member_initialized_in_topology(
- service_name_in_payload,
- cluster_id_in_payload,
- member_id_in_payload)
-
- if member_initialized:
- # Set cartridge agent as initialized since member is available and it is in initialized state
- Config.initialized = True
- self.__log.info("Member initialized [member id] %s, [cluster-id] %s, [service] %s" %
- (member_id_in_payload, cluster_id_in_payload, service_name_in_payload))
-
- topology = complete_topology_event.get_topology()
- service = topology.get_service(service_name_in_payload)
- if service is None:
- raise Exception("Service not found in topology [service] %s" % service_name_in_payload)
+ plugin_values = {"TENANT_LIST_JSON": json.dumps(tenant_list_json)}
- cluster = service.get_cluster(cluster_id_in_payload)
- if cluster is None:
- raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id_in_payload)
-
- plugin_values = {"TOPOLOGY_JSON": json.dumps(topology.json_str),
- "MEMBER_LIST_JSON": json.dumps(cluster.member_list_json)}
-
- self.execute_event_extendables(constants.COMPLETE_TOPOLOGY_EVENT, plugin_values)
-
- def on_member_initialized_event(self, member_initialized_event):
- """
- Member initialized event is sent by cloud controller once volume attachment and
- ip address allocation is completed successfully
- :return:
- """
- self.__log.debug("Processing Member initialized event...")
- service_name_in_payload = Config.service_name
- cluster_id_in_payload = Config.cluster_id
- member_id_in_payload = Config.member_id
-
- if not Config.initialized and member_id_in_payload == member_initialized_event.member_id:
- member_exists = self.member_exists_in_topology(service_name_in_payload, cluster_id_in_payload,
- member_id_in_payload)
- self.__log.debug("Member exists: %s" % member_exists)
- if member_exists:
- Config.initialized = True
- self.mark_member_as_initialized(service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
- self.__log.info("Instance marked as initialized on member initialized event")
- else:
- raise Exception("Member [member-id] %s not found in topology while processing member initialized "
- "event. [Topology] %s" % (member_id_in_payload, TopologyContext.get_topology()))
+ execute_event_extendables(constants.COMPLETE_TENANT_EVENT, plugin_values)
+
+
+def on_member_terminated_event(member_terminated_event):
+ log.debug(
+ "Processing Member terminated event: [service] %s [cluster] %s [member] %s"
+ % (member_terminated_event.service_name, member_terminated_event.cluster_id, member_terminated_event.member_id))
+
+ member_initialized = is_member_initialized_in_topology(
+ member_terminated_event.service_name,
+ member_terminated_event.cluster_id,
+ member_terminated_event.member_id
+ )
+
+ if not member_initialized:
+ log.error("Member has not initialized, failed to execute member terminated event")
+ return
+
+ execute_event_extendables(constants.MEMBER_TERMINATED_EVENT, {})
+
+
+def on_member_suspended_event(member_suspended_event):
+ log.debug(
+ "Processing Member suspended event: [service] %s [cluster] %s [member] %s"
+ % (member_suspended_event.service_name, member_suspended_event.cluster_id, member_suspended_event.member_id))
+
+ member_initialized = is_member_initialized_in_topology(
+ member_suspended_event.service_name,
+ member_suspended_event.cluster_id,
+ member_suspended_event.member_id
+ )
+
+ if not member_initialized:
+ log.error("Member has not initialized, failed to execute member suspended event")
+ return
+
+ execute_event_extendables(constants.MEMBER_SUSPENDED_EVENT, {})
+
+
+def on_member_started_event(member_started_event):
+ log.debug(
+ "Processing Member started event: [service] %s [cluster] %s [member] %s"
+ % (member_started_event.service_name, member_started_event.cluster_id, member_started_event.member_id))
+
+ member_initialized = is_member_initialized_in_topology(
+ member_started_event.service_name,
+ member_started_event.cluster_id,
+ member_started_event.member_id
+ )
+
+ if not member_initialized:
+ log.error("Member has not initialized, failed to execute member started event")
+ return
+
+ execute_event_extendables(constants.MEMBER_STARTED_EVENT, {})
+
+
+def start_server_extension():
+ log.debug("Processing start server extension...")
+ service_name_in_payload = Config.service_name
+ cluster_id_in_payload = Config.cluster_id
+ member_id_in_payload = Config.member_id
+ member_initialized = is_member_initialized_in_topology(
+ service_name_in_payload, cluster_id_in_payload, member_id_in_payload)
+
+ if not member_initialized:
+ log.error("Member has not initialized, failed to execute start server event")
+ return
+
+ execute_event_extendables("StartServers", {})
+
+
+def volume_mount_extension(persistence_mappings_payload):
+ log.debug("Processing volume mount extension...")
+ execute_event_extendables("VolumeMount", persistence_mappings_payload)
+
+
+def on_domain_mapping_added_event(domain_mapping_added_event):
+ tenant_domain = find_tenant_domain(domain_mapping_added_event.tenant_id)
+ log.debug(
+ "Processing Domain mapping added event: [tenant-id] " + str(domain_mapping_added_event.tenant_id) +
+ " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_added_event.domain_name +
+ " [application-context] " + domain_mapping_added_event.application_context
+ )
+
+ plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_added_event.application_id,
+ "SUBSCRIPTION_SERVICE_NAME": domain_mapping_added_event.service_name,
+ "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_added_event.domain_name,
+ "SUBSCRIPTION_CLUSTER_ID": domain_mapping_added_event.cluster_id,
+ "SUBSCRIPTION_TENANT_ID": int(domain_mapping_added_event.tenant_id),
+ "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain,
+ "SUBSCRIPTION_CONTEXT_PATH":
+ domain_mapping_added_event.context_path}
+
+ execute_event_extendables(constants.DOMAIN_MAPPING_ADDED_EVENT, plugin_values)
+
+
+def on_domain_mapping_removed_event(domain_mapping_removed_event):
+ tenant_domain = find_tenant_domain(domain_mapping_removed_event.tenant_id)
+ log.info(
+ "Domain mapping removed event received: [tenant-id] " + str(domain_mapping_removed_event.tenant_id) +
+ " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_removed_event.domain_name
+ )
+
+ plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_removed_event.application_id,
+ "SUBSCRIPTION_SERVICE_NAME": domain_mapping_removed_event.service_name,
+ "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_removed_event.domain_name,
+ "SUBSCRIPTION_CLUSTER_ID": domain_mapping_removed_event.cluster_id,
+ "SUBSCRIPTION_TENANT_ID": int(domain_mapping_removed_event.tenant_id),
+ "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain}
+
+ execute_event_extendables(constants.DOMAIN_MAPPING_REMOVED_EVENT, plugin_values)
+
+
+def on_copy_artifacts_extension(src, dest):
+ log.debug("Processing Copy artifacts extension...")
+ plugin_values = {"SOURCE": src, "DEST": dest}
+ execute_event_extendables("CopyArtifacts", plugin_values)
- self.execute_event_extendables(constants.MEMBER_INITIALIZED_EVENT, {})
-
- def on_complete_tenant_event(self, complete_tenant_event):
- self.__log.debug("Processing Complete tenant event...")
-
- tenant_list_json = complete_tenant_event.tenant_list_json
- self.__log.debug("Complete tenants:" + json.dumps(tenant_list_json))
-
- plugin_values = {"TENANT_LIST_JSON": json.dumps(tenant_list_json)}
-
- self.execute_event_extendables(constants.COMPLETE_TENANT_EVENT, plugin_values)
-
- def on_member_terminated_event(self, member_terminated_event):
- self.__log.debug("Processing Member terminated event: [service] %s [cluster] %s [member] %s" %
- (member_terminated_event.service_name, member_terminated_event.cluster_id,
- member_terminated_event.member_id))
-
- member_initialized = self.is_member_initialized_in_topology(
- member_terminated_event.service_name,
- member_terminated_event.cluster_id,
- member_terminated_event.member_id
- )
-
- if not member_initialized:
- self.__log.error("Member has not initialized, failed to execute member terminated event")
- return
-
- self.execute_event_extendables(constants.MEMBER_TERMINATED_EVENT, {})
-
- def on_member_suspended_event(self, member_suspended_event):
- self.__log.debug("Processing Member suspended event: [service] %s [cluster] %s [member] %s" %
- (member_suspended_event.service_name, member_suspended_event.cluster_id,
- member_suspended_event.member_id))
-
- member_initialized = self.is_member_initialized_in_topology(
- member_suspended_event.service_name,
- member_suspended_event.cluster_id,
- member_suspended_event.member_id
- )
-
- if not member_initialized:
- self.__log.error("Member has not initialized, failed to execute member suspended event")
- return
-
- self.execute_event_extendables(constants.MEMBER_SUSPENDED_EVENT, {})
-
- def on_member_started_event(self, member_started_event):
- self.__log.debug("Processing Member started event: [service] %s [cluster] %s [member] %s" %
- (member_started_event.service_name, member_started_event.cluster_id,
- member_started_event.member_id))
-
- member_initialized = self.is_member_initialized_in_topology(
- member_started_event.service_name,
- member_started_event.cluster_id,
- member_started_event.member_id
- )
-
- if not member_initialized:
- self.__log.error("Member has not initialized, failed to execute member started event")
- return
-
- self.execute_event_extendables(constants.MEMBER_STARTED_EVENT, {})
-
- def start_server_extension(self):
- self.__log.debug("Processing start server extension...")
- service_name_in_payload = Config.service_name
- cluster_id_in_payload = Config.cluster_id
- member_id_in_payload = Config.member_id
- member_initialized = self.is_member_initialized_in_topology(service_name_in_payload, cluster_id_in_payload,
- member_id_in_payload)
-
- if not member_initialized:
- self.__log.error("Member has not initialized, failed to execute start server event")
- return
-
- self.execute_event_extendables("StartServers", {})
-
- def volume_mount_extension(self, persistence_mappings_payload):
- self.__log.debug("Processing volume mount extension...")
- self.execute_event_extendables("VolumeMount", persistence_mappings_payload)
-
- def on_domain_mapping_added_event(self, domain_mapping_added_event):
- tenant_domain = EventHandler.find_tenant_domain(domain_mapping_added_event.tenant_id)
- self.__log.debug(
- "Processing Domain mapping added event: [tenant-id] " + str(domain_mapping_added_event.tenant_id) +
- " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_added_event.domain_name +
- " [application-context] " + domain_mapping_added_event.application_context
- )
-
- plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_added_event.application_id,
- "SUBSCRIPTION_SERVICE_NAME": domain_mapping_added_event.service_name,
- "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_added_event.domain_name,
- "SUBSCRIPTION_CLUSTER_ID": domain_mapping_added_event.cluster_id,
- "SUBSCRIPTION_TENANT_ID": int(domain_mapping_added_event.tenant_id),
- "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain,
- "SUBSCRIPTION_CONTEXT_PATH":
- domain_mapping_added_event.context_path}
-
- self.execute_event_extendables(constants.DOMAIN_MAPPING_ADDED_EVENT, plugin_values)
-
- def on_domain_mapping_removed_event(self, domain_mapping_removed_event):
- tenant_domain = EventHandler.find_tenant_domain(domain_mapping_removed_event.tenant_id)
- self.__log.info(
- "Domain mapping removed event received: [tenant-id] " + str(domain_mapping_removed_event.tenant_id) +
- " [tenant-domain] " + tenant_domain + " [domain-name] " + domain_mapping_removed_event.domain_name
- )
-
- plugin_values = {"SUBSCRIPTION_APPLICATION_ID": domain_mapping_removed_event.application_id,
- "SUBSCRIPTION_SERVICE_NAME": domain_mapping_removed_event.service_name,
- "SUBSCRIPTION_DOMAIN_NAME": domain_mapping_removed_event.domain_name,
- "SUBSCRIPTION_CLUSTER_ID": domain_mapping_removed_event.cluster_id,
- "SUBSCRIPTION_TENANT_ID": int(domain_mapping_removed_event.tenant_id),
- "SUBSCRIPTION_TENANT_DOMAIN": tenant_domain}
-
- self.execute_event_extendables(constants.DOMAIN_MAPPING_REMOVED_EVENT, plugin_values)
-
- def on_copy_artifacts_extension(self, src, dest):
- self.__log.debug("Processing Copy artifacts extension...")
- plugin_values = {"SOURCE": src, "DEST": dest}
- self.execute_event_extendables("CopyArtifacts", plugin_values)
-
- def on_tenant_subscribed_event(self, tenant_subscribed_event):
- self.__log.debug(
- "Processing Tenant subscribed event: [tenant] " + str(tenant_subscribed_event.tenant_id) +
- " [service] " + tenant_subscribed_event.service_name + " [cluster] " + tenant_subscribed_event.cluster_ids
- )
-
- self.execute_event_extendables(constants.TENANT_SUBSCRIBED_EVENT, {})
-
- def on_application_signup_removed_event(self, application_signup_removal_event):
- self.__log.debug(
- "Processing Tenant unsubscribed event: [tenant] " + str(application_signup_removal_event.tenantId) +
- " [application ID] " + str(application_signup_removal_event.applicationId)
- )
-
- if Config.application_id == application_signup_removal_event.applicationId:
- AgentGitHandler.remove_repo(application_signup_removal_event.tenantId)
-
- self.execute_event_extendables(constants.APPLICATION_SIGNUP_REMOVAL_EVENT, {})
-
- def cleanup(self, event):
- self.__log.debug("Executing cleanup extension for event %s..." % event)
- publisher.publish_maintenance_mode_event()
- self.execute_event_extendables("clean", {})
- publisher.publish_instance_ready_to_shutdown_event()
-
- def execute_event_extendables(self, event, input_values):
- """ Execute the extensions and plugins related to the event
- :param event: The event name string
- :param input_values: the values to be passed to the plugin
- :return:
- """
- try:
- input_values = EventHandler.add_common_input_values(input_values)
- except Exception as e:
- self.__log.error("Error while adding common input values for event extendables: %s" % e)
- input_values["EVENT"] = event
- self.__log.debug("Executing extensions for [event] %s with [input values] %s" % (event, input_values))
- # Execute the extension
- self.execute_extension_for_event(event, input_values)
- # Execute the plugins
- self.execute_plugins_for_event(event, input_values)
-
- def execute_plugins_for_event(self, event, input_values):
- """ For each plugin registered for the specified event, start a plugin execution thread
- :param str event: The event name string
- :param dict input_values: the values to be passed to the plugin
- :return:
- """
- try:
- plugins_for_event = Config.plugins.get(event)
- if plugins_for_event is not None:
- for plugin_info in plugins_for_event:
- self.__log.debug("Executing plugin %s for event %s" % (plugin_info.name, event))
- plugin_thread = PluginExecutor(plugin_info, input_values)
- plugin_thread.start()
-
- # block till plugin run completes.
- plugin_thread.join()
- else:
- self.__log.debug("No plugins registered for event %s" % event)
- except Exception as e:
- self.__log.exception("Error while executing plugin for event %s: %s" % (event, e))
-
- def execute_extension_for_event(self, event, extension_values):
- """ Execute the extension related to the event
- :param event: The event name string
- :param extension_values: the values to be passed to the plugin
- :return:
- """
- try:
- if Config.extension_executor is not None:
- self.__log.debug("Executing extension for event [%s]" % event)
- extension_thread = PluginExecutor(Config.extension_executor, extension_values)
- extension_thread.start()
+
+def on_tenant_subscribed_event(tenant_subscribed_event):
+ log.debug(
+ "Processing Tenant subscribed event: [tenant] " + str(tenant_subscribed_event.tenant_id) +
+ " [service] " + tenant_subscribed_event.service_name + " [cluster] " + tenant_subscribed_event.cluster_ids
+ )
+
+ execute_event_extendables(constants.TENANT_SUBSCRIBED_EVENT, {})
+
+
+def on_application_signup_removed_event(application_signup_removal_event):
+ log.debug(
+ "Processing Tenant unsubscribed event: [tenant] " + str(application_signup_removal_event.tenantId) +
+ " [application ID] " + str(application_signup_removal_event.applicationId)
+ )
+
+ if Config.application_id == application_signup_removal_event.applicationId:
+ AgentGitHandler.remove_repo(application_signup_removal_event.tenantId)
+
+ execute_event_extendables(constants.APPLICATION_SIGNUP_REMOVAL_EVENT, {})
+
+
+def cleanup(event):
+ log.debug("Executing cleanup extension for event %s..." % event)
+ publisher.publish_maintenance_mode_event()
+ execute_event_extendables("clean", {})
+ publisher.publish_instance_ready_to_shutdown_event()
+
+
+def execute_event_extendables(event, input_values):
+ """ Execute the extensions and plugins related to the event
+ :param event: The event name string
+ :param input_values: the values to be passed to the plugin
+ :return:
+ """
+ try:
+ input_values = add_common_input_values(input_values)
+ except Exception as e:
+ log.error("Error while adding common input values for event extendables: %s" % e)
+ input_values["EVENT"] = event
+ log.debug("Executing extensions for [event] %s with [input values] %s" % (event, input_values))
+ # Execute the extension
+ execute_extension_for_event(event, input_values)
+ # Execute the plugins
+ execute_plugins_for_event(event, input_values)
+
+
+def execute_plugins_for_event(event, input_values):
+ """ For each plugin registered for the specified event, start a plugin execution thread
+ :param str event: The event name string
+ :param dict input_values: the values to be passed to the plugin
+ :return:
+ """
+ try:
+ plugins_for_event = Config.plugins.get(event)
+ if plugins_for_event is not None:
+ for plugin_info in plugins_for_event:
+ log.debug("Executing plugin %s for event %s" % (plugin_info.name, event))
+ plugin_thread = PluginExecutor(plugin_info, input_values)
+ plugin_thread.start()
# block till plugin run completes.
- extension_thread.join()
- else:
- self.__log.debug("No extensions registered for event %s" % event)
- except OSError as e:
- self.__log.warn("No extension was found for event %s: %s" % (event, e))
- except Exception as e:
- self.__log.exception("Error while executing extension for event %s: %s" % (event, e))
-
- def get_repo_path_for_tenant(self, tenant_id, git_local_repo_path, is_multitenant):
- """ Finds the repository path for tenant to clone from the remote repository
- :param tenant_id:
- :param git_local_repo_path:
- :param is_multitenant:
- :return:
- """
- repo_path = ""
-
- if is_multitenant:
- if tenant_id == SUPER_TENANT_ID:
- # super tenant, /repository/deploy/server/
- super_tenant_repo_path = Config.super_tenant_repository_path
- # "app_path"
- repo_path += git_local_repo_path
-
- if super_tenant_repo_path is not None and super_tenant_repo_path != "":
- super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.startswith("/") \
- else "/" + super_tenant_repo_path
- super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.endswith("/") \
- else super_tenant_repo_path + "/"
- # "app_path/repository/deploy/server/"
- repo_path += super_tenant_repo_path
- else:
- # "app_path/repository/deploy/server/"
- repo_path += SUPER_TENANT_REPO_PATH
+ plugin_thread.join()
+ else:
+ log.debug("No plugins registered for event %s" % event)
+ except Exception as e:
+ log.exception("Error while executing plugin for event %s: %s" % (event, e))
+
+def execute_extension_for_event(event, extension_values):
+ """ Execute the extension related to the event
+ :param event: The event name string
+ :param extension_values: the values to be passed to the plugin
+ :return:
+ """
+ try:
+ if Config.extension_executor is not None:
+ log.debug("Executing extension for event [%s]" % event)
+ extension_thread = PluginExecutor(Config.extension_executor, extension_values)
+ extension_thread.start()
+
+ # block till plugin run completes.
+ extension_thread.join()
+ else:
+ log.debug("No extensions registered for event %s" % event)
+ except OSError as e:
+ log.warn("No extension was found for event %s: %s" % (event, e))
+ except Exception as e:
+ log.exception("Error while executing extension for event %s: %s" % (event, e))
+
+
+def get_repo_path_for_tenant(tenant_id, git_local_repo_path, is_multitenant):
+ """ Finds the repository path for tenant to clone from the remote repository
+ :param tenant_id:
+ :param git_local_repo_path:
+ :param is_multitenant:
+ :return:
+ """
+ repo_path = ""
+
+ if is_multitenant:
+ if tenant_id == SUPER_TENANT_ID:
+ # super tenant, /repository/deploy/server/
+ super_tenant_repo_path = Config.super_tenant_repository_path
+ # "app_path"
+ repo_path += git_local_repo_path
+
+ if super_tenant_repo_path is not None and super_tenant_repo_path != "":
+ super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.startswith("/") \
+ else "/" + super_tenant_repo_path
+ super_tenant_repo_path = super_tenant_repo_path if super_tenant_repo_path.endswith("/") \
+ else super_tenant_repo_path + "/"
+ # "app_path/repository/deploy/server/"
+ repo_path += super_tenant_repo_path
else:
- # normal tenant, /repository/tenants/tenant_id
- tenant_repo_path = Config.tenant_repository_path
- # "app_path"
- repo_path += git_local_repo_path
-
- if tenant_repo_path is not None and tenant_repo_path != "":
- tenant_repo_path = tenant_repo_path if tenant_repo_path.startswith("/") else "/" + tenant_repo_path
- tenant_repo_path = tenant_repo_path if tenant_repo_path.endswith("/") else tenant_repo_path + "/"
- # "app_path/repository/tenants/244653444"
- repo_path += tenant_repo_path + tenant_id
- else:
- # "app_path/repository/tenants/244653444"
- repo_path += TENANT_REPO_PATH + tenant_id
-
- # tenant_dir_path = git_local_repo_path + AgentGitHandler.TENANT_REPO_PATH + tenant_id
- # GitUtils.create_dir(repo_path)
+ # "app_path/repository/deploy/server/"
+ repo_path += SUPER_TENANT_REPO_PATH
+
else:
- # not multi tenant, app_path
- repo_path = git_local_repo_path
-
- self.__log.debug("Repo path returned : %r" % repo_path)
- return repo_path
-
- def is_member_initialized_in_topology(self, service_name, cluster_id, member_id):
- if self.member_exists_in_topology(service_name, cluster_id, member_id):
- topology = TopologyContext.get_topology()
- service = topology.get_service(service_name)
- if service is None:
- raise Exception("Service not found in topology [service] %s" % service_name)
-
- cluster = service.get_cluster(cluster_id)
- if cluster is None:
- raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
-
- member = cluster.get_member(member_id)
- if member is None:
- raise Exception("Member id not found in topology [member] %s" % member_id)
-
- self.__log.info("Found member: " + member.to_json())
- if member.status == MemberStatus.Initialized:
- return True
- return False
+ # normal tenant, /repository/tenants/tenant_id
+ tenant_repo_path = Config.tenant_repository_path
+ # "app_path"
+ repo_path += git_local_repo_path
+
+ if tenant_repo_path is not None and tenant_repo_path != "":
+ tenant_repo_path = tenant_repo_path if tenant_repo_path.startswith("/") else "/" + tenant_repo_path
+ tenant_repo_path = tenant_repo_path if tenant_repo_path.endswith("/") else tenant_repo_path + "/"
+ # "app_path/repository/tenants/244653444"
+ repo_path += tenant_repo_path + tenant_id
+ else:
+ # "app_path/repository/tenants/244653444"
+ repo_path += TENANT_REPO_PATH + tenant_id
+
+ # tenant_dir_path = git_local_repo_path + AgentGitHandler.TENANT_REPO_PATH + tenant_id
+ # GitUtils.create_dir(repo_path)
+ else:
+ # not multi tenant, app_path
+ repo_path = git_local_repo_path
- def member_exists_in_topology(self, service_name, cluster_id, member_id):
+ log.debug("Repo path returned : %r" % repo_path)
+ return repo_path
+
+
+def is_member_initialized_in_topology(service_name, cluster_id, member_id):
+ if member_exists_in_topology(service_name, cluster_id, member_id):
topology = TopologyContext.get_topology()
service = topology.get_service(service_name)
if service is None:
@@ -519,131 +524,149 @@ class EventHandler:
if cluster is None:
raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
- activated_member = cluster.get_member(member_id)
- if activated_member is None:
- self.__log.error("Member id not found in topology [member] %s" % member_id)
- return False
+ member = cluster.get_member(member_id)
+ if member is None:
+ raise Exception("Member id not found in topology [member] %s" % member_id)
- return True
+ log.info("Found member: " + member.to_json())
+ if member.status == MemberStatus.Initialized:
+ return True
+ return False
- @staticmethod
- def mark_member_as_initialized(service_name, cluster_id, member_id):
- topology = TopologyContext.get_topology()
- service = topology.get_service(service_name)
+
+def member_exists_in_topology(service_name, cluster_id, member_id):
+ topology = TopologyContext.get_topology()
+ service = topology.get_service(service_name)
+ if service is None:
+ raise Exception("Service not found in topology [service] %s" % service_name)
+
+ cluster = service.get_cluster(cluster_id)
+ if cluster is None:
+ raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
+
+ activated_member = cluster.get_member(member_id)
+ if activated_member is None:
+ log.error("Member id not found in topology [member] %s" % member_id)
+ return False
+
+ return True
+
+
+def mark_member_as_initialized(service_name, cluster_id, member_id):
+ topology = TopologyContext.get_topology()
+ service = topology.get_service(service_name)
+ if service is None:
+ raise Exception("Service not found in topology [service] %s" % service_name)
+
+ cluster = service.get_cluster(cluster_id)
+ if cluster is None:
+ raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
+
+ member = cluster.get_member(member_id)
+ if member is None:
+ raise Exception("Member id not found in topology [member] %s" % member_id)
+ member.status = MemberStatus.Initialized
+
+
+def add_common_input_values(plugin_values):
+ """
+ Adds the common parameters to be used by the extension scripts
+ :param dict[str, str] plugin_values: Dictionary to be added
+ :return: Dictionary with updated parameters
+ :rtype: dict[str, str]
+ """
+ if plugin_values is None:
+ plugin_values = {}
+ elif type(plugin_values) != dict:
+ plugin_values = {"VALUE1": str(plugin_values)}
+
+ plugin_values["APPLICATION_PATH"] = Config.app_path
+ plugin_values["PARAM_FILE_PATH"] = Config.read_property(constants.PARAM_FILE_PATH, False)
+ plugin_values["PERSISTENCE_MAPPINGS"] = Config.persistence_mappings
+
+ lb_cluster_id_in_payload = Config.lb_cluster_id
+ lb_private_ip, lb_public_ip = get_lb_member_ip(lb_cluster_id_in_payload)
+ plugin_values["LB_IP"] = lb_private_ip if lb_private_ip is not None else Config.lb_private_ip
+ plugin_values["LB_PUBLIC_IP"] = lb_public_ip if lb_public_ip is not None else Config.lb_public_ip
+
+ topology = TopologyContext.get_topology()
+ if topology.initialized:
+ service = topology.get_service(Config.service_name)
if service is None:
- raise Exception("Service not found in topology [service] %s" % service_name)
+ raise Exception("Service not found in topology [service] %s" % Config.service_name)
- cluster = service.get_cluster(cluster_id)
+ cluster = service.get_cluster(Config.cluster_id)
if cluster is None:
- raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id)
+ raise Exception("Cluster id not found in topology [cluster] %s" % Config.cluster_id)
- member = cluster.get_member(member_id)
+ member = cluster.get_member(Config.member_id)
if member is None:
- raise Exception("Member id not found in topology [member] %s" % member_id)
- member.status = MemberStatus.Initialized
-
- @staticmethod
- def add_common_input_values(plugin_values):
- """
- Adds the common parameters to be used by the extension scripts
- :param dict[str, str] plugin_values: Dictionary to be added
- :return: Dictionary with updated parameters
- :rtype: dict[str, str]
- """
- if plugin_values is None:
- plugin_values = {}
- elif type(plugin_values) != dict:
- plugin_values = {"VALUE1": str(plugin_values)}
-
- plugin_values["APPLICATION_PATH"] = Config.app_path
- plugin_values["PARAM_FILE_PATH"] = Config.read_property(constants.PARAM_FILE_PATH, False)
- plugin_values["PERSISTENCE_MAPPINGS"] = Config.persistence_mappings
-
- lb_cluster_id_in_payload = Config.lb_cluster_id
- lb_private_ip, lb_public_ip = EventHandler.get_lb_member_ip(lb_cluster_id_in_payload)
- plugin_values["LB_IP"] = lb_private_ip if lb_private_ip is not None else Config.lb_private_ip
- plugin_values["LB_PUBLIC_IP"] = lb_public_ip if lb_public_ip is not None else Config.lb_public_ip
+ raise Exception("Member id not found in topology [member] %s" % Config.member_id)
- topology = TopologyContext.get_topology()
- if topology.initialized:
- service = topology.get_service(Config.service_name)
- if service is None:
- raise Exception("Service not found in topology [service] %s" % Config.service_name)
-
- cluster = service.get_cluster(Config.cluster_id)
- if cluster is None:
- raise Exception("Cluster id not found in topology [cluster] %s" % Config.cluster_id)
-
- member = cluster.get_member(Config.member_id)
- if member is None:
- raise Exception("Member id not found in topology [member] %s" % Config.member_id)
-
- EventHandler.add_properties(service.properties, plugin_values, "SERVICE_PROPERTY")
- EventHandler.add_properties(cluster.properties, plugin_values, "CLUSTER_PROPERTY")
- EventHandler.add_properties(member.properties, plugin_values, "MEMBER_PROPERTY")
-
- plugin_values.update(Config.get_payload_params())
-
- return EventHandler.clean_process_parameters(plugin_values)
-
- @staticmethod
- def add_properties(properties, params, prefix):
- """
- Adds the given property list to the parameters list with given prefix in the parameter name
- :param dict[str, str] properties: service properties
- :param dict[str, str] params:
- :param str prefix:
- :return: dict[str, str]
- """
- if properties is None or properties.items() is None:
- return
-
- for key in properties:
- params[prefix + "_" + key] = str(properties[key])
-
- @staticmethod
- def get_lb_member_ip(lb_cluster_id):
- topology = TopologyContext.get_topology()
- services = topology.get_services()
-
- for service in services:
- clusters = service.get_clusters()
- for cluster in clusters:
- members = cluster.get_members()
- for member in members:
- if member.cluster_id == lb_cluster_id:
- return member.member_default_private_ip, member.member_default_public_ip
-
- return None, None
-
- @staticmethod
- def clean_process_parameters(params):
- """
- Removes any null valued parameters before passing them to the extension scripts
- :param dict params:
- :return: cleaned parameters
- :rtype: dict
- """
- for key, value in params.items():
- if value is None:
- del params[key]
-
- return params
-
- @staticmethod
- def find_tenant_domain(tenant_id):
- tenant = TenantContext.get_tenant(tenant_id)
- if tenant is None:
- raise RuntimeError("Tenant could not be found: [tenant-id] %s" % str(tenant_id))
-
- return tenant.tenant_domain
-
- @staticmethod
- def validate_repo_path(app_path):
- # app path would be ex: /var/www, or /opt/server/data
- return os.access(app_path, os.W_OK)
+ add_properties(service.properties, plugin_values, "SERVICE_PROPERTY")
+ add_properties(cluster.properties, plugin_values, "CLUSTER_PROPERTY")
+ add_properties(member.properties, plugin_values, "MEMBER_PROPERTY")
+
+ plugin_values.update(Config.get_payload_params())
+
+ return clean_process_parameters(plugin_values)
+
+
+def add_properties(properties, params, prefix):
+ """
+ Adds the given property list to the parameters list with given prefix in the parameter name
+ :param dict[str, str] properties: service properties
+ :param dict[str, str] params:
+ :param str prefix:
+ :return: dict[str, str]
+ """
+ if properties is None or properties.items() is None:
+ return
+
+ for key in properties:
+ params[prefix + "_" + key] = str(properties[key])
+
+
+def get_lb_member_ip(lb_cluster_id):
+ topology = TopologyContext.get_topology()
+ services = topology.get_services()
+
+ for service in services:
+ clusters = service.get_clusters()
+ for cluster in clusters:
+ members = cluster.get_members()
+ for member in members:
+ if member.cluster_id == lb_cluster_id:
+ return member.member_default_private_ip, member.member_default_public_ip
+
+ return None, None
+
+
+def clean_process_parameters(params):
+ """
+ Removes any null valued parameters before passing them to the extension scripts
+ :param dict params:
+ :return: cleaned parameters
+ :rtype: dict
+ """
+ for key, value in params.items():
+ if value is None:
+ del params[key]
+
+ return params
+
+
+def find_tenant_domain(tenant_id):
+ tenant = TenantContext.get_tenant(tenant_id)
+ if tenant is None:
+ raise RuntimeError("Tenant could not be found: [tenant-id] %s" % str(tenant_id))
+
+ return tenant.tenant_domain
+def validate_repo_path(app_path):
+ # app path would be ex: /var/www, or /opt/server/data
+ return os.access(app_path, os.W_OK)
class PluginExecutor(Thread):
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
index a24650a..5b6190e 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
@@ -15,10 +15,10 @@
# specific language governing permissions and limitations
# under the License.
-import threading
-
+from threading import Thread
import paho.mqtt.publish as publish
import time
+from Queue import Queue, Empty
import constants
import healthstats
@@ -201,7 +201,7 @@ def get_publisher(topic):
return publishers[topic]
-class EventPublisher:
+class EventPublisher(object):
"""
Handles publishing events to topics to the provided message broker
"""
@@ -210,24 +210,9 @@ class EventPublisher:
self.__topic = topic
self.__log = LogFactory().get_log(__name__)
self.__start_time = int(time.time())
+ self.__msg_queue = Queue()
def publish(self, event):
- publisher_thread = threading.Thread(target=self.__publish_event, args=(event,))
- publisher_thread.start()
-
- def __publish_event(self, event):
- """
- Publishes the given event to the message broker.
-
- When a list of message brokers are given the event is published to the first message broker
- available. Therefore the message brokers should share the data (ex: Sharing the KahaDB in ActiveMQ).
-
- When the event cannot be published, it will be retried until the mb_publisher_timeout is exceeded.
- This value is set in the agent.conf.
-
- :param event:
- :return: True if the event was published.
- """
if Config.mb_username is None:
auth = None
else:
@@ -244,20 +229,59 @@ class EventPublisher:
for mb_url in Config.mb_urls:
mb_ip, mb_port = mb_url.split(":")
+ # start a thread to execute publish event
+ publisher_thread = Thread(target=self.__publish_event, args=(event, mb_ip, mb_port, auth, payload))
+ publisher_thread.start()
+
+ # give sometime for the thread to complete
+ time.sleep(5)
+
+ # check if thread is still running and notify
+ if publisher_thread.isAlive():
+ self.__log.debug(
+ "Event publishing timed out before succeeding. The message broker could be offline.")
+
+ # check if publish.single() succeeded
try:
- publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port, auth=auth)
- self.__log.debug("Event type: %s published to MB: %s:%s" % (str(event.__class__), mb_ip, mb_port))
+ published = self.__msg_queue.get(block=False)
+ except Empty:
+ published = False
+
+ if published:
return True
- except:
- self.__log.debug(
- "Could not publish event to message broker %s:%s." % (mb_ip, mb_port))
+ # All the brokers on the list were offline
self.__log.debug(
"Could not publish event to any of the provided message brokers. Retrying in %s seconds."
% retry_interval)
time.sleep(retry_interval)
+ # Even publisher timeout exceeded
self.__log.warn("Could not publish event to any of the provided message brokers before "
"the timeout [%s] exceeded. The event will be dropped." % Config.mb_publisher_timeout)
return False
+
+ def __publish_event(self, event, mb_ip, mb_port, auth, payload):
+ """
+ Publishes the given event to the message broker.
+
+ When a list of message brokers are given the event is published to the first message broker
+ available. Therefore the message brokers should share the data (ex: Sharing the KahaDB in ActiveMQ).
+
+ When the event cannot be published, it will be retried until the mb_publisher_timeout is exceeded.
+ This value is set in the agent.conf.
+
+ :param event:
+ :return: True if the event was published.
+ """
+ try:
+ self.__log.debug("Publishing [event] %s to %s:%s" % (event.__class__.__name__, mb_ip, mb_port))
+ publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port, auth=auth)
+ self.__log.debug("[Event] %s published to MB: %s:%s" % (str(event.__class__.__name__), mb_ip, mb_port))
+ self.__msg_queue.put(True)
+ except Exception as err:
+ self.__log.debug(
+ "Could not publish [event] %s to message broker %s:%s. : %s"
+ % (str(event.__class__.__name__), mb_ip, mb_port, err))
+ self.__msg_queue.put(False)
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
index ff5cef9..c5a6d2d 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
@@ -178,7 +178,6 @@ class EventExecutor(threading.Thread):
def __init__(self, event_queue):
threading.Thread.__init__(self)
self.__event_queue = event_queue
- # TODO: several handlers for one event
self.__event_handlers = {}
EventSubscriber.log = LogFactory().get_log(__name__)
@@ -191,10 +190,9 @@ class EventExecutor(threading.Thread):
try:
EventSubscriber.log.debug("Executing handler for event %r" % event)
handler(event_msg)
- except:
- EventSubscriber.log.exception("Error processing %r event" % event)
+ except Exception as err:
+ EventSubscriber.log.exception("Error processing %r event: %s" % (event, err))
else:
-
EventSubscriber.log.debug("Event handler not found for event : %r" % event)
def register_event_handler(self, event, handler):
@@ -226,7 +224,7 @@ class MessageBrokerHeartBeatChecker(AbstractAsyncScheduledTask):
try:
self.__mb_client.connect(self.__mb_ip, self.__mb_port, 60)
self.__mb_client.disconnect()
- except:
+ except Exception:
self.__log.info(
"Message broker %s:%s cannot be reached. Disconnecting client..." % (self.__mb_ip, self.__mb_port))
self.__connected_client.disconnect()
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
index 2398099..ab4975a 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
@@ -67,6 +67,11 @@ public class ADCExtensionTestCase extends PythonAgentIntegrationTest {
startServerSocket(8080);
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
/**
* TearDown method for test method testPythonCartridgeAgent
*/
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
index 6e40dd6..05d5ba2 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
@@ -70,6 +70,11 @@ public class ADCMTAppTenantUserTestCase extends PythonAgentIntegrationTest {
startServerSocket(8080);
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
/**
* TearDown method for test method testPythonCartridgeAgent
*/
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
index 6f0b070..444a5e0 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
@@ -72,6 +72,11 @@ public class ADCMTAppTestCase extends PythonAgentIntegrationTest {
startServerSocket(8080);
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
/**
* TearDown method for test method testPythonCartridgeAgent
*/
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
index 0dc92be..dba6197 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
@@ -63,6 +63,11 @@ public class ADCTestCase extends PythonAgentIntegrationTest {
public ADCTestCase() throws IOException {
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
@BeforeMethod(alwaysRun = true)
public void setupADCTest() throws Exception {
log.info("Setting up ADCTestCase");
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
index ea156b6..db21359 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
@@ -79,6 +79,11 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
startServerSocket(8080);
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
/**
* TearDown method for test method testPythonCartridgeAgent
*/
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
index 44d295b..ce13d3f 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
@@ -76,6 +76,11 @@ public class CEPHAModeTestCase extends PythonAgentIntegrationTest {
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
@BeforeMethod(alwaysRun = true)
public void setupCEPHAModeTest() throws Exception {
http://git-wip-us.apache.org/repos/asf/stratos/blob/d1912180/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
index b1f4d8b..8c72f2d 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
@@ -42,6 +42,11 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
public MessageBrokerHATestCase() throws IOException {
}
+ @Override
+ protected String getClassName() {
+ return this.getClass().getSimpleName();
+ }
+
private static final Log log = LogFactory.getLog(MessageBrokerHATestCase.class);
private static final int HA_TEST_TIMEOUT = 300000;
private static final String CLUSTER_ID = "php.php.domain";
@@ -169,10 +174,11 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
MEMBER_ID);
publishEvent(instanceCleanupMemberEvent);
publishCleanupEvent = true;
- waitUntilCleanupEventIsReceivedAndStopDefaultMB();
+
+ stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + mqttBindPorts[0]);
}
- if (line.contains("Could not publish event to message broker localhost:1885.")) {
+ if (line.contains("Could not publish [event] ")) {
log.info("Event publishing to default message broker failed and the next option is tried.");
exit = true;
}
@@ -186,26 +192,6 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
log.info("MessageBrokerHATestCase publisher test completed successfully.");
}
- private void waitUntilCleanupEventIsReceivedAndStopDefaultMB() {
- boolean eventReceived = false;
- List<String> outputLines = new ArrayList<>();
-
- while (!eventReceived) {
- List<String> newLines = getNewLines(outputLines, outputStream.toString());
- if (newLines.size() > 0) {
- for (String line : newLines) {
- if (line.contains("Message received: instance/notifier/InstanceCleanupMemberEvent")) {
- // take down the default broker
- stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + mqttBindPorts[0]);
- eventReceived = true;
- }
- }
- }
- log.info("Waiting until cleanup event is received by PCA...");
- }
- log.info("Cleanup event is received by PCA.");
- }
-
private void assertAgentActivation() {
pcaActivated = false;
instanceActivated = false;
[3/5] stratos git commit: PCA Live Test - Improved agent log display
by removing duplicate log entries
Posted by ch...@apache.org.
PCA Live Test - Improved agent log display by removing duplicate log entries
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7d357546
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7d357546
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7d357546
Branch: refs/heads/stratos-4.1.x
Commit: 7d357546009ee633fa3a5006232f5392dfaf0abb
Parents: d191218
Author: Chamila de Alwis <ch...@apache.org>
Authored: Fri Nov 27 18:05:09 2015 +0530
Committer: Chamila de Alwis <ch...@apache.org>
Committed: Mon Nov 30 19:13:56 2015 +0530
----------------------------------------------------------------------
.../integration/tests/PythonAgentIntegrationTest.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/7d357546/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index 08042b7..7f436f6 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -311,7 +311,7 @@ public abstract class PythonAgentIntegrationTest {
Thread communicatorThread = new Thread(new Runnable() {
@Override
public void run() {
- List<String> outputLines = new ArrayList<String>();
+ List<String> outputLines = new ArrayList<>();
while (!outputStream.isClosed()) {
List<String> newLines = getNewLines(outputLines, outputStream.toString());
if (newLines.size() > 0) {
@@ -544,12 +544,13 @@ public abstract class PythonAgentIntegrationTest {
* @return new lines printed by Python agent process
*/
protected List<String> getNewLines(List<String> currentOutputLines, String output) {
- List<String> newLines = new ArrayList<String>();
+ List<String> newLines = new ArrayList<>();
if (StringUtils.isNotBlank(output)) {
- String[] lines = output.split(NEW_LINE);
- for (String line : lines) {
- if (!currentOutputLines.contains(line)) {
+ List<String> lines = Arrays.asList(output.split(NEW_LINE));
+ if (lines.size() > 0) {
+ int readStartIndex = (currentOutputLines.size() > 0) ? currentOutputLines.size() - 1 : 0;
+ for (String line : lines.subList(readStartIndex , lines.size() - 1)) {
currentOutputLines.add(line);
newLines.add(line);
}
[4/5] stratos git commit: PCA Live Test - Refactoring PCA - Fixed
missing app path validation in event handler
Posted by ch...@apache.org.
PCA Live Test - Refactoring
PCA - Fixed missing app path validation in event handler
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/81b72de8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/81b72de8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/81b72de8
Branch: refs/heads/stratos-4.1.x
Commit: 81b72de838432e35bb8af413fd2be50893e61115
Parents: e40f254
Author: Chamila de Alwis <ch...@apache.org>
Authored: Mon Nov 30 19:13:49 2015 +0530
Committer: Chamila de Alwis <ch...@apache.org>
Committed: Mon Nov 30 19:13:56 2015 +0530
----------------------------------------------------------------------
.../cartridge.agent/healthstats.py | 16 ++---
.../modules/event/eventhandler.py | 9 ++-
.../integration/tests/ADCExtensionTestCase.java | 27 ---------
.../tests/ADCMTAppTenantUserTestCase.java | 42 ++++----------
.../integration/tests/ADCMTAppTestCase.java | 41 ++++---------
.../agent/integration/tests/ADCTestCase.java | 39 ++++---------
.../tests/ADCValidationTestCase.java | 44 +++++---------
.../integration/tests/AgentStartupTestCase.java | 39 ++++---------
.../integration/tests/CEPHAModeTestCase.java | 58 ++++---------------
.../tests/MessageBrokerHATestCase.java | 38 ++++--------
.../tests/PythonAgentIntegrationTest.java | 61 +++++++++++++++++++-
11 files changed, 159 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/81b72de8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
index 92d2495..71f2894 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
@@ -62,7 +62,7 @@ class HealthStatisticsPublisherManager(Thread):
self.log.debug("Publishing load average: %r" % cartridge_stats.load_avg)
self.publisher.publish_load_average(cartridge_stats.load_avg)
- except ThriftReceiverOfflineException:
+ except Exception as e:
self.log.exception(
"Couldn't publish health statistics to CEP. Thrift Receiver offline. Reconnecting...")
self.publisher = HealthStatisticsPublisher()
@@ -186,6 +186,7 @@ class HealthStatisticsPublisher:
def add_publishers(self, cep_url):
"""
Add publishers to the publisher list for publishing
+ :param cep_url:
"""
cep_ip = cep_url.split(':')[0]
cep_port = cep_url.split(':')[1]
@@ -199,27 +200,28 @@ class HealthStatisticsPublisher:
self.publishers.append(publisher)
-
- def is_cep_active(self, cep_url):
+ @staticmethod
+ def is_cep_active(cep_url):
"""
Check if the cep node is active
return true if active
+ :param cep_url:
"""
- self.ports = []
+ ports = []
cep_ip = cep_url.split(':')[0]
cep_port = cep_url.split(':')[1]
- self.ports.append(cep_port)
+ ports.append(cep_port)
cep_active = cartridgeagentutils.check_ports_active(
cep_ip,
- self.ports)
+ ports)
return cep_active
-
def publish_event(self, event):
"""
Publish events to cep nodes
+ :param event:
"""
for publisher in self.publishers:
try:
http://git-wip-us.apache.org/repos/asf/stratos/blob/81b72de8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
index f8b0c2b..cea3c01 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
@@ -89,6 +89,13 @@ def on_artifact_updated_event(artifacts_updated_event):
log.error("Repository path is empty. Failed to process artifact updated event.")
return
+ if not validate_repo_path(Config.app_path):
+ log.error(
+ "Repository path cannot be accessed, or is invalid. Failed to process artifact updated event. [App Path] %s"
+ % Config.app_path)
+
+ return
+
repo_username = artifacts_updated_event.repo_username
tenant_id = artifacts_updated_event.tenant_id
is_multitenant = Config.is_multiTenant
@@ -666,7 +673,7 @@ def find_tenant_domain(tenant_id):
def validate_repo_path(app_path):
# app path would be ex: /var/www, or /opt/server/data
- return os.access(app_path, os.W_OK)
+ return os.path.isabs(app_path)
class PluginExecutor(Thread):
http://git-wip-us.apache.org/repos/asf/stratos/blob/81b72de8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
index ab4975a..2fddf6e 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCExtensionTestCase.java
@@ -132,31 +132,4 @@ public class ADCExtensionTestCase extends PythonAgentIntegrationTest {
artifactUpdatedEvent.setTenantId(TENANT_ID);
return artifactUpdatedEvent;
}
-
- /**
- * Create test topology
- *
- * @return Topology object with mock information
- */
- private Topology createTestTopology() {
- Topology topology = new Topology();
- Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant);
- topology.addService(service);
-
- Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME,
- AUTOSCALING_POLICY_NAME, APP_ID);
- service.addCluster(cluster);
-
- Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, CLUSTER_INSTANCE_ID,
- NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, System.currentTimeMillis());
-
- member.setDefaultPrivateIP("10.0.0.1");
- member.setDefaultPublicIP("20.0.0.1");
- Properties properties = new Properties();
- properties.setProperty("prop1", "value1");
- member.setProperties(properties);
- member.setStatus(MemberStatus.Created);
- cluster.addMember(member);
- return topology;
- }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/81b72de8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
index 05d5ba2..b653632 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTenantUserTestCase.java
@@ -137,7 +137,18 @@ public class ADCMTAppTenantUserTestCase extends PythonAgentIntegrationTest {
sleep(2000);
// Send complete topology event
log.info("Publishing complete topology event...");
- Topology topology = createTestTopology();
+ Topology topology = PythonAgentIntegrationTest.createTestTopology(
+ SERVICE_NAME,
+ CLUSTER_ID,
+ DEPLOYMENT_POLICY_NAME,
+ AUTOSCALING_POLICY_NAME,
+ APP_ID,
+ MEMBER_ID,
+ CLUSTER_INSTANCE_ID,
+ NETWORK_PARTITION_ID,
+ PARTITION_ID,
+ ServiceType.SingleTenant);
+
CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
publishEvent(completeTopologyEvent);
log.info("Complete topology event published");
@@ -190,33 +201,4 @@ public class ADCMTAppTenantUserTestCase extends PythonAgentIntegrationTest {
artifactUpdatedEvent.setTenantId(TENANT_ID);
return artifactUpdatedEvent;
}
-
- /**
- * Create test topology
- *
- * @return
- */
- private Topology createTestTopology() {
- Topology topology = new Topology();
- Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant);
- topology.addService(service);
-
- Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME,
- AUTOSCALING_POLICY_NAME, APP_ID);
- service.addCluster(cluster);
-
- Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID,
- CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private,
- System.currentTimeMillis());
-
- member.setDefaultPrivateIP("10.0.0.1");
- member.setDefaultPublicIP("20.0.0.1");
- Properties properties = new Properties();
- properties.setProperty("prop1", "value1");
- member.setProperties(properties);
- member.setStatus(MemberStatus.Created);
- cluster.addMember(member);
-
- return topology;
- }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/81b72de8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
index 444a5e0..02144e0 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCMTAppTestCase.java
@@ -139,7 +139,18 @@ public class ADCMTAppTestCase extends PythonAgentIntegrationTest {
sleep(2000);
// Send complete topology event
log.info("Publishing complete topology event...");
- Topology topology = createTestTopology();
+ Topology topology = PythonAgentIntegrationTest.createTestTopology(
+ SERVICE_NAME,
+ CLUSTER_ID,
+ DEPLOYMENT_POLICY_NAME,
+ AUTOSCALING_POLICY_NAME,
+ APP_ID,
+ MEMBER_ID,
+ CLUSTER_INSTANCE_ID,
+ NETWORK_PARTITION_ID,
+ PARTITION_ID,
+ ServiceType.SingleTenant);
+
CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
publishEvent(completeTopologyEvent);
log.info("Complete topology event published");
@@ -193,32 +204,4 @@ public class ADCMTAppTestCase extends PythonAgentIntegrationTest {
artifactUpdatedEvent.setTenantId(TENANT_ID);
return artifactUpdatedEvent;
}
-
- /**
- * Create test topology
- *
- * @return
- */
- private Topology createTestTopology() {
- Topology topology = new Topology();
- Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant);
- topology.addService(service);
-
- Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME,
- AUTOSCALING_POLICY_NAME, APP_ID);
- service.addCluster(cluster);
-
- Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID,
- CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private,
- System.currentTimeMillis());
-
- member.setDefaultPrivateIP("10.0.0.1");
- member.setDefaultPublicIP("20.0.0.1");
- Properties properties = new Properties();
- properties.setProperty("prop1", "value1");
- member.setProperties(properties);
- member.setStatus(MemberStatus.Created);
- cluster.addMember(member);
- return topology;
- }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/81b72de8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
index dba6197..6a42cce 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCTestCase.java
@@ -196,7 +196,17 @@ public class ADCTestCase extends PythonAgentIntegrationTest {
sleep(2000);
// Send complete topology event
log.info("Publishing complete topology event...");
- Topology topology = createTestTopology();
+ Topology topology = PythonAgentIntegrationTest.createTestTopology(
+ SERVICE_NAME,
+ CLUSTER_ID,
+ DEPLOYMENT_POLICY_NAME,
+ AUTOSCALING_POLICY_NAME,
+ APP_ID,
+ MEMBER_ID,
+ CLUSTER_INSTANCE_ID,
+ NETWORK_PARTITION_ID,
+ PARTITION_ID,
+ ServiceType.SingleTenant);
CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
publishEvent(completeTopologyEvent);
log.info("Complete topology event published");
@@ -255,31 +265,4 @@ public class ADCTestCase extends PythonAgentIntegrationTest {
artifactUpdatedEvent.setTenantId(TENANT_ID);
return artifactUpdatedEvent;
}
-
- /**
- * Create test topology
- *
- * @return Topology object with mock information
- */
- private Topology createTestTopology() {
- Topology topology = new Topology();
- Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant);
- topology.addService(service);
-
- Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME,
- AUTOSCALING_POLICY_NAME, APP_ID);
- service.addCluster(cluster);
-
- Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, CLUSTER_INSTANCE_ID,
- NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, System.currentTimeMillis());
-
- member.setDefaultPrivateIP("10.0.0.1");
- member.setDefaultPublicIP("20.0.0.1");
- Properties properties = new Properties();
- properties.setProperty("prop1", "value1");
- member.setProperties(properties);
- member.setStatus(MemberStatus.Created);
- cluster.addMember(member);
- return topology;
- }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/81b72de8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCValidationTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCValidationTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCValidationTestCase.java
index 45834e5..50ed350 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCValidationTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/ADCValidationTestCase.java
@@ -21,7 +21,6 @@ package org.apache.stratos.python.cartridge.agent.integration.tests;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.domain.LoadBalancingIPType;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
@@ -33,7 +32,6 @@ import org.testng.annotations.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
/**
* Test validation for application path input on the PCA
@@ -92,7 +90,7 @@ public class ADCValidationTestCase extends PythonAgentIntegrationTest {
sleep(1000);
}
List<String> outputLines = new ArrayList<>();
- while (!outputStream.isClosed()) {
+ while (!outputStream.isClosed() && !logDetected) {
List<String> newLines = getNewLines(outputLines, outputStream.toString());
if (newLines.size() > 0) {
for (String line : newLines) {
@@ -100,7 +98,18 @@ public class ADCValidationTestCase extends PythonAgentIntegrationTest {
sleep(2000);
// Send complete topology event
log.info("Publishing complete topology event...");
- Topology topology = createTestTopology();
+ Topology topology = PythonAgentIntegrationTest.createTestTopology(
+ SERVICE_NAME,
+ CLUSTER_ID,
+ DEPLOYMENT_POLICY_NAME,
+ AUTOSCALING_POLICY_NAME,
+ APP_ID,
+ MEMBER_ID,
+ CLUSTER_INSTANCE_ID,
+ NETWORK_PARTITION_ID,
+ PARTITION_ID,
+ ServiceType.SingleTenant);
+
CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
publishEvent(completeTopologyEvent);
log.info("Complete topology event published");
@@ -151,31 +160,4 @@ public class ADCValidationTestCase extends PythonAgentIntegrationTest {
artifactUpdatedEvent.setTenantId(TENANT_ID);
return artifactUpdatedEvent;
}
-
- /**
- * Create test topology
- *
- * @return Topology object with mock information
- */
- private Topology createTestTopology() {
- Topology topology = new Topology();
- Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant);
- topology.addService(service);
-
- Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME,
- AUTOSCALING_POLICY_NAME, APP_ID);
- service.addCluster(cluster);
-
- Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, CLUSTER_INSTANCE_ID,
- NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, System.currentTimeMillis());
-
- member.setDefaultPrivateIP("10.0.0.1");
- member.setDefaultPublicIP("20.0.0.1");
- Properties properties = new Properties();
- properties.setProperty("prop1", "value1");
- member.setProperties(properties);
- member.setStatus(MemberStatus.Created);
- cluster.addMember(member);
- return topology;
- }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/81b72de8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
index db21359..4f529d7 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
@@ -61,7 +61,17 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
private boolean topologyContextTestCompleted = false;
private boolean completeTenantInitialized = false;
private boolean thriftTestCompleted = false;
- private Topology topology = createTestTopology();
+ private Topology topology = PythonAgentIntegrationTest.createTestTopology(
+ SERVICE_NAME,
+ CLUSTER_ID,
+ DEPLOYMENT_POLICY_NAME,
+ AUTOSCALING_POLICY_NAME,
+ APP_ID,
+ MEMBER_ID,
+ CLUSTER_INSTANCE_ID,
+ NETWORK_PARTITION_ID,
+ PARTITION_ID,
+ ServiceType.SingleTenant);
public AgentStartupTestCase() throws IOException {
}
@@ -217,31 +227,4 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
tenantList.add(new Tenant(3, "test.three.domain"));
return tenantList;
}
-
- /**
- * Create test topology
- *
- * @return Topology object with mock information
- */
- private Topology createTestTopology() {
- Topology topology = new Topology();
- Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant);
- topology.addService(service);
-
- Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME,
- AUTOSCALING_POLICY_NAME, APP_ID);
- service.addCluster(cluster);
-
- Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, CLUSTER_INSTANCE_ID,
- NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, System.currentTimeMillis());
-
- member.setDefaultPrivateIP("10.0.0.1");
- member.setDefaultPublicIP("20.0.0.1");
- Properties properties = new Properties();
- properties.setProperty("prop1", "value1");
- member.setProperties(properties);
- member.setStatus(MemberStatus.Created);
- cluster.addMember(member);
- return topology;
- }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/81b72de8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
index ce13d3f..58b6deb 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/CEPHAModeTestCase.java
@@ -21,12 +21,9 @@ package org.apache.stratos.python.cartridge.agent.integration.tests;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.domain.LoadBalancingIPType;
import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
import org.apache.stratos.messaging.event.topology.MemberInitializedEvent;
-import org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener;
import org.apache.stratos.python.cartridge.agent.integration.common.ThriftTestServer;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -41,7 +38,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
public class CEPHAModeTestCase extends PythonAgentIntegrationTest {
@@ -59,7 +55,18 @@ public class CEPHAModeTestCase extends PythonAgentIntegrationTest {
private static final String TENANT_ID = "-1234";
private static final String SERVICE_NAME = "tomcat";
private boolean startupTestCompleted = false;
- private Topology topology = createTestTopology();
+ private Topology topology = PythonAgentIntegrationTest.createTestTopology(
+ SERVICE_NAME,
+ CLUSTER_ID,
+ DEPLOYMENT_POLICY_NAME,
+ AUTOSCALING_POLICY_NAME,
+ APP_ID,
+ MEMBER_ID,
+ CLUSTER_INSTANCE_ID,
+ NETWORK_PARTITION_ID,
+ PARTITION_ID,
+ ServiceType.SingleTenant);
+
private static final int ADC_TIMEOUT = 300000;
private ThriftTestServer secondThriftTestServer;
private boolean thriftTestCompletedinServerTwo = false;
@@ -180,19 +187,6 @@ public class CEPHAModeTestCase extends PythonAgentIntegrationTest {
startupTestThread.start();
- instanceStatusEventReceiver.addEventListener(new InstanceActivatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- log.info("Publishing complete topology with a new member...");
- Member newMember = new Member(SERVICE_NAME, CLUSTER_ID, "new-member", CLUSTER_INSTANCE_ID,
- NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, System.currentTimeMillis());
- topology.getService(SERVICE_NAME).getCluster(CLUSTER_ID).addMember(newMember);
- CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
- publishEvent(completeTopologyEvent);
- log.info("Complete topology event published with new member");
- }
- });
-
while (!instanceStarted || !instanceActivated || !startupTestCompleted ||
!thriftTestCompletedinServerOne || !thriftTestCompletedinServerTwo) {
// wait until the instance activated event is received.
@@ -274,32 +268,4 @@ public class CEPHAModeTestCase extends PythonAgentIntegrationTest {
}
});
}
-
- /**
- * Create test topology
- *
- * @return Topology object with mock information
- */
- private Topology createTestTopology() {
- Topology topology = new Topology();
- Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant);
- topology.addService(service);
-
- Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME,
- AUTOSCALING_POLICY_NAME, APP_ID);
- service.addCluster(cluster);
-
- Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID,
- CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private,
- System.currentTimeMillis());
-
- member.setDefaultPrivateIP("10.0.0.1");
- member.setDefaultPublicIP("20.0.0.1");
- Properties properties = new Properties();
- properties.setProperty("prop1", "value1");
- member.setProperties(properties);
- member.setStatus(MemberStatus.Created);
- cluster.addMember(member);
- return topology;
- }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/81b72de8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
index 8c72f2d..1fdd8cd 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
@@ -212,7 +212,18 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
sleep(2000);
// Send complete topology event
log.info("Publishing complete topology event...");
- Topology topology = createTestTopology();
+ Topology topology = PythonAgentIntegrationTest.createTestTopology(
+ SERVICE_NAME,
+ CLUSTER_ID,
+ DEPLOYMENT_POLICY_NAME,
+ AUTOSCALING_POLICY_NAME,
+ APP_ID,
+ MEMBER_ID,
+ CLUSTER_INSTANCE_ID,
+ NETWORK_PARTITION_ID,
+ PARTITION_ID,
+ ServiceType.SingleTenant);
+
CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
publishEvent(completeTopologyEvent);
log.info("Complete topology event published");
@@ -245,30 +256,5 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
log.info("PCA activation assertion passed.");
}
- /**
- * Create test topology
- *
- * @return
- */
- private Topology createTestTopology() {
- Topology topology = new Topology();
- Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant);
- topology.addService(service);
-
- Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME,
- AUTOSCALING_POLICY_NAME, APP_ID);
- service.addCluster(cluster);
- Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, CLUSTER_INSTANCE_ID,
- NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, System.currentTimeMillis());
-
- member.setDefaultPrivateIP("10.0.0.1");
- member.setDefaultPublicIP("20.0.0.1");
- Properties properties = new Properties();
- properties.setProperty("prop1", "value1");
- member.setProperties(properties);
- member.setStatus(MemberStatus.Created);
- cluster.addMember(member);
- return topology;
- }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/81b72de8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index 7f436f6..649430f 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -28,9 +28,11 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.domain.LoadBalancingIPType;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener;
import org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener;
@@ -549,8 +551,8 @@ public abstract class PythonAgentIntegrationTest {
if (StringUtils.isNotBlank(output)) {
List<String> lines = Arrays.asList(output.split(NEW_LINE));
if (lines.size() > 0) {
- int readStartIndex = (currentOutputLines.size() > 0) ? currentOutputLines.size() - 1 : 0;
- for (String line : lines.subList(readStartIndex , lines.size() - 1)) {
+ int readStartIndex = (currentOutputLines.size() > 0) ? (currentOutputLines.size() - 1) : 0;
+ for (String line : lines.subList(readStartIndex , lines.size())) {
currentOutputLines.add(line);
newLines.add(line);
}
@@ -586,4 +588,59 @@ public abstract class PythonAgentIntegrationTest {
return closed;
}
}
+
+ /**
+ * Create a test topology object
+ *
+ * @param serviceName
+ * @param clusterId
+ * @param depPolicyName
+ * @param autoscalingPolicyName
+ * @param appId
+ * @param memberId
+ * @param clusterInstanceId
+ * @param networkPartitionId
+ * @param partitionId
+ * @param serviceType
+ * @return
+ */
+ protected static Topology createTestTopology(
+ String serviceName,
+ String clusterId,
+ String depPolicyName,
+ String autoscalingPolicyName,
+ String appId,
+ String memberId,
+ String clusterInstanceId,
+ String networkPartitionId,
+ String partitionId,
+ ServiceType serviceType) {
+
+
+ Topology topology = new Topology();
+ Service service = new Service(serviceName, serviceType);
+ topology.addService(service);
+
+ Cluster cluster = new Cluster(service.getServiceName(), clusterId, depPolicyName, autoscalingPolicyName, appId);
+ service.addCluster(cluster);
+
+ Member member = new Member(
+ service.getServiceName(),
+ cluster.getClusterId(),
+ memberId,
+ clusterInstanceId,
+ networkPartitionId,
+ partitionId,
+ LoadBalancingIPType.Private,
+ System.currentTimeMillis());
+
+ member.setDefaultPrivateIP("10.0.0.1");
+ member.setDefaultPublicIP("20.0.0.1");
+ Properties properties = new Properties();
+ properties.setProperty("prop1", "value1");
+ member.setProperties(properties);
+ member.setStatus(MemberStatus.Created);
+ cluster.addMember(member);
+ return topology;
+ }
}