You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ra...@apache.org on 2015/11/11 10:45:42 UTC
[1/3] stratos git commit: Closing STRATOS-1544, STRATOS-1612,
STRATOS-1611: topology, tenant, application model initialize optimization
Repository: stratos
Updated Branches:
refs/heads/stratos-4.1.x 00f624b48 -> 60b801144
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
index f78f460..0994048 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
@@ -17,12 +17,12 @@
import paho.mqtt.publish as publish
-from modules.event.instance.status.events import *
-from modules.util.log import *
-from modules.util import cartridgeagentutils
-import healthstats
import constants
+import healthstats
from config import Config
+from modules.event.instance.status.events import *
+from modules.util import cartridgeagentutils
+from modules.util.log import *
log = LogFactory().get_log(__name__)
publishers = {}
@@ -183,6 +183,20 @@ def publish_instance_ready_to_shutdown_event():
log.warn("Instance already in a ReadyToShutDown event...")
+def publish_complete_topology_request_event():
+ complete_topology_request_event = CompleteTopologyRequestEvent()
+ publisher = get_publisher(constants.INITIALIZER_TOPIC + constants.COMPLETE_TOPOLOGY_REQUEST_EVENT)
+ publisher.publish(complete_topology_request_event)
+ log.info("Complete topology request event published")
+
+
+def publish_complete_tenant_request_event():
+ complete_tenant_request_event = CompleteTenantRequestEvent()
+ publisher = get_publisher(constants.INITIALIZER_TOPIC + constants.COMPLETE_TENANT_REQUEST_EVENT)
+ publisher.publish(complete_tenant_request_event)
+ log.info("Complete tenant request event published")
+
+
def get_publisher(topic):
if topic not in publishers:
publishers[topic] = EventPublisher(topic)
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
index 8f80013..f0a70d4 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
@@ -22,10 +22,14 @@ package org.apache.stratos.python.cartridge.agent.integration.tests;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.domain.LoadBalancingIPType;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
import org.apache.stratos.messaging.event.topology.MemberInitializedEvent;
+import org.apache.stratos.messaging.listener.initializer.CompleteTenantRequestEventListener;
+import org.apache.stratos.messaging.listener.initializer.CompleteTopologyRequestEventListener;
import org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -55,6 +59,7 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
private static final String SERVICE_NAME = "php";
private boolean startupTestCompleted = false;
private boolean topologyContextTestCompleted = false;
+ private boolean completeTenantInitialized = false;
private boolean thriftTestCompleted = false;
private Topology topology = createTestTopology();
@@ -74,7 +79,6 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
startServerSocket(8080);
}
-
/**
* TearDown method for test method testPythonCartridgeAgent
*/
@@ -83,8 +87,9 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
tearDown();
}
- @Test(timeOut = STARTUP_TIMEOUT, description = "Test PCA initialization, activation, health stat publishing and " +
- "topology context update", groups = {"smoke"})
+ @Test(timeOut = STARTUP_TIMEOUT,
+ description = "Test PCA initialization, activation, health stat publishing and " + "topology context update",
+ groups = { "smoke" })
public void testPythonCartridgeAgent() {
startCommunicatorThread();
subscribeToThriftDatabridge();
@@ -99,24 +104,6 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
List<String> newLines = getNewLines(outputLines, outputStream.toString());
if (newLines.size() > 0) {
for (String line : newLines) {
- if (line.contains("Subscribed to 'topology/#'")) {
- sleep(2000);
- // Send complete topology event
- log.info("Publishing complete topology event...");
- CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
- publishEvent(completeTopologyEvent);
- log.info("Complete topology event published");
-
- // Publish member initialized event
- log.info("Publishing member initialized event...");
- MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent(
- SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID,
- PARTITION_ID, INSTANCE_ID
- );
- publishEvent(memberInitializedEvent);
- log.info("Member initialized event published");
- }
-
if (line.contains("Published event to thrift stream")) {
startupTestCompleted = true;
}
@@ -125,6 +112,11 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
if (line.contains("Topology context update test passed!")) {
topologyContextTestCompleted = true;
}
+
+ // assert complete tenant initialization
+ if (line.contains("Tenant context updated with")){
+ completeTenantInitialized = true;
+ }
}
}
sleep(1000);
@@ -134,6 +126,35 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
startupTestThread.start();
+ initializerEventReceiver.addEventListener(new CompleteTopologyRequestEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ // Send complete topology event
+ log.info("CompleteTopologyRequestEvent received. Publishing complete topology event...");
+ CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
+ publishEvent(completeTopologyEvent);
+ log.info("Complete topology event published");
+
+ // Publish member initialized event
+ log.info("Publishing member initialized event...");
+ MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent(SERVICE_NAME, CLUSTER_ID,
+ CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID, PARTITION_ID, INSTANCE_ID);
+ publishEvent(memberInitializedEvent);
+ log.info("Member initialized event published");
+ }
+ });
+
+ initializerEventReceiver.addEventListener(new CompleteTenantRequestEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ // Send complete tenant event
+ log.info("CompleteTenantRequestEvent received. Publishing complete tenant event...");
+ CompleteTenantEvent completeTenantEvent = new CompleteTenantEvent(createTestTenantList());
+ publishEvent(completeTenantEvent);
+ log.info("Complete tenant event published");
+ }
+ });
+
instanceStatusEventReceiver.addEventListener(new InstanceActivatedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -148,7 +169,7 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
});
while (!instanceStarted || !instanceActivated || !startupTestCompleted || !topologyContextTestCompleted ||
- !thriftTestCompleted) {
+ !thriftTestCompleted || !completeTenantInitialized) {
// wait until the instance activated event is received.
// this will assert whether instance got activated within timeout period; no need for explicit assertions
sleep(2000);
@@ -180,6 +201,19 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
}
/**
+ * Create test tenant list
+ *
+ * @return List of tenant objects with mock information
+ */
+ private List<Tenant> createTestTenantList() {
+ List<Tenant> tenantList = new ArrayList<>();
+ tenantList.add(new Tenant(1, "test.one.domain"));
+ tenantList.add(new Tenant(2, "test.two.domain"));
+ tenantList.add(new Tenant(3, "test.three.domain"));
+ return tenantList;
+ }
+
+ /**
* Create test topology
*
* @return Topology object with mock information
@@ -193,9 +227,8 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
AUTOSCALING_POLICY_NAME, APP_ID);
service.addCluster(cluster);
- Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID,
- CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private,
- System.currentTimeMillis());
+ Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, CLUSTER_INSTANCE_ID,
+ NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, System.currentTimeMillis());
member.setDefaultPrivateIP("10.0.0.1");
member.setDefaultPublicIP("20.0.0.1");
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index 66b9290..d441c1e 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener;
import org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener;
+import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver;
import org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
@@ -72,6 +73,7 @@ public class PythonAgentIntegrationTest {
protected boolean eventReceiverInitiated = false;
protected TopologyEventReceiver topologyEventReceiver;
protected InstanceStatusEventReceiver instanceStatusEventReceiver;
+ protected InitializerEventReceiver initializerEventReceiver;
protected boolean instanceStarted;
protected boolean instanceActivated;
protected ByteArrayOutputStreamLocal outputStream;
@@ -112,14 +114,17 @@ public class PythonAgentIntegrationTest {
}
});
+ initializerEventReceiver = new InitializerEventReceiver();
+ initializerEventReceiver.setExecutorService(executorService);
+ initializerEventReceiver.execute();
+
this.eventReceiverInitiated = true;
}
// Start CEP Thrift test server
thriftTestServer = new ThriftTestServer();
- File file =
- new File(getResourcesPath() + PATH_SEP + "common" + PATH_SEP + "stratos-health-stream-def.json");
+ File file = new File(getResourcesPath() + PATH_SEP + "common" + PATH_SEP + "stratos-health-stream-def.json");
FileInputStream fis = new FileInputStream(file);
byte[] data = new byte[(int) file.length()];
fis.read(data);
@@ -139,7 +144,6 @@ public class PythonAgentIntegrationTest {
this.outputStream = executeCommand("python " + agentPath + PATH_SEP + "agent.py", timeout);
}
-
protected void tearDown() {
tearDown(null);
}
@@ -155,8 +159,7 @@ public class PythonAgentIntegrationTest {
log.info("Terminating process: " + commandText);
executor.setExitValue(0);
executor.getWatchdog().destroyProcess();
- }
- catch (Exception ignore) {
+ } catch (Exception ignore) {
}
}
// wait until everything cleans up to avoid connection errors
@@ -165,36 +168,34 @@ public class PythonAgentIntegrationTest {
try {
log.info("Stopping socket server: " + serverSocket.getLocalSocketAddress());
serverSocket.close();
- }
- catch (IOException ignore) {
+ } catch (IOException ignore) {
}
}
try {
if (thriftTestServer != null) {
thriftTestServer.stop();
}
- }
- catch (Exception ignore) {
+ } catch (Exception ignore) {
}
if (sourcePath != null) {
try {
log.info("Deleting source checkout folder...");
FileUtils.deleteDirectory(new File(sourcePath));
- }
- catch (Exception ignore) {
+ } catch (Exception ignore) {
}
}
+ log.info("Terminating event receivers...");
this.instanceStatusEventReceiver.terminate();
this.topologyEventReceiver.terminate();
+ this.initializerEventReceiver.terminate();
this.instanceActivated = false;
this.instanceStarted = false;
try {
broker.stop();
broker = null;
- }
- catch (Exception ignore) {
+ } catch (Exception ignore) {
}
// TODO: use thread synchronization and assert all connections are properly closed
// leave some room to clear up active connections
@@ -203,8 +204,7 @@ public class PythonAgentIntegrationTest {
public PythonAgentIntegrationTest() throws IOException {
integrationProperties
- .load(PythonAgentIntegrationTest.class
- .getResourceAsStream(PATH_SEP + "integration-test.properties"));
+ .load(PythonAgentIntegrationTest.class.getResourceAsStream(PATH_SEP + "integration-test.properties"));
distributionName = integrationProperties.getProperty(DISTRIBUTION_NAME);
amqpBindAddress = integrationProperties.getProperty(ACTIVEMQ_AMQP_BIND_ADDRESS);
mqttBindAddress = integrationProperties.getProperty(ACTIVEMQ_MQTT_BIND_ADDRESS);
@@ -223,7 +223,7 @@ public class PythonAgentIntegrationTest {
AuthenticationUser authenticationUser = new AuthenticationUser("system", "manager", "users,admins");
List<AuthenticationUser> authUserList = new ArrayList<>();
authUserList.add(authenticationUser);
- broker.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(authUserList)});
+ broker.setPlugins(new BrokerPlugin[] { new SimpleAuthenticationPlugin(authUserList) });
broker.setBrokerName("testBroker");
broker.setDataDirectory(
PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." + PATH_SEP +
@@ -244,8 +244,7 @@ public class PythonAgentIntegrationTest {
if (line.contains("Exception in thread") || line.contains("ERROR")) {
try {
throw new RuntimeException(line);
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error("ERROR found in PCA log", e);
}
}
@@ -284,8 +283,7 @@ public class PythonAgentIntegrationTest {
log.info("Message received for [port] " + port + ", [message] " + output);
}
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
String message = "Could not start server socket: [port] " + port;
log.error(message, e);
throw new RuntimeException(message, e);
@@ -300,7 +298,6 @@ public class PythonAgentIntegrationTest {
".." + PATH_SEP + "src" + PATH_SEP + "test" + PATH_SEP + "resources" + PATH_SEP + "common";
}
-
public static String getResourcesPath() {
return PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." + PATH_SEP +
".." + PATH_SEP + "src" + PATH_SEP + "test" + PATH_SEP + "resources";
@@ -320,13 +317,12 @@ public class PythonAgentIntegrationTest {
try {
log.info("Setting up python cartridge agent...");
-
String srcAgentPath = PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() +
PATH_SEP + ".." + PATH_SEP + ".." + PATH_SEP + ".." + PATH_SEP + ".." + PATH_SEP + "distribution" +
PATH_SEP + "target" + PATH_SEP + distributionName + ".zip";
- String unzipDestPath =
- PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." + PATH_SEP +
- PYTHON_AGENT_DIR_NAME + PATH_SEP;
+ String unzipDestPath = PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".."
+ + PATH_SEP +
+ PYTHON_AGENT_DIR_NAME + PATH_SEP;
//FileUtils.copyFile(new File(srcAgentPath), new File(destAgentPath));
unzip(srcAgentPath, unzipDestPath);
String destAgentPath = PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." +
@@ -370,8 +366,7 @@ public class PythonAgentIntegrationTest {
log.info("Python cartridge agent setup completed");
return destAgentPath;
- }
- catch (Exception e) {
+ } catch (Exception e) {
String message = "Could not copy cartridge agent distribution";
log.error(message, e);
throw new RuntimeException(message, e);
@@ -442,8 +437,7 @@ public class PythonAgentIntegrationTest {
});
executorList.put(commandText, exec);
return outputStream;
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error(outputStream.toString(), e);
throw new RuntimeException(e);
}
@@ -457,8 +451,7 @@ public class PythonAgentIntegrationTest {
protected void sleep(long time) {
try {
Thread.sleep(time);
- }
- catch (InterruptedException ignore) {
+ } catch (InterruptedException ignore) {
}
}
@@ -495,7 +488,6 @@ public class PythonAgentIntegrationTest {
eventPublisher.publish(event);
}
-
/**
* Implements ByteArrayOutputStream.isClosed() method
*/
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
index 721a5c6..e4650e4 100644
--- a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
+++ b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
@@ -35,6 +35,10 @@ import org.apache.stratos.messaging.listener.application.*;
import org.apache.stratos.messaging.listener.topology.*;
import org.apache.stratos.messaging.message.receiver.application.ApplicationManager;
import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
+import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
+import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpManager;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantEventReceiver;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.mock.iaas.client.MockIaasApiClient;
@@ -55,12 +59,18 @@ public class TopologyHandler {
public static final int APPLICATION_ACTIVATION_TIMEOUT = 500000;
public static final int APPLICATION_UNDEPLOYMENT_TIMEOUT = 500000;
public static final int MEMBER_TERMINATION_TIMEOUT = 500000;
- public static final int APPLICATION_TOPOLOGY_TIMEOUT = 120000;
+ public static final int APPLICATION_INIT_TIMEOUT = 20000;
+ public static final int TENANT_INIT_TIMEOUT = 20000;
+ public static final int APPLICATION_SIGNUP_INIT_TIMEOUT = 20000;
+ public static final int TOPOLOGY_INIT_TIMEOUT = 20000;
public static final String APPLICATION_STATUS_CREATED = "Created";
public static final String APPLICATION_STATUS_UNDEPLOYING = "Undeploying";
private ApplicationsEventReceiver applicationsEventReceiver;
private TopologyEventReceiver topologyEventReceiver;
+ private TenantEventReceiver tenantEventReceiver;
+ private ApplicationSignUpEventReceiver applicationSignUpEventReceiver;
public static TopologyHandler topologyHandler;
+ private ExecutorService executorService = StratosThreadPool.getExecutorService("stratos.integration.test.pool", 10);
private Map<String, Long> terminatedMembers = new ConcurrentHashMap<String, Long>();
private Map<String, Long> terminatingMembers = new ConcurrentHashMap<String, Long>();
private Map<String, Long> createdMembers = new ConcurrentHashMap<String, Long>();
@@ -70,12 +80,28 @@ public class TopologyHandler {
private TopologyHandler() {
initializeApplicationEventReceiver();
initializeTopologyEventReceiver();
+ initializeTenantEventReceiver();
+ initializeApplicationSignUpEventReceiver();
assertApplicationTopologyInitialized();
assertTopologyInitialized();
+ assertTenantInitialized();
+ assertApplicationSignUpInitialized();
addTopologyEventListeners();
addApplicationEventListeners();
}
+ private void initializeApplicationSignUpEventReceiver() {
+ applicationSignUpEventReceiver = new ApplicationSignUpEventReceiver();
+ applicationSignUpEventReceiver.setExecutorService(executorService);
+ applicationSignUpEventReceiver.execute();
+ }
+
+ private void initializeTenantEventReceiver() {
+ tenantEventReceiver = new TenantEventReceiver();
+ tenantEventReceiver.setExecutorService(executorService);
+ tenantEventReceiver.execute();
+ }
+
public static TopologyHandler getInstance() {
if (topologyHandler == null) {
synchronized (TopologyHandler.class) {
@@ -91,30 +117,25 @@ public class TopologyHandler {
* Initialize application event receiver
*/
private void initializeApplicationEventReceiver() {
- if (applicationsEventReceiver == null) {
- applicationsEventReceiver = new ApplicationsEventReceiver();
- ExecutorService executorService = StratosThreadPool.getExecutorService("STRATOS_TEST_SERVER", 1);
- applicationsEventReceiver.setExecutorService(executorService);
- applicationsEventReceiver.execute();
- }
+ applicationsEventReceiver = new ApplicationsEventReceiver();
+ applicationsEventReceiver.setExecutorService(executorService);
+ applicationsEventReceiver.execute();
}
/**
* Initialize Topology event receiver
*/
private void initializeTopologyEventReceiver() {
- if (topologyEventReceiver == null) {
- topologyEventReceiver = new TopologyEventReceiver();
- ExecutorService executorService = StratosThreadPool.getExecutorService("STRATOS_TEST_SERVER1", 1);
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
- }
+ topologyEventReceiver = new TopologyEventReceiver();
+ topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.execute();
}
/**
* Assert application Topology initialization
*/
private void assertApplicationTopologyInitialized() {
+ log.info(String.format("Asserting application topology initialization within %d ms", APPLICATION_INIT_TIMEOUT));
long startTime = System.currentTimeMillis();
boolean applicationTopologyInitialized = ApplicationManager.getApplications().isInitialized();
while (!applicationTopologyInitialized) {
@@ -123,18 +144,24 @@ public class TopologyHandler {
} catch (InterruptedException ignore) {
}
applicationTopologyInitialized = ApplicationManager.getApplications().isInitialized();
- if ((System.currentTimeMillis() - startTime) > APPLICATION_TOPOLOGY_TIMEOUT) {
+ if ((System.currentTimeMillis() - startTime) > APPLICATION_INIT_TIMEOUT) {
break;
}
}
- assertEquals(String.format("Application Topology didn't get initialized "), applicationTopologyInitialized,
- true);
+ if (applicationTopologyInitialized) {
+ log.info(String.format("Application topology initialized under %d ms",
+ (System.currentTimeMillis() - startTime)));
+ }
+ assertEquals(
+ String.format("Application topology didn't get initialized within %d ms", APPLICATION_INIT_TIMEOUT),
+ applicationTopologyInitialized, true);
}
/**
* Assert Topology initialization
*/
private void assertTopologyInitialized() {
+ log.info(String.format("Asserting topology initialization within %d ms", TOPOLOGY_INIT_TIMEOUT));
long startTime = System.currentTimeMillis();
boolean topologyInitialized = TopologyManager.getTopology().isInitialized();
while (!topologyInitialized) {
@@ -143,11 +170,59 @@ public class TopologyHandler {
} catch (InterruptedException ignore) {
}
topologyInitialized = TopologyManager.getTopology().isInitialized();
- if ((System.currentTimeMillis() - startTime) > APPLICATION_TOPOLOGY_TIMEOUT) {
+ if ((System.currentTimeMillis() - startTime) > TOPOLOGY_INIT_TIMEOUT) {
break;
}
}
- assertEquals(String.format("Topology didn't get initialized "), topologyInitialized, true);
+ if (topologyInitialized) {
+ log.info(String.format("Topology initialized under %d ms", (System.currentTimeMillis() - startTime)));
+ }
+ assertEquals(String.format("Topology didn't get initialized within %d ms", TOPOLOGY_INIT_TIMEOUT),
+ topologyInitialized, true);
+ }
+
+ private void assertTenantInitialized() {
+ log.info(String.format("Asserting tenant model initialization within %d ms", TENANT_INIT_TIMEOUT));
+ long startTime = System.currentTimeMillis();
+ boolean tenantInitialized = TenantManager.getInstance().isInitialized();
+ while (!tenantInitialized) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
+ tenantInitialized = TenantManager.getInstance().isInitialized();
+ if ((System.currentTimeMillis() - startTime) > TENANT_INIT_TIMEOUT) {
+ break;
+ }
+ }
+ if (tenantInitialized) {
+ log.info(String.format("Tenant model initialized under %d ms", (System.currentTimeMillis() - startTime)));
+ }
+ assertEquals(String.format("Tenant model didn't get initialized within %d ms", TENANT_INIT_TIMEOUT),
+ tenantInitialized, true);
+ }
+
+ private void assertApplicationSignUpInitialized() {
+ log.info(String.format("Asserting application signup initialization within %d ms",
+ APPLICATION_SIGNUP_INIT_TIMEOUT));
+ long startTime = System.currentTimeMillis();
+ boolean applicationSignUpInitialized = ApplicationSignUpManager.getInstance().isInitialized();
+ while (!applicationSignUpInitialized) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
+ applicationSignUpInitialized = ApplicationSignUpManager.getInstance().isInitialized();
+ if ((System.currentTimeMillis() - startTime) > APPLICATION_SIGNUP_INIT_TIMEOUT) {
+ break;
+ }
+ }
+ if (applicationSignUpInitialized) {
+ log.info(String.format("Application signup initialized under %d ms",
+ (System.currentTimeMillis() - startTime)));
+ }
+ assertEquals(String.format("Application signup didn't get initialized within %d ms",
+ APPLICATION_SIGNUP_INIT_TIMEOUT), applicationSignUpInitialized, true);
}
/**
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/products/stratos/modules/integration/test-integration/src/test/resources/common/log4j.properties
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-integration/src/test/resources/common/log4j.properties b/products/stratos/modules/integration/test-integration/src/test/resources/common/log4j.properties
index 6fc6f45..b99dd63 100644
--- a/products/stratos/modules/integration/test-integration/src/test/resources/common/log4j.properties
+++ b/products/stratos/modules/integration/test-integration/src/test/resources/common/log4j.properties
@@ -59,7 +59,7 @@ log4j.logger.org.apache.stratos.cloud.controller=DEBUG
log4j.logger.org.wso2.andes.client=ERROR
# Autoscaler rule logs
log4j.logger.org.apache.stratos.autoscaler.rule.RuleLog=DEBUG
-log4j.logger.org.apache.stratos.cloud.controller.messaging.topology.TopologyManager=INFO
+log4j.logger.org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder=INFO
log4j.logger.org.apache.stratos.mock.iaas.client=DEBUG
log4j.logger.org.apache.stratos.mock.iaas.services=DEBUG
log4j.logger.org.apache.stratos.metadata.service=DEBUG
[2/3] stratos git commit: Closing STRATOS-1544, STRATOS-1612,
STRATOS-1611: topology, tenant, application model initialize optimization
Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 5f7bd01..bd09d7e 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -22,7 +22,6 @@ import com.hazelcast.core.HazelcastInstance;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.Component;
-import org.apache.stratos.common.services.ComponentActivationEventListener;
import org.apache.stratos.common.services.ComponentStartUpSynchronizer;
import org.apache.stratos.common.services.DistributedObjectProvider;
import org.apache.stratos.common.threading.StratosThreadPool;
@@ -31,6 +30,7 @@ import org.apache.stratos.manager.messaging.publisher.TenantEventPublisher;
import org.apache.stratos.manager.messaging.publisher.synchronizer.ApplicationSignUpEventSynchronizer;
import org.apache.stratos.manager.messaging.publisher.synchronizer.TenantEventSynchronizer;
import org.apache.stratos.manager.messaging.receiver.StratosManagerApplicationEventReceiver;
+import org.apache.stratos.manager.messaging.receiver.StratosManagerInitializerTopicReceiver;
import org.apache.stratos.manager.messaging.receiver.StratosManagerInstanceStatusEventReceiver;
import org.apache.stratos.manager.messaging.receiver.StratosManagerTopologyEventReceiver;
import org.apache.stratos.manager.user.management.TenantUserRoleManager;
@@ -73,9 +73,11 @@ import java.util.concurrent.TimeUnit;
* @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService"
* cardinality="1..1" policy="dynamic" bind="setTaskService"
* unbind="unsetTaskService"
- * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.services.DistributedObjectProvider"
+ * @scr.reference name="distributedObjectProvider"
+ * interface="org.apache.stratos.common.services.DistributedObjectProvider"
* cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
- * @scr.reference name="componentStartUpSynchronizer" interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer"
+ * @scr.reference name="componentStartUpSynchronizer"
+ * interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer"
* cardinality="1..1" policy="dynamic" bind="setComponentStartUpSynchronizer" unbind="unsetComponentStartUpSynchronizer"
*/
public class StratosManagerServiceComponent {
@@ -89,6 +91,7 @@ public class StratosManagerServiceComponent {
private StratosManagerTopologyEventReceiver topologyEventReceiver;
private StratosManagerInstanceStatusEventReceiver instanceStatusEventReceiver;
private StratosManagerApplicationEventReceiver applicationEventReceiver;
+ private StratosManagerInitializerTopicReceiver initializerTopicReceiver;
private ExecutorService executorService;
private ScheduledExecutorService scheduler;
@@ -98,21 +101,21 @@ public class StratosManagerServiceComponent {
}
try {
executorService = StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE);
- scheduler = StratosThreadPool.getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID,
- SCHEDULER_THREAD_POOL_SIZE);
+ scheduler = StratosThreadPool
+ .getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID, SCHEDULER_THREAD_POOL_SIZE);
Runnable stratosManagerActivator = new Runnable() {
@Override
public void run() {
try {
- ComponentStartUpSynchronizer componentStartUpSynchronizer =
- ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
+ ComponentStartUpSynchronizer componentStartUpSynchronizer = ServiceReferenceHolder.getInstance()
+ .getComponentStartUpSynchronizer();
// Wait for cloud controller and autoscaler components to be activated
- componentStartUpSynchronizer.waitForComponentActivation(Component.StratosManager,
- Component.CloudController);
- componentStartUpSynchronizer.waitForComponentActivation(Component.StratosManager,
- Component.Autoscaler);
+ componentStartUpSynchronizer
+ .waitForComponentActivation(Component.StratosManager, Component.CloudController);
+ componentStartUpSynchronizer
+ .waitForComponentActivation(Component.StratosManager, Component.Autoscaler);
CartridgeConfigFileReader.readProperties();
if (StratosManagerContext.getInstance().isClustered()) {
@@ -123,8 +126,8 @@ public class StratosManagerServiceComponent {
ServiceReferenceHolder.getInstance().getHazelcastInstance()
.getLock(STRATOS_MANAGER_COORDINATOR_LOCK).lock();
- String localMemberId = ServiceReferenceHolder.getInstance().getHazelcastInstance()
- .getCluster().getLocalMember().getUuid();
+ String localMemberId = ServiceReferenceHolder.getInstance()
+ .getHazelcastInstance().getCluster().getLocalMember().getUuid();
log.info("Elected this member [" + localMemberId + "] " +
"as the stratos manager coordinator for the cluster");
@@ -149,8 +152,8 @@ public class StratosManagerServiceComponent {
// Initialize application event receiver
initializeApplicationEventReceiver();
- componentStartUpSynchronizer.waitForAxisServiceActivation(Component.StratosManager,
- "StratosManagerService");
+ componentStartUpSynchronizer
+ .waitForAxisServiceActivation(Component.StratosManager, "StratosManagerService");
componentStartUpSynchronizer.setComponentStatus(Component.StratosManager, true);
if (log.isInfoEnabled()) {
log.info("Stratos manager component is activated");
@@ -174,17 +177,25 @@ public class StratosManagerServiceComponent {
* @throws UserStoreException
* @throws UserManagerException
*/
- private void executeCoordinatorTasks(ComponentContext componentContext) throws UserStoreException,
- UserManagerException {
-
+ private void executeCoordinatorTasks(ComponentContext componentContext)
+ throws UserStoreException, UserManagerException {
initializeTenantEventPublisher(componentContext);
initializeInstanceStatusEventReceiver();
- registerComponentStartUpEventListeners();
-
+ initializeInitializerEventReceiver();
+ Runnable tenantSynchronizer = new TenantEventSynchronizer();
+ scheduler.scheduleAtFixedRate(tenantSynchronizer, 0, 1, TimeUnit.MINUTES);
+ Runnable applicationSignUpSynchronizer = new ApplicationSignUpEventSynchronizer();
+ scheduler.scheduleAtFixedRate(applicationSignUpSynchronizer, 0, 1, TimeUnit.MINUTES);
// Create internal/user Role at server start-up
createInternalUserRole(componentContext);
}
+ /**
+ * Initialize initializer topic receiver: create it, hand it this component's executor service
+ * and start it. The executor service is set before execute() because execute() forwards it
+ * to the underlying receiver.
+ */
+ private void initializeInitializerEventReceiver() {
+ initializerTopicReceiver = new StratosManagerInitializerTopicReceiver();
+ initializerTopicReceiver.setExecutorService(executorService);
+ initializerTopicReceiver.execute();
+ }
+
/**
* Initialize instance status event receiver
*/
@@ -219,16 +230,17 @@ public class StratosManagerServiceComponent {
* @throws UserStoreException
* @throws UserManagerException
*/
- private void createInternalUserRole(ComponentContext componentContext) throws UserStoreException, UserManagerException {
+ private void createInternalUserRole(ComponentContext componentContext)
+ throws UserStoreException, UserManagerException {
RealmService realmService = ServiceReferenceHolder.getRealmService();
UserRealm realm = realmService.getBootstrapRealm();
UserStoreManager userStoreManager = realm.getUserStoreManager();
UserRoleCreator.createInternalUserRole(userStoreManager);
TenantUserRoleManager tenantUserRoleManager = new TenantUserRoleManager();
- componentContext.getBundleContext().registerService(
- org.wso2.carbon.stratos.common.listeners.TenantMgtListener.class.getName(),
- tenantUserRoleManager, null);
+ componentContext.getBundleContext()
+ .registerService(org.wso2.carbon.stratos.common.listeners.TenantMgtListener.class.getName(),
+ tenantUserRoleManager, null);
}
/**
@@ -242,44 +254,19 @@ public class StratosManagerServiceComponent {
log.debug("Initializing tenant event publisher...");
}
final TenantEventPublisher tenantEventPublisher = new TenantEventPublisher();
- componentContext.getBundleContext().registerService(
- org.wso2.carbon.stratos.common.listeners.TenantMgtListener.class.getName(),
- tenantEventPublisher, null);
+ componentContext.getBundleContext()
+ .registerService(org.wso2.carbon.stratos.common.listeners.TenantMgtListener.class.getName(),
+ tenantEventPublisher, null);
if (log.isInfoEnabled()) {
log.info("Tenant event publisher initialized");
}
}
- private void registerComponentStartUpEventListeners() {
- ComponentStartUpSynchronizer componentStartUpSynchronizer =
- ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
- if (componentStartUpSynchronizer.isEnabled()) {
- componentStartUpSynchronizer.addEventListener(new ComponentActivationEventListener() {
- @Override
- public void activated(Component component) {
- if (component == Component.StratosManager) {
- scheduleEventSynchronizers();
- }
- }
- });
- } else {
- scheduleEventSynchronizers();
- }
- }
-
- private void scheduleEventSynchronizers() {
- Runnable tenantSynchronizer = new TenantEventSynchronizer();
- scheduler.scheduleAtFixedRate(tenantSynchronizer, 0, 1, TimeUnit.MINUTES);
-
- Runnable applicationSignUpSynchronizer = new ApplicationSignUpEventSynchronizer();
- scheduler.scheduleAtFixedRate(applicationSignUpSynchronizer, 0, 1, TimeUnit.MINUTES);
- }
-
protected void setConfigurationContextService(ConfigurationContextService contextService) {
ServiceReferenceHolder.setClientConfigContext(contextService.getClientConfigContext());
ServiceReferenceHolder.setServerConfigContext(contextService.getServerConfigContext());
- ServiceReferenceHolder.getInstance().setAxisConfiguration(
- contextService.getServerConfigContext().getAxisConfiguration());
+ ServiceReferenceHolder.getInstance()
+ .setAxisConfiguration(contextService.getServerConfigContext().getAxisConfiguration());
}
protected void unsetConfigurationContextService(ConfigurationContextService contextService) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
index d7d8ef4..f4cfbd8 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
@@ -42,9 +42,13 @@ public class TenantEventSynchronizer implements Runnable {
@Override
public void run() {
+ sendCompleteTenantEvent();
+ }
+
+ public static synchronized void sendCompleteTenantEvent(){
try {
if (log.isDebugEnabled()) {
- log.debug(String.format("Publishing complete tenant event"));
+ log.debug("Publishing complete tenant event");
}
Tenant tenant;
List<Tenant> tenants = new ArrayList<Tenant>();
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
new file mode 100644
index 0000000..ed526a6
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.manager.messaging.receiver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.components.ApplicationSignUpHandler;
+import org.apache.stratos.manager.messaging.publisher.ApplicationSignUpEventPublisher;
+import org.apache.stratos.manager.messaging.publisher.synchronizer.TenantEventSynchronizer;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.listener.initializer.CompleteApplicationSignUpsRequestEventListener;
+import org.apache.stratos.messaging.listener.initializer.CompleteTenantRequestEventListener;
+import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Stratos manager side wrapper around an {@link InitializerEventReceiver}.
+ * <p>
+ * The constructor registers two listeners: one answers a complete tenant request by calling
+ * {@link TenantEventSynchronizer#sendCompleteTenantEvent()}, the other answers a complete
+ * application sign-ups request by publishing the sign-ups returned by
+ * {@link ApplicationSignUpHandler}. Nothing is started until {@link #execute()} is called.
+ */
+public class StratosManagerInitializerTopicReceiver {
+    private static final Log log = LogFactory.getLog(StratosManagerInitializerTopicReceiver.class);
+    // Both collaborators are created once in the constructor and never replaced.
+    private final InitializerEventReceiver initializerEventReceiver;
+    private final ApplicationSignUpHandler applicationSignUpHandler;
+    private ExecutorService executorService;
+
+    /**
+     * Creates the underlying receiver and the sign-up handler, then registers the event listeners.
+     */
+    public StratosManagerInitializerTopicReceiver() {
+        this.initializerEventReceiver = new InitializerEventReceiver();
+        this.applicationSignUpHandler = new ApplicationSignUpHandler();
+        addEventListeners();
+    }
+
+    /**
+     * Passes the configured executor service to the underlying receiver and starts it.
+     * Call {@link #setExecutorService(ExecutorService)} beforehand: this method forwards whatever
+     * value is currently stored, including {@code null}.
+     */
+    public void execute() {
+        initializerEventReceiver.setExecutorService(executorService);
+        initializerEventReceiver.execute();
+        if (log.isInfoEnabled()) {
+            // Name this component (not the cloud controller) so start-up logs are not misleading.
+            log.info("Stratos manager initializer topic receiver started");
+        }
+    }
+
+    /**
+     * Registers the listeners that answer initializer requests. Each handler catches and logs any
+     * exception so that a failure while answering one request does not propagate to the receiver.
+     */
+    private void addEventListeners() {
+        initializerEventReceiver.addEventListener(new CompleteTenantRequestEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Handling CompleteTenantRequestEvent");
+                }
+                try {
+                    TenantEventSynchronizer.sendCompleteTenantEvent();
+                } catch (Exception e) {
+                    log.error("Failed to process CompleteTenantRequestEvent", e);
+                }
+            }
+        });
+
+        initializerEventReceiver.addEventListener(new CompleteApplicationSignUpsRequestEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Handling CompleteApplicationSignUpsRequestEvent");
+                }
+                try {
+                    ApplicationSignUpEventPublisher
+                            .publishCompleteApplicationSignUpsEvent(applicationSignUpHandler.getApplicationSignUps());
+                } catch (Exception e) {
+                    log.error("Failed to process CompleteApplicationSignUpsRequestEvent", e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @return the executor service that {@link #execute()} will forward to the underlying receiver,
+     * or {@code null} if none has been set
+     */
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    /**
+     * @param executorService executor service to forward to the underlying receiver on
+     *                        {@link #execute()}
+     */
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationSignUpsRequestEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationSignUpsRequestEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationSignUpsRequestEvent.java
new file mode 100644
index 0000000..ba912fd
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationSignUpsRequestEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.initializer;
+
+import java.io.Serializable;
+
+/**
+ * Payload-free {@link InitializerEvent}: the class declares no fields, only a no-argument
+ * constructor.
+ * NOTE(review): the name suggests it asks for the complete application sign-ups event to be
+ * re-published - confirm against the code that publishes it.
+ */
+public class CompleteApplicationSignUpsRequestEvent extends InitializerEvent implements Serializable {
+ /** Creates the event; there is no state to initialise. */
+ public CompleteApplicationSignUpsRequestEvent() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationsRequestEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationsRequestEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationsRequestEvent.java
new file mode 100644
index 0000000..621b8a6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationsRequestEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.initializer;
+
+import java.io.Serializable;
+
+/**
+ * Payload-free {@link InitializerEvent}: the class declares no fields, only a no-argument
+ * constructor.
+ * NOTE(review): the name suggests it asks for the complete applications event to be
+ * re-published - confirm against the code that publishes and handles it.
+ */
+public class CompleteApplicationsRequestEvent extends InitializerEvent implements Serializable {
+ /** Creates the event; there is no state to initialise. */
+ public CompleteApplicationsRequestEvent() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTenantRequestEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTenantRequestEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTenantRequestEvent.java
new file mode 100644
index 0000000..32416a6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTenantRequestEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.initializer;
+
+import java.io.Serializable;
+
+/**
+ * Payload-free {@link InitializerEvent}: the class declares no fields, only a no-argument
+ * constructor.
+ * NOTE(review): the name suggests it asks for the complete tenant event to be re-published -
+ * confirm against the code that publishes it.
+ */
+public class CompleteTenantRequestEvent extends InitializerEvent implements Serializable {
+ /** Creates the event; there is no state to initialise. */
+ public CompleteTenantRequestEvent() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTopologyRequestEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTopologyRequestEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTopologyRequestEvent.java
new file mode 100644
index 0000000..257bf44
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTopologyRequestEvent.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.initializer;
+
+import java.io.Serializable;
+
+/**
+ * Payload-free {@link InitializerEvent}: the class declares no fields, only a no-argument
+ * constructor.
+ * NOTE(review): the name suggests it asks for the complete topology event to be re-published -
+ * confirm against the code that publishes and handles it.
+ */
+public class CompleteTopologyRequestEvent extends InitializerEvent implements Serializable {
+ /** Creates the event; there is no state to initialise. */
+ public CompleteTopologyRequestEvent() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/InitializerEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/InitializerEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/InitializerEvent.java
new file mode 100644
index 0000000..fa54bb2
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/InitializerEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.initializer;
+
+import org.apache.stratos.messaging.event.Event;
+
+import java.io.Serializable;
+
+/**
+ * Common base type of the events declared in this package. It extends {@link Event}, is
+ * {@link Serializable}, and declares no state or behaviour of its own.
+ */
+public class InitializerEvent extends Event implements Serializable {
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationSignUpsRequestEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationSignUpsRequestEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationSignUpsRequestEventListener.java
new file mode 100644
index 0000000..08e4c85
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationSignUpsRequestEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.initializer;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Abstract listener type that adds no members to {@link EventListener}; concrete subclasses
+ * provide the handling logic.
+ * NOTE(review): presumably the type used to register handlers for complete application sign-ups
+ * request events - confirm against the receiver that dispatches to it.
+ */
+public abstract class CompleteApplicationSignUpsRequestEventListener extends EventListener {
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationsRequestEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationsRequestEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationsRequestEventListener.java
new file mode 100644
index 0000000..f1da2da
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationsRequestEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.initializer;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Abstract listener type that adds no members to {@link EventListener}; concrete subclasses
+ * provide the handling logic.
+ * NOTE(review): presumably the type used to register handlers for complete applications request
+ * events - confirm against the receiver that dispatches to it.
+ */
+public abstract class CompleteApplicationsRequestEventListener extends EventListener {
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTenantRequestEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTenantRequestEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTenantRequestEventListener.java
new file mode 100644
index 0000000..20bdf13
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTenantRequestEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.initializer;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Abstract listener type that adds no members to {@link EventListener}; concrete subclasses
+ * provide the handling logic.
+ * NOTE(review): presumably the type used to register handlers for complete tenant request
+ * events - confirm against the receiver that dispatches to it.
+ */
+public abstract class CompleteTenantRequestEventListener extends EventListener {
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTopologyRequestEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTopologyRequestEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTopologyRequestEventListener.java
new file mode 100644
index 0000000..cbac1c9
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTopologyRequestEventListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.listener.initializer;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Abstract listener type that adds no members to {@link EventListener}; concrete subclasses
+ * provide the handling logic.
+ * NOTE(review): presumably the type used to register handlers for complete topology request
+ * events - confirm against the receiver that dispatches to it.
+ */
+public abstract class CompleteTopologyRequestEventListener extends EventListener {
+
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java
index 8346d84..17922dd 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java
@@ -85,6 +85,9 @@ public class CompleteApplicationsMessageProcessor extends MessageProcessor {
log.debug("No Application information found in Complete Applications event");
}
}
+ if (log.isInfoEnabled()) {
+ log.info("Application topology initialized");
+ }
// Set topology initialized
applications.setInitialized(true);
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationSignUpsRequestMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationSignUpsRequestMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationSignUpsRequestMessageProcessor.java
new file mode 100644
index 0000000..557de79
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationSignUpsRequestMessageProcessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.initializer;
+
+import org.apache.stratos.messaging.event.initializer.CompleteApplicationSignUpsRequestEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.MessagingUtil;
+
+/**
+ * Message processor for {@link CompleteApplicationSignUpsRequestEvent} messages.
+ * Messages of any other type are forwarded to the next processor, if one is set.
+ */
+public class CompleteApplicationSignUpsRequestMessageProcessor extends MessageProcessor {
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Handles the message when {@code type} equals the class name of
+     * {@link CompleteApplicationSignUpsRequestEvent}; otherwise delegates to the next processor.
+     *
+     * @param type    event class name the message was published with
+     * @param message JSON body of the message
+     * @param object  passed unchanged to the next processor
+     * @return {@code true} after the listeners have been notified, or the next processor's result
+     * @throws RuntimeException if the type does not match and no next processor is set
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        final String handledType = CompleteApplicationSignUpsRequestEvent.class.getName();
+
+        if (!handledType.equals(type)) {
+            // Not our event: hand over to the next processor, or fail if there is none.
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format(
+                        "Failed to process message using available message processors: [type] %s [body] %s", type,
+                        message));
+            }
+            return nextProcessor.process(type, message, object);
+        }
+
+        // Build the event from the JSON body and hand it to the event listeners.
+        Object parsed = MessagingUtil.jsonToObject(message, CompleteApplicationSignUpsRequestEvent.class);
+        notifyEventListeners((CompleteApplicationSignUpsRequestEvent) parsed);
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationsRequestMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationsRequestMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationsRequestMessageProcessor.java
new file mode 100644
index 0000000..ae80a68
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationsRequestMessageProcessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.initializer;
+
+import org.apache.stratos.messaging.event.initializer.CompleteApplicationsRequestEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.MessagingUtil;
+
+/**
+ * Message processor for {@link CompleteApplicationsRequestEvent} messages.
+ * Messages of any other type are forwarded to the next processor, if one is set.
+ */
+public class CompleteApplicationsRequestMessageProcessor extends MessageProcessor {
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Handles the message when {@code type} equals the class name of
+     * {@link CompleteApplicationsRequestEvent}; otherwise delegates to the next processor.
+     *
+     * @param type    event class name the message was published with
+     * @param message JSON body of the message
+     * @param object  passed unchanged to the next processor
+     * @return {@code true} after the listeners have been notified, or the next processor's result
+     * @throws RuntimeException if the type does not match and no next processor is set
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        final String handledType = CompleteApplicationsRequestEvent.class.getName();
+
+        if (!handledType.equals(type)) {
+            // Not our event: hand over to the next processor, or fail if there is none.
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format(
+                        "Failed to process message using available message processors: [type] %s [body] %s", type,
+                        message));
+            }
+            return nextProcessor.process(type, message, object);
+        }
+
+        // Build the event from the JSON body and hand it to the event listeners.
+        Object parsed = MessagingUtil.jsonToObject(message, CompleteApplicationsRequestEvent.class);
+        notifyEventListeners((CompleteApplicationsRequestEvent) parsed);
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTenantRequestMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTenantRequestMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTenantRequestMessageProcessor.java
new file mode 100644
index 0000000..a6af9fd
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTenantRequestMessageProcessor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.initializer;
+
+import org.apache.stratos.messaging.event.initializer.CompleteTenantRequestEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.MessagingUtil;
+
+/**
+ * Message processor for {@link CompleteTenantRequestEvent} messages.
+ * Messages of any other type are forwarded to the next processor, if one is set.
+ */
+public class CompleteTenantRequestMessageProcessor extends MessageProcessor {
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Handles the message when {@code type} equals the class name of
+     * {@link CompleteTenantRequestEvent}; otherwise delegates to the next processor.
+     *
+     * @param type    event class name the message was published with
+     * @param message JSON body of the message
+     * @param object  passed unchanged to the next processor
+     * @return {@code true} after the listeners have been notified, or the next processor's result
+     * @throws RuntimeException if the type does not match and no next processor is set
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        final String handledType = CompleteTenantRequestEvent.class.getName();
+
+        if (!handledType.equals(type)) {
+            // Not our event: hand over to the next processor, or fail if there is none.
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format(
+                        "Failed to process message using available message processors: [type] %s [body] %s", type,
+                        message));
+            }
+            return nextProcessor.process(type, message, object);
+        }
+
+        // Build the event from the JSON body and hand it to the event listeners.
+        Object parsed = MessagingUtil.jsonToObject(message, CompleteTenantRequestEvent.class);
+        notifyEventListeners((CompleteTenantRequestEvent) parsed);
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTopologyRequestMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTopologyRequestMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTopologyRequestMessageProcessor.java
new file mode 100644
index 0000000..73e4bf7
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTopologyRequestMessageProcessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.initializer;
+
+import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.MessagingUtil;
+
+/**
+ * Message processor for {@link CompleteTopologyRequestEvent} messages.
+ * Messages of any other type are forwarded to the next processor, if one is set.
+ */
+public class CompleteTopologyRequestMessageProcessor extends MessageProcessor {
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Handles the message when {@code type} equals the class name of
+     * {@link CompleteTopologyRequestEvent}; otherwise delegates to the next processor.
+     *
+     * @param type    event class name the message was published with
+     * @param message JSON body of the message
+     * @param object  passed unchanged to the next processor
+     * @return {@code true} after the listeners have been notified, or the next processor's result
+     * @throws RuntimeException if the type does not match and no next processor is set
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        final String handledType = CompleteTopologyRequestEvent.class.getName();
+
+        if (!handledType.equals(type)) {
+            // Not our event: hand over to the next processor, or fail if there is none.
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format(
+                        "Failed to process message using available message processors: [type] %s [body] %s", type,
+                        message));
+            }
+            return nextProcessor.process(type, message, object);
+        }
+
+        // Build the event from the JSON body and hand it to the event listeners.
+        Object parsed = MessagingUtil.jsonToObject(message, CompleteTopologyRequestEvent.class);
+        notifyEventListeners((CompleteTopologyRequestEvent) parsed);
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/InitializerMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/InitializerMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/InitializerMessageProcessorChain.java
new file mode 100644
index 0000000..f3e292f
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/InitializerMessageProcessorChain.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.initializer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.initializer.CompleteApplicationSignUpsRequestEventListener;
+import org.apache.stratos.messaging.listener.initializer.CompleteApplicationsRequestEventListener;
+import org.apache.stratos.messaging.listener.initializer.CompleteTenantRequestEventListener;
+import org.apache.stratos.messaging.listener.initializer.CompleteTopologyRequestEventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Processor chain for initializer request events: complete topology, complete applications,
+ * complete tenant and complete application sign-ups requests.
+ */
+public class InitializerMessageProcessorChain extends MessageProcessorChain {
+    private static final Log log = LogFactory.getLog(InitializerMessageProcessorChain.class);
+
+    // Assigned in initialize() rather than at declaration.
+    // NOTE(review): presumably initialize() is invoked by the MessageProcessorChain constructor - confirm.
+    private CompleteTopologyRequestMessageProcessor topologyRequestProcessor;
+    private CompleteApplicationsRequestMessageProcessor applicationsRequestProcessor;
+    private CompleteTenantRequestMessageProcessor tenantRequestProcessor;
+    private CompleteApplicationSignUpsRequestMessageProcessor applicationSignUpsRequestProcessor;
+
+    /**
+     * Creates the four request processors and adds them to the chain in the order
+     * topology, applications, tenant, application sign-ups.
+     */
+    @Override
+    protected void initialize() {
+        topologyRequestProcessor = new CompleteTopologyRequestMessageProcessor();
+        applicationsRequestProcessor = new CompleteApplicationsRequestMessageProcessor();
+        tenantRequestProcessor = new CompleteTenantRequestMessageProcessor();
+        applicationSignUpsRequestProcessor = new CompleteApplicationSignUpsRequestMessageProcessor();
+
+        add(topologyRequestProcessor);
+        add(applicationsRequestProcessor);
+        add(tenantRequestProcessor);
+        add(applicationSignUpsRequestProcessor);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Initializer message processor chain initialized");
+        }
+    }
+
+    /**
+     * Registers the listener with the processor matching its listener type. Only the first
+     * matching type is used, checked in the order topology, applications, tenant, sign-ups.
+     *
+     * @param eventListener listener to register
+     * @throws RuntimeException if the listener is none of the four supported listener types
+     */
+    @Override
+    public void addEventListener(EventListener eventListener) {
+        if (eventListener instanceof CompleteTopologyRequestEventListener) {
+            topologyRequestProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof CompleteApplicationsRequestEventListener) {
+            applicationsRequestProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof CompleteTenantRequestEventListener) {
+            tenantRequestProcessor.addEventListener(eventListener);
+            return;
+        }
+        if (eventListener instanceof CompleteApplicationSignUpsRequestEventListener) {
+            applicationSignUpsRequestProcessor.addEventListener(eventListener);
+            return;
+        }
+        throw new RuntimeException("Unknown event listener");
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
index 6b9085f..d7e7196 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
@@ -20,7 +20,10 @@ package org.apache.stratos.messaging.message.receiver.application;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
+import org.apache.stratos.messaging.event.initializer.CompleteApplicationsRequestEvent;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.util.MessagingUtil;
@@ -32,7 +35,6 @@ public class ApplicationsEventReceiver {
private ApplicationsEventMessageDelegator messageDelegator;
private ApplicationsEventMessageListener messageListener;
private EventSubscriber eventSubscriber;
- private boolean terminated;
private ExecutorService executorService;
public ApplicationsEventReceiver() {
@@ -45,11 +47,11 @@ public class ApplicationsEventReceiver {
messageDelegator.addEventListener(eventListener);
}
-
public void execute() {
try {
// Start topic subscriber thread
- eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(), messageListener);
+ eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(),
+ messageListener);
executorService.execute(eventSubscriber);
if (log.isDebugEnabled()) {
@@ -62,8 +64,7 @@ public class ApplicationsEventReceiver {
if (log.isDebugEnabled()) {
log.debug("Application status event message delegator thread started");
}
-
-
+ initializeCompleteApplicationsModel();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Application status failed", e);
@@ -74,7 +75,26 @@ public class ApplicationsEventReceiver {
public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
- terminated = true;
+ }
+
+    /**
+     * Asynchronously requests the complete applications model.
+     * <p>
+     * Submits a task to the executor service that polls once per second until the event subscriber
+     * reports it is subscribed, then publishes a {@link CompleteApplicationsRequestEvent} on the
+     * topic resolved for that event. If the task is interrupted while waiting, the interrupt status
+     * is restored and the task ends without publishing.
+     */
+    public void initializeCompleteApplicationsModel() {
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                // NOTE(review): presumably waits so the reply to the request is not missed - confirm.
+                while (!eventSubscriber.isSubscribed()) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // Do not swallow the interrupt: re-assert it and stop polling so the
+                        // pooled thread can be released (e.g. on executor shutdown).
+                        Thread.currentThread().interrupt();
+                        log.warn("Interrupted while waiting for applications topic subscription; "
+                                + "complete applications request event was not published");
+                        return;
+                    }
+                }
+
+                CompleteApplicationsRequestEvent completeApplicationsRequestEvent
+                        = new CompleteApplicationsRequestEvent();
+                String topic = MessagingUtil.getMessageTopicName(completeApplicationsRequestEvent);
+                EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
+                eventPublisher.publish(completeApplicationsRequestEvent);
+            }
+        });
     }
public ExecutorService getExecutorService() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index 7863ee4..55e3fd1 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -21,7 +21,10 @@ package org.apache.stratos.messaging.message.receiver.application.signup;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
+import org.apache.stratos.messaging.event.initializer.CompleteApplicationSignUpsRequestEvent;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.util.MessagingUtil;
@@ -49,15 +52,14 @@ public class ApplicationSignUpEventReceiver {
messageDelegator.addEventListener(eventListener);
}
-
public void execute() {
try {
// Start topic subscriber thread
- eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_SIGNUP_TOPIC.getTopicName(), messageListener);
+ eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_SIGNUP_TOPIC.getTopicName(),
+ messageListener);
// subscriber.setMessageListener(messageListener);
executorService.execute(eventSubscriber);
-
if (log.isDebugEnabled()) {
log.debug("Application signup event message receiver thread started");
}
@@ -68,7 +70,7 @@ public class ApplicationSignUpEventReceiver {
log.debug("Application signup event message delegator thread started");
}
-
+ initializeCompleteApplicationSignUps();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Application signup receiver failed", e);
@@ -76,6 +78,26 @@ public class ApplicationSignUpEventReceiver {
}
}
+    /**
+     * Asynchronously requests the complete set of application sign-ups.
+     * <p>
+     * Submits a task to the executor service that polls once per second until the event subscriber
+     * reports it is subscribed, then publishes a {@link CompleteApplicationSignUpsRequestEvent} on
+     * the topic resolved for that event. If the task is interrupted while waiting, the interrupt
+     * status is restored and the task ends without publishing.
+     */
+    public void initializeCompleteApplicationSignUps() {
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                // NOTE(review): presumably waits so the reply to the request is not missed - confirm.
+                while (!eventSubscriber.isSubscribed()) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // Do not swallow the interrupt: re-assert it and stop polling so the
+                        // pooled thread can be released (e.g. on executor shutdown).
+                        Thread.currentThread().interrupt();
+                        log.warn("Interrupted while waiting for application signup topic subscription; "
+                                + "complete application signups request event was not published");
+                        return;
+                    }
+                }
+
+                CompleteApplicationSignUpsRequestEvent completeApplicationSignUpsRequestEvent
+                        = new CompleteApplicationSignUpsRequestEvent();
+                String topic = MessagingUtil.getMessageTopicName(completeApplicationSignUpsRequestEvent);
+                EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
+                eventPublisher.publish(completeApplicationSignUpsRequestEvent);
+            }
+        });
+    }
+
public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
new file mode 100644
index 0000000..ffd2ae4
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.initializer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.Message;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.initializer.InitializerMessageProcessorChain;
+
+/**
+ * Runnable that takes messages from an {@link InitializerEventMessageQueue} and passes each one
+ * through an {@link InitializerMessageProcessorChain} until it is terminated or interrupted.
+ */
+public class InitializerEventMessageDelegator implements Runnable {
+    private static final Log log = LogFactory.getLog(InitializerEventMessageDelegator.class);
+
+    private final MessageProcessorChain processorChain;
+    private final InitializerEventMessageQueue messageQueue;
+    // Set by terminate(), which may run on a different thread than run(); volatile makes the
+    // update visible to the polling loop.
+    private volatile boolean terminated;
+
+    /**
+     * @param initializerEventMessageQueue queue this delegator takes messages from
+     */
+    public InitializerEventMessageDelegator(InitializerEventMessageQueue initializerEventMessageQueue) {
+        this.messageQueue = initializerEventMessageQueue;
+        this.processorChain = new InitializerMessageProcessorChain();
+    }
+
+    /**
+     * Registers an event listener with the underlying processor chain.
+     *
+     * @param eventListener listener to register
+     */
+    public void addEventListener(EventListener eventListener) {
+        processorChain.addEventListener(eventListener);
+    }
+
+    /**
+     * Takes messages from the queue one at a time and hands the event class name and JSON text to
+     * the processor chain. Failures for a single message are logged and the loop continues; an
+     * interrupt terminates the loop.
+     */
+    @Override
+    public void run() {
+        try {
+            if (log.isInfoEnabled()) {
+                log.info("Initializer event message delegator started");
+            }
+
+            while (!terminated) {
+                try {
+                    // Blocks until a message is available.
+                    Message message = messageQueue.take();
+                    String type = message.getEventClassName();
+
+                    // Retrieve the actual message
+                    String json = message.getText();
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Initializer event message [%s] received from queue: %s", type,
+                                messageQueue.getClass()));
+                    }
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Delegating initializer event message: %s", type));
+                    }
+                    processorChain.process(type, json, null);
+                } catch (InterruptedException e) {
+                    log.info("Shutting down initializer event message delegator...");
+                    terminate();
+                    // Re-assert the interrupt so the owning executor/thread can observe it.
+                    Thread.currentThread().interrupt();
+                } catch (Exception e) {
+                    log.error("Failed to retrieve initializer event message", e);
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Initializer event message delegator failed", e);
+            }
+        }
+    }
+
+    /**
+     * Terminate initializer event message delegator thread.
+     * <p>
+     * Only sets the termination flag; a thread currently blocked in {@code take()} notices it
+     * after the next message arrives or when it is interrupted.
+     */
+    public void terminate() {
+        terminated = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageListener.java
new file mode 100644
index 0000000..bdfe875
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageListener.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.initializer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.MessageListener;
+import org.apache.stratos.messaging.domain.Message;
+
+/**
+ * {@link MessageListener} that appends every received message to an
+ * {@link InitializerEventMessageQueue}.
+ */
+public class InitializerEventMessageListener implements MessageListener {
+    private static final Log log = LogFactory.getLog(InitializerEventMessageListener.class);
+
+    private final InitializerEventMessageQueue messageQueue;
+
+    /**
+     * @param messageQueue queue that received messages are added to
+     */
+    public InitializerEventMessageListener(InitializerEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    /**
+     * Adds the received message to the queue. Any exception raised while tracing or enqueuing
+     * is logged and not propagated.
+     *
+     * @param message message delivered by the subscriber
+     */
+    @Override
+    public void messageReceived(Message message) {
+        try {
+            traceReceived(message);
+            messageQueue.add(message);
+        } catch (Exception failure) {
+            log.error(failure.getMessage(), failure);
+        }
+    }
+
+    /** Writes the message text to the debug log when debug logging is enabled. */
+    private void traceReceived(Message message) {
+        if (!log.isDebugEnabled()) {
+            return;
+        }
+        log.debug(String.format("Initializer event message received: %s", message.getText()));
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageQueue.java
new file mode 100644
index 0000000..326c5b8
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageQueue.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.initializer;
+
+import org.apache.stratos.messaging.domain.Message;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Queue type for initializer event messages: a {@link LinkedBlockingQueue} of {@link Message}
+ * with no added behaviour. It declares no constructor, so instances are created through the
+ * no-argument {@code LinkedBlockingQueue} constructor.
+ */
+public class InitializerEventMessageQueue extends LinkedBlockingQueue<Message> {
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
new file mode 100644
index 0000000..90d358c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.initializer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.util.MessagingUtil;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Receives events published on the initializer topic and forwards them to the
+ * registered {@link EventListener}s.
+ * <p>
+ * The message listener and the message delegator are both constructed over the same
+ * {@link InitializerEventMessageQueue} instance. {@link #execute()} submits a topic
+ * subscriber (bound to the listener) and the delegator to the configured executor
+ * service, so {@link #setExecutorService(ExecutorService)} has to be called first.
+ */
+public class InitializerEventReceiver {
+    private static final Log log = LogFactory.getLog(InitializerEventReceiver.class);
+
+    private final InitializerEventMessageDelegator messageDelegator;
+    private final InitializerEventMessageListener messageListener;
+    // Created lazily in execute(); stays null until then
+    private EventSubscriber eventSubscriber;
+    private ExecutorService executorService;
+
+    /**
+     * Creates the shared message queue and wires the listener and the delegator to it.
+     */
+    public InitializerEventReceiver() {
+        InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue();
+        this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue);
+        this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue);
+    }
+
+    /**
+     * Registers a listener with the message delegator.
+     *
+     * @param eventListener listener to register
+     */
+    public void addEventListener(EventListener eventListener) {
+        messageDelegator.addEventListener(eventListener);
+    }
+
+    /**
+     * Starts the initializer topic subscriber and the message delegator on the executor
+     * service. Any failure (including a missing executor service) is logged, not rethrown.
+     */
+    public void execute() {
+        try {
+            // Start topic subscriber thread
+            eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INITIALIZER_TOPIC.getTopicName(),
+                    messageListener);
+            executorService.execute(eventSubscriber);
+
+            // Start initializer event message delegator thread
+            executorService.execute(messageDelegator);
+            // Log only after the delegator has actually been submitted
+            if (log.isDebugEnabled()) {
+                log.debug("Initializer event message delegator thread started");
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Initializer receiver failed", e);
+            }
+        }
+    }
+
+    /**
+     * Terminates the subscriber (if one was created) and the message delegator.
+     */
+    public void terminate() {
+        // execute() may never have run, or may have failed before creating the subscriber
+        if (eventSubscriber != null) {
+            eventSubscriber.terminate();
+        }
+        messageDelegator.terminate();
+    }
+
+    /**
+     * @return the executor service used to run the subscriber and the delegator, or
+     * {@code null} if none has been set
+     */
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    /**
+     * @param executorService executor service on which {@link #execute()} runs its tasks
+     */
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
index 1d0529a..988a2ce 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -21,7 +21,10 @@ package org.apache.stratos.messaging.message.receiver.tenant;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
+import org.apache.stratos.messaging.event.initializer.CompleteTenantRequestEvent;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.util.MessagingUtil;
@@ -68,7 +71,7 @@ public class TenantEventReceiver {
log.debug("Tenant event message delegator thread started");
}
-
+ initializeCompleteTenant();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Tenant receiver failed", e);
@@ -76,6 +79,25 @@ public class TenantEventReceiver {
}
}
+    /**
+     * Requests the complete tenant model once the tenant topic subscriber is connected.
+     * <p>
+     * Submits a task to the executor service that polls {@code eventSubscriber.isSubscribed()}
+     * once per second and then publishes a {@link CompleteTenantRequestEvent} on the topic
+     * returned by {@code MessagingUtil.getMessageTopicName}. If the task is interrupted while
+     * waiting, it restores the interrupt flag and returns without publishing.
+     */
+    public void initializeCompleteTenant() {
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                while (!eventSubscriber.isSubscribed()) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // Do not swallow the interrupt: it is how an executor asks the task
+                        // to stop, and ignoring it would keep this loop alive indefinitely.
+                        Thread.currentThread().interrupt();
+                        if (log.isDebugEnabled()) {
+                            log.debug("Interrupted while waiting for tenant event subscriber, " +
+                                    "complete tenant request was not published");
+                        }
+                        return;
+                    }
+                }
+
+                CompleteTenantRequestEvent completeTenantRequestEvent = new CompleteTenantRequestEvent();
+                String topic = MessagingUtil.getMessageTopicName(completeTenantRequestEvent);
+                EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
+                eventPublisher.publish(completeTenantRequestEvent);
+            }
+        });
+    }
+
public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index 58f2c36..b841d0a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -21,7 +21,10 @@ package org.apache.stratos.messaging.message.receiver.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
+import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.util.MessagingUtil;
@@ -50,15 +53,12 @@ public class TopologyEventReceiver {
messageDelegator.addEventListener(eventListener);
}
-
public void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new EventSubscriber(MessagingUtil.Topics.TOPOLOGY_TOPIC.getTopicName(), messageListener);
- // subscriber.setMessageListener(messageListener);
executorService.execute(eventSubscriber);
-
if (log.isDebugEnabled()) {
log.debug("Topology event message receiver thread started");
}
@@ -69,7 +69,7 @@ public class TopologyEventReceiver {
log.debug("Topology event message delegator thread started");
}
-
+ initializeCompleteTopology();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Topology receiver failed", e);
@@ -82,6 +82,25 @@ public class TopologyEventReceiver {
messageDelegator.terminate();
}
+    /**
+     * Requests the complete topology once the topology topic subscriber is connected.
+     * <p>
+     * Submits a task to the executor service that polls {@code eventSubscriber.isSubscribed()}
+     * once per second and then publishes a {@link CompleteTopologyRequestEvent} on the topic
+     * returned by {@code MessagingUtil.getMessageTopicName}. If the task is interrupted while
+     * waiting, it restores the interrupt flag and returns without publishing.
+     */
+    public void initializeCompleteTopology() {
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                while (!eventSubscriber.isSubscribed()) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // Do not swallow the interrupt: it is how an executor asks the task
+                        // to stop, and ignoring it would keep this loop alive indefinitely.
+                        Thread.currentThread().interrupt();
+                        if (log.isDebugEnabled()) {
+                            log.debug("Interrupted while waiting for topology event subscriber, " +
+                                    "complete topology request was not published");
+                        }
+                        return;
+                    }
+                }
+
+                CompleteTopologyRequestEvent completeTopologyRequestEvent = new CompleteTopologyRequestEvent();
+                String topic = MessagingUtil.getMessageTopicName(completeTopologyRequestEvent);
+                EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
+                eventPublisher.publish(completeTopologyRequestEvent);
+            }
+        });
+    }
+
public ExecutorService getExecutorService() {
return executorService;
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java
index c4bfb9d..b3efbfb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java
@@ -63,6 +63,7 @@ public class MessagingUtil {
*/
public static enum Topics {
TOPOLOGY_TOPIC("topology/#"),
+ INITIALIZER_TOPIC("initializer/#"),
HEALTH_STAT_TOPIC("summarized-health-stats/#"),
INSTANCE_STATUS_TOPIC("instance/status/#"),
INSTANCE_NOTIFIER_TOPIC("instance/notifier/#"),
@@ -248,4 +249,4 @@ public class MessagingUtil {
return UUID.randomUUID().toString().replace(HYPHEN_MINUS, EMPTY_SPACE).substring(START_INDEX, len);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
index 54a7421..9c1159c 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
@@ -18,18 +18,15 @@
import threading
-from subscriber import EventSubscriber
import publisher
-from modules.event.instance.notifier.events import *
-from modules.event.tenant.events import *
-from modules.event.topology.events import *
+from logpublisher import *
from modules.event.application.signup.events import *
from modules.event.domain.mapping.events import *
-from entity import *
-from logpublisher import *
-from config import Config
from modules.event.eventhandler import EventHandler
-import constants
+from modules.event.instance.notifier.events import *
+from modules.event.tenant.events import *
+from modules.event.topology.events import *
+from subscriber import EventSubscriber
class CartridgeAgent(threading.Thread):
@@ -71,6 +68,9 @@ class CartridgeAgent(threading.Thread):
else:
self.__event_handler.create_dummy_interface()
+ # request complete topology event from CC by publishing CompleteTopologyRequestEvent
+ publisher.publish_complete_topology_request_event()
+
# wait until complete topology message is received to get LB IP
self.wait_for_complete_topology()
@@ -88,6 +88,9 @@ class CartridgeAgent(threading.Thread):
# start application signup event listener
self.register_application_signup_event_listeners()
+ # request complete tenant event from CC by publishing CompleteTenantRequestEvent
+ publisher.publish_complete_tenant_request_event()
+
# Execute instance started shell script
self.__event_handler.on_instance_started_event()
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
index 301cd47..8c7a7b0 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
@@ -82,6 +82,7 @@ TENANT_REPO_PATH = "tenant.repository.path"
INSTANCE_NOTIFIER_TOPIC = "instance/#"
HEALTH_STAT_TOPIC = "health/#"
TOPOLOGY_TOPIC = "topology/#"
+INITIALIZER_TOPIC = "initializer/"
TENANT_TOPIC = "tenant/#"
INSTANCE_STATUS_TOPIC = "instance/status/"
APPLICATION_SIGNUP = "application/signup/#"
@@ -98,6 +99,8 @@ INSTANCE_READY_TO_SHUTDOWN_EVENT = "InstanceReadyToShutdownEvent"
INSTANCE_CLEANUP_CLUSTER_EVENT = "InstanceCleanupClusterEvent"
INSTANCE_CLEANUP_MEMBER_EVENT = "InstanceCleanupMemberEvent"
COMPLETE_TOPOLOGY_EVENT = "CompleteTopologyEvent"
+COMPLETE_TOPOLOGY_REQUEST_EVENT = "CompleteTopologyRequestEvent"
+COMPLETE_TENANT_REQUEST_EVENT = "CompleteTenantRequestEvent"
COMPLETE_TENANT_EVENT = "CompleteTenantEvent"
DOMAIN_MAPPING_ADDED_EVENT = "DomainMappingAddedEvent"
DOMAIN_MAPPING_REMOVED_EVENT = "DomainMappingRemovedEvent"
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py
index db35987..f14e4af 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py
@@ -108,6 +108,22 @@ class InstanceReadyToShutdownEvent:
return to_json(self)
+# Event with no fields of its own. The agent publishes it to request the complete
+# topology event from the cloud controller (see the call in agent.py).
+# to_json() hands the instance to the module-level to_json() serializer.
+class CompleteTopologyRequestEvent:
+ def __init__(self):
+ pass
+
+ def to_json(self):
+ return to_json(self)
+
+
+# Event with no fields of its own. The agent publishes it to request the complete
+# tenant event from the cloud controller (see the call in agent.py).
+# to_json() hands the instance to the module-level to_json() serializer.
+class CompleteTenantRequestEvent:
+ def __init__(self):
+ pass
+
+ def to_json(self):
+ return to_json(self)
+
+
def to_json(instance):
"""
common function to serialize status event object
@@ -115,4 +131,4 @@ def to_json(instance):
:return: serialized json string
:rtype str
"""
- return json.dumps(instance, default=lambda o: o.__dict__, sort_keys=True, indent=4)
\ No newline at end of file
+ return json.dumps(instance, default=lambda o: o.__dict__, sort_keys=True, indent=4)
[3/3] stratos git commit: Closing STRATOS-1544, STRATOS-1612,
STRATOS-1611: topology, tenant, application model initialize optimization
Posted by ra...@apache.org.
Closing STRATOS-1544, STRATOS-1612, STRATOS-1611: topology, tenant, application model initialize optimization
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/60b80114
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/60b80114
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/60b80114
Branch: refs/heads/stratos-4.1.x
Commit: 60b801144dcb05e3664353c412bc1c3fffd7c55c
Parents: 00f624b
Author: Akila Perera <ra...@gmail.com>
Authored: Wed Nov 11 15:15:12 2015 +0530
Committer: Akila Perera <ra...@gmail.com>
Committed: Wed Nov 11 15:15:12 2015 +0530
----------------------------------------------------------------------
.../ApplicationEventSynchronizer.java | 9 +-
.../applications/topic/ApplicationBuilder.java | 13 --
.../topic/ApplicationsEventPublisher.java | 11 +-
.../AutoscalerInitializerTopicReceiver.java | 72 ++++++++++
.../AutoscalerTopologyEventReceiver.java | 5 +-
.../internal/AutoscalerServiceComponent.java | 81 +++++------
.../CloudControllerServiceComponent.java | 51 +++----
.../publisher/TopologyEventPublisher.java | 15 +-
.../publisher/TopologyEventSynchronizer.java | 8 +-
.../initializer/InitializerTopicReceiver.java | 72 ++++++++++
.../messaging/topology/TopologyBuilder.java | 144 +++++++++----------
.../messaging/topology/TopologyHolder.java | 118 +++++++++++++++
.../messaging/topology/TopologyManager.java | 118 ---------------
.../impl/CloudControllerServiceImpl.java | 12 +-
.../StratosManagerServiceComponent.java | 95 ++++++------
.../synchronizer/TenantEventSynchronizer.java | 6 +-
.../StratosManagerInitializerTopicReceiver.java | 91 ++++++++++++
.../CompleteApplicationSignUpsRequestEvent.java | 26 ++++
.../CompleteApplicationsRequestEvent.java | 26 ++++
.../initializer/CompleteTenantRequestEvent.java | 26 ++++
.../CompleteTopologyRequestEvent.java | 27 ++++
.../event/initializer/InitializerEvent.java | 26 ++++
...eApplicationSignUpsRequestEventListener.java | 24 ++++
...ompleteApplicationsRequestEventListener.java | 24 ++++
.../CompleteTenantRequestEventListener.java | 24 ++++
.../CompleteTopologyRequestEventListener.java | 26 ++++
.../CompleteApplicationsMessageProcessor.java | 3 +
...plicationSignUpsRequestMessageProcessor.java | 54 +++++++
...leteApplicationsRequestMessageProcessor.java | 54 +++++++
.../CompleteTenantRequestMessageProcessor.java | 53 +++++++
...CompleteTopologyRequestMessageProcessor.java | 54 +++++++
.../InitializerMessageProcessorChain.java | 70 +++++++++
.../application/ApplicationsEventReceiver.java | 32 ++++-
.../signup/ApplicationSignUpEventReceiver.java | 30 +++-
.../InitializerEventMessageDelegator.java | 88 ++++++++++++
.../InitializerEventMessageListener.java | 48 +++++++
.../InitializerEventMessageQueue.java | 26 ++++
.../initializer/InitializerEventReceiver.java | 78 ++++++++++
.../receiver/tenant/TenantEventReceiver.java | 24 +++-
.../topology/TopologyEventReceiver.java | 27 +++-
.../stratos/messaging/util/MessagingUtil.java | 3 +-
.../cartridge.agent/cartridge.agent/agent.py | 19 +--
.../cartridge.agent/constants.py | 3 +
.../modules/event/instance/status/events.py | 18 ++-
.../cartridge.agent/publisher.py | 22 ++-
.../integration/tests/AgentStartupTestCase.java | 83 +++++++----
.../tests/PythonAgentIntegrationTest.java | 56 ++++----
.../integration/common/TopologyHandler.java | 111 +++++++++++---
.../src/test/resources/common/log4j.properties | 2 +-
49 files changed, 1648 insertions(+), 460 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
index fc7a528..562a6cb 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
@@ -21,7 +21,7 @@ package org.apache.stratos.autoscaler.applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder;
+import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher;
public class ApplicationEventSynchronizer implements Runnable {
@@ -30,11 +30,8 @@ public class ApplicationEventSynchronizer implements Runnable {
@Override
public void run() {
if (log.isDebugEnabled()) {
- log.debug("Executing topology synchronization task");
- }
- // publish to the topic
- if (ApplicationHolder.getApplications() != null) {
- ApplicationBuilder.handleCompleteApplication(ApplicationHolder.getApplications());
+ log.debug("Executing applications synchronization task");
}
+ ApplicationsEventPublisher.sendCompleteApplicationsEvent(ApplicationHolder.getApplications());
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
index 165c4f8..5fd4d5a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
@@ -57,19 +57,6 @@ import java.util.Set;
public class ApplicationBuilder {
private static final Log log = LogFactory.getLog(ApplicationBuilder.class);
- public static synchronized void handleCompleteApplication(Applications applications) {
- if (log.isDebugEnabled()) {
- log.debug("Handling complete application event");
- }
-
- try {
- ApplicationHolder.acquireReadLock();
- ApplicationsEventPublisher.sendCompleteApplicationsEvent(applications);
- } finally {
- ApplicationHolder.releaseReadLock();
- }
- }
-
/**
* Create application clusters in cloud controller and send application created event.
*
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
index f66e525..2ec6e78 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
@@ -20,6 +20,7 @@ package org.apache.stratos.autoscaler.applications.topic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.ApplicationHolder;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.application.Application;
@@ -40,7 +41,15 @@ public class ApplicationsEventPublisher {
private static final Log log = LogFactory.getLog(ApplicationsEventPublisher.class);
public static void sendCompleteApplicationsEvent(Applications completeApplications) {
- publishEvent(new CompleteApplicationsEvent(completeApplications));
+ ApplicationHolder.acquireReadLock();
+ try{
+ if (log.isDebugEnabled()) {
+ log.debug("Publishing complete applications event...");
+ }
+ publishEvent(new CompleteApplicationsEvent(completeApplications));
+ }finally {
+ ApplicationHolder.releaseReadLock();
+ }
}
public static void sendApplicationCreatedEvent(Application application) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
new file mode 100644
index 0000000..da6b270
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.event.receiver.initializer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.ApplicationHolder;
+import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.listener.initializer.CompleteApplicationsRequestEventListener;
+import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Autoscaler-side receiver for the initializer topic.
+ * <p>
+ * Wraps an {@link InitializerEventReceiver} and answers each complete-applications
+ * request by publishing the applications currently held in {@link ApplicationHolder}
+ * through {@link ApplicationsEventPublisher#sendCompleteApplicationsEvent}. The executor
+ * service set on this object is handed to the wrapped receiver when {@link #execute()}
+ * is called.
+ */
+public class AutoscalerInitializerTopicReceiver {
+    private static final Log log = LogFactory.getLog(AutoscalerInitializerTopicReceiver.class);
+    private final InitializerEventReceiver initializerEventReceiver;
+    private ExecutorService executorService;
+
+    /**
+     * Creates the wrapped receiver and registers this component's event listeners on it.
+     */
+    public AutoscalerInitializerTopicReceiver() {
+        this.initializerEventReceiver = new InitializerEventReceiver();
+        addEventListeners();
+    }
+
+    /**
+     * Passes the executor service to the wrapped receiver and starts it.
+     */
+    public void execute() {
+        initializerEventReceiver.setExecutorService(executorService);
+        initializerEventReceiver.execute();
+        if (log.isInfoEnabled()) {
+            // This is the Autoscaler receiver; the message previously named the cloud controller
+            log.info("Autoscaler initializer topic receiver started");
+        }
+    }
+
+    /**
+     * Registers the listener that replies to complete-applications requests.
+     */
+    private void addEventListeners() {
+        initializerEventReceiver.addEventListener(new CompleteApplicationsRequestEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Handling CompleteApplicationsRequestEvent");
+                }
+                try {
+                    ApplicationsEventPublisher.sendCompleteApplicationsEvent(ApplicationHolder.getApplications());
+                } catch (Exception e) {
+                    // Log and continue; the exception is not rethrown from the listener
+                    log.error("Failed to process CompleteApplicationsRequestEvent", e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @return the executor service handed to the wrapped receiver, or {@code null} if unset
+     */
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    /**
+     * @param executorService executor service the wrapped receiver should run its tasks on
+     */
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index ef31b97..500b95a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -35,6 +35,8 @@ import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent;
import org.apache.stratos.autoscaler.util.AutoscalerUtil;
import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.application.Application;
import org.apache.stratos.messaging.domain.application.Applications;
import org.apache.stratos.messaging.domain.instance.ClusterInstance;
@@ -42,10 +44,12 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.ClusterStatus;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent;
import org.apache.stratos.messaging.event.topology.*;
import org.apache.stratos.messaging.listener.topology.*;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.util.MessagingUtil;
import java.util.concurrent.ExecutorService;
@@ -75,7 +79,6 @@ public class AutoscalerTopologyEventReceiver {
if (log.isInfoEnabled()) {
log.info("Autoscaler topology receiver thread started");
}
-
}
private void addEventListeners() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
index 48ee481..5011dd2 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
@@ -26,6 +26,7 @@ import org.apache.stratos.autoscaler.algorithms.networkpartition.NetworkPartitio
import org.apache.stratos.autoscaler.applications.ApplicationEventSynchronizer;
import org.apache.stratos.autoscaler.context.AutoscalerContext;
import org.apache.stratos.autoscaler.event.receiver.health.AutoscalerHealthStatEventReceiver;
+import org.apache.stratos.autoscaler.event.receiver.initializer.AutoscalerInitializerTopicReceiver;
import org.apache.stratos.autoscaler.event.receiver.topology.AutoscalerTopologyEventReceiver;
import org.apache.stratos.autoscaler.exception.AutoScalerException;
import org.apache.stratos.autoscaler.exception.AutoScalingPolicyAlreadyExistException;
@@ -44,7 +45,6 @@ import org.apache.stratos.autoscaler.util.AutoscalerUtil;
import org.apache.stratos.autoscaler.util.ConfUtil;
import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
import org.apache.stratos.common.Component;
-import org.apache.stratos.common.services.ComponentActivationEventListener;
import org.apache.stratos.common.services.ComponentStartUpSynchronizer;
import org.apache.stratos.common.services.DistributedObjectProvider;
import org.apache.stratos.common.threading.StratosThreadPool;
@@ -68,9 +68,11 @@ import java.util.concurrent.TimeUnit;
* cardinality="1..1" policy="dynamic" bind="setTaskService" unbind="unsetTaskService"
* @scr.reference name="hazelcast.instance.service" interface="com.hazelcast.core.HazelcastInstance"
* cardinality="0..1"policy="dynamic" bind="setHazelcastInstance" unbind="unsetHazelcastInstance"
- * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.services.DistributedObjectProvider"
+ * @scr.reference name="distributedObjectProvider"
+ * interface="org.apache.stratos.common.services.DistributedObjectProvider"
* cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
- * @scr.reference name="componentStartUpSynchronizer" interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer"
+ * @scr.reference name="componentStartUpSynchronizer"
+ * interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer"
* cardinality="1..1" policy="dynamic" bind="setComponentStartUpSynchronizer" unbind="unsetComponentStartUpSynchronizer"
* @scr.reference name="config.context.service" interface="org.wso2.carbon.utils.ConfigurationContextService"
* cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
@@ -81,6 +83,7 @@ public class AutoscalerServiceComponent {
private static final String AUTOSCALER_COORDINATOR_LOCK = "AUTOSCALER_COORDINATOR_LOCK";
private AutoscalerTopologyEventReceiver asTopologyReceiver;
private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
+ private AutoscalerInitializerTopicReceiver autoscalerInitializerTopicReceiver;
private ExecutorService executorService;
private ScheduledExecutorService scheduler;
@@ -90,25 +93,25 @@ public class AutoscalerServiceComponent {
}
try {
XMLConfiguration conf = ConfUtil.getInstance(AutoscalerConstants.COMPONENTS_CONFIG).getConfiguration();
- int threadPoolSize = conf.getInt(AutoscalerConstants.THREAD_POOL_SIZE_KEY,
- AutoscalerConstants.AUTOSCALER_THREAD_POOL_SIZE);
- executorService = StratosThreadPool.getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID,
- threadPoolSize);
+ int threadPoolSize = conf
+ .getInt(AutoscalerConstants.THREAD_POOL_SIZE_KEY, AutoscalerConstants.AUTOSCALER_THREAD_POOL_SIZE);
+ executorService = StratosThreadPool
+ .getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID, threadPoolSize);
int schedulerThreadPoolSize = conf.getInt(AutoscalerConstants.SCHEDULER_THREAD_POOL_SIZE_KEY,
AutoscalerConstants.AUTOSCALER_SCHEDULER_THREAD_POOL_SIZE);
- scheduler = StratosThreadPool.getScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID,
- schedulerThreadPoolSize);
+ scheduler = StratosThreadPool
+ .getScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID, schedulerThreadPoolSize);
Runnable autoscalerActivator = new Runnable() {
@Override
public void run() {
try {
- ComponentStartUpSynchronizer componentStartUpSynchronizer =
- ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
+ ComponentStartUpSynchronizer componentStartUpSynchronizer = ServiceReferenceHolder.getInstance()
+ .getComponentStartUpSynchronizer();
// Wait for cloud controller component to be activated
- componentStartUpSynchronizer.waitForComponentActivation(Component.Autoscaler,
- Component.CloudController);
+ componentStartUpSynchronizer
+ .waitForComponentActivation(Component.Autoscaler, Component.CloudController);
ServiceReferenceHolder.getInstance().setExecutorService(executorService);
@@ -119,8 +122,8 @@ public class AutoscalerServiceComponent {
ServiceReferenceHolder.getInstance().getHazelcastInstance()
.getLock(AUTOSCALER_COORDINATOR_LOCK).lock();
- log.info("Elected this member [" + ServiceReferenceHolder.getInstance().getHazelcastInstance()
- .getCluster().getLocalMember().getUuid() + "] " +
+ log.info("Elected this member [" + ServiceReferenceHolder.getInstance()
+ .getHazelcastInstance().getCluster().getLocalMember().getUuid() + "] " +
"as the autoscaler coordinator for the cluster");
AutoscalerContext.getInstance().setCoordinator(true);
@@ -136,8 +139,8 @@ public class AutoscalerServiceComponent {
} else {
executeCoordinatorTasks();
}
- componentStartUpSynchronizer.waitForAxisServiceActivation(Component.Autoscaler,
- "AutoscalerService");
+ componentStartUpSynchronizer
+ .waitForAxisServiceActivation(Component.Autoscaler, "AutoscalerService");
componentStartUpSynchronizer.setComponentStatus(Component.Autoscaler, true);
if (log.isInfoEnabled()) {
log.info("Autoscaler service component activated");
@@ -154,14 +157,14 @@ public class AutoscalerServiceComponent {
}
}
- private void executeCoordinatorTasks() throws InvalidPolicyException,
- InvalidDeploymentPolicyException, InvalidApplicationPolicyException, AutoScalingPolicyAlreadyExistException {
+ private void executeCoordinatorTasks()
+ throws InvalidPolicyException, InvalidDeploymentPolicyException, InvalidApplicationPolicyException,
+ AutoScalingPolicyAlreadyExistException {
// Start topology receiver
asTopologyReceiver = new AutoscalerTopologyEventReceiver();
asTopologyReceiver.setExecutorService(executorService);
asTopologyReceiver.execute();
-
if (log.isDebugEnabled()) {
log.debug("Topology receiver executor service started");
}
@@ -174,6 +177,14 @@ public class AutoscalerServiceComponent {
log.debug("Health statistics receiver thread started");
}
+ // Start initializer receiver
+ autoscalerInitializerTopicReceiver = new AutoscalerInitializerTopicReceiver();
+ autoscalerInitializerTopicReceiver.setExecutorService(executorService);
+ autoscalerInitializerTopicReceiver.execute();
+ if (log.isDebugEnabled()) {
+ log.debug("Initializer receiver thread started");
+ }
+
// Add AS policies to information model
List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies();
Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator();
@@ -191,7 +202,6 @@ public class AutoscalerServiceComponent {
PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
}
-
// Add application policies to information model
List<ApplicationPolicy> applicationPolicies = RegistryManager.getInstance().
retrieveApplicationPolicies();
@@ -202,9 +212,10 @@ public class AutoscalerServiceComponent {
}
// Add network partition algorithm contexts to autoscaler context
- List<NetworkPartitionAlgorithmContext> networkPartitionAlgorithmContexts =
- RegistryManager.getInstance().retrieveNetworkPartitionAlgorithmContexts();
- Iterator<NetworkPartitionAlgorithmContext> networkPartitionAlgoCtxtIterator = networkPartitionAlgorithmContexts.iterator();
+ List<NetworkPartitionAlgorithmContext> networkPartitionAlgorithmContexts = RegistryManager.getInstance()
+ .retrieveNetworkPartitionAlgorithmContexts();
+ Iterator<NetworkPartitionAlgorithmContext> networkPartitionAlgoCtxtIterator = networkPartitionAlgorithmContexts
+ .iterator();
while (networkPartitionAlgoCtxtIterator.hasNext()) {
NetworkPartitionAlgorithmContext algorithmContext = networkPartitionAlgoCtxtIterator.next();
AutoscalerContext.getInstance().addNetworkPartitionAlgorithmContext(algorithmContext);
@@ -223,24 +234,6 @@ public class AutoscalerServiceComponent {
if (log.isInfoEnabled()) {
log.info("Scheduling tasks to publish applications");
}
-
- ComponentStartUpSynchronizer componentStartUpSynchronizer =
- ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
- if (componentStartUpSynchronizer.isEnabled()) {
- componentStartUpSynchronizer.addEventListener(new ComponentActivationEventListener() {
- @Override
- public void activated(Component component) {
- if (component == Component.StratosManager) {
- scheduleEventSynchronizers();
- }
- }
- });
- } else {
- scheduleEventSynchronizers();
- }
- }
-
- private void scheduleEventSynchronizers() {
Runnable applicationSynchronizer = new ApplicationEventSynchronizer();
scheduler.scheduleAtFixedRate(applicationSynchronizer, 0, 1, TimeUnit.MINUTES);
}
@@ -332,8 +325,8 @@ public class AutoscalerServiceComponent {
}
protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) {
- ServiceReferenceHolder.getInstance().setAxisConfiguration(
- cfgCtxService.getServerConfigContext().getAxisConfiguration());
+ ServiceReferenceHolder.getInstance()
+ .setAxisConfiguration(cfgCtxService.getServerConfigContext().getAxisConfiguration());
}
protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 2e21e8e..808ac5c 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -29,11 +29,11 @@ import org.apache.stratos.cloud.controller.exception.CloudControllerException;
import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventSynchronizer;
import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationEventReceiver;
import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
+import org.apache.stratos.cloud.controller.messaging.receiver.initializer.InitializerTopicReceiver;
import org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver;
import org.apache.stratos.cloud.controller.services.CloudControllerService;
import org.apache.stratos.cloud.controller.services.impl.CloudControllerServiceImpl;
import org.apache.stratos.common.Component;
-import org.apache.stratos.common.services.ComponentActivationEventListener;
import org.apache.stratos.common.services.ComponentStartUpSynchronizer;
import org.apache.stratos.common.services.DistributedObjectProvider;
import org.apache.stratos.common.threading.StratosThreadPool;
@@ -57,9 +57,11 @@ import java.util.concurrent.TimeUnit;
* @scr.component name="org.apache.stratos.cloud.controller" immediate="true"
* @scr.reference name="hazelcast.instance.service" interface="com.hazelcast.core.HazelcastInstance"
* cardinality="0..1"policy="dynamic" bind="setHazelcastInstance" unbind="unsetHazelcastInstance"
- * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.services.DistributedObjectProvider"
+ * @scr.reference name="distributedObjectProvider"
+ * interface="org.apache.stratos.common.services.DistributedObjectProvider"
* cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
- * @scr.reference name="componentStartUpSynchronizer" interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer"
+ * @scr.reference name="componentStartUpSynchronizer"
+ * interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer"
* cardinality="1..1" policy="dynamic" bind="setComponentStartUpSynchronizer" unbind="unsetComponentStartUpSynchronizer"
* @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService"
* cardinality="1..1" policy="dynamic" bind="setTaskService" unbind="unsetTaskService"
@@ -79,6 +81,7 @@ public class CloudControllerServiceComponent {
private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
private ApplicationEventReceiver applicationEventReceiver;
+ private InitializerTopicReceiver initializerTopicReceiver;
private ExecutorService executorService;
private ScheduledExecutorService scheduler;
@@ -88,15 +91,15 @@ public class CloudControllerServiceComponent {
}
try {
executorService = StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE);
- scheduler = StratosThreadPool.getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID,
- SCHEDULER_THREAD_POOL_SIZE);
+ scheduler = StratosThreadPool
+ .getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID, SCHEDULER_THREAD_POOL_SIZE);
Runnable cloudControllerActivator = new Runnable() {
@Override
public void run() {
try {
- ComponentStartUpSynchronizer componentStartUpSynchronizer =
- ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
+ ComponentStartUpSynchronizer componentStartUpSynchronizer = ServiceReferenceHolder.getInstance()
+ .getComponentStartUpSynchronizer();
// Register cloud controller service
BundleContext bundleContext = context.getBundleContext();
@@ -125,8 +128,8 @@ public class CloudControllerServiceComponent {
executeCoordinatorTasks();
}
- componentStartUpSynchronizer.waitForAxisServiceActivation(Component.CloudController,
- "CloudControllerService");
+ componentStartUpSynchronizer
+ .waitForAxisServiceActivation(Component.CloudController, "CloudControllerService");
componentStartUpSynchronizer.setComponentStatus(Component.CloudController, true);
log.info("Cloud controller service component activated");
} catch (Exception e) {
@@ -166,27 +169,17 @@ public class CloudControllerServiceComponent {
log.info("Instance status event receiver thread started");
}
+ initializerTopicReceiver = new InitializerTopicReceiver();
+ initializerTopicReceiver.setExecutorService(executorService);
+ initializerTopicReceiver.execute();
+
if (log.isInfoEnabled()) {
- log.info("Scheduling topology synchronizer task");
+ log.info("Initializer event receiver thread started");
}
- ComponentStartUpSynchronizer componentStartUpSynchronizer =
- ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
- if (componentStartUpSynchronizer.isEnabled()) {
- componentStartUpSynchronizer.addEventListener(new ComponentActivationEventListener() {
- @Override
- public void activated(Component component) {
- if (component == Component.StratosManager) {
- scheduleEventSynchronizers();
- }
- }
- });
- } else {
- scheduleEventSynchronizers();
+ if (log.isInfoEnabled()) {
+ log.info("Scheduling topology synchronizer task");
}
- }
-
- private void scheduleEventSynchronizers() {
Runnable topologySynchronizer = new TopologyEventSynchronizer();
scheduler.scheduleAtFixedRate(topologySynchronizer, 0, 1, TimeUnit.MINUTES);
}
@@ -228,8 +221,8 @@ public class CloudControllerServiceComponent {
}
protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) {
- ServiceReferenceHolder.getInstance().setAxisConfiguration(
- cfgCtxService.getServerConfigContext().getAxisConfiguration());
+ ServiceReferenceHolder.getInstance()
+ .setAxisConfiguration(cfgCtxService.getServerConfigContext().getAxisConfiguration());
}
protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
@@ -296,4 +289,4 @@ public class CloudControllerServiceComponent {
log.warn("An error occurred while shutting down executor service", e);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java
index b55d3a2..12f7685 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java
@@ -25,6 +25,7 @@ import org.apache.stratos.cloud.controller.domain.Cartridge;
import org.apache.stratos.cloud.controller.domain.ClusterContext;
import org.apache.stratos.cloud.controller.domain.MemberContext;
import org.apache.stratos.cloud.controller.domain.PortMapping;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder;
import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
@@ -314,12 +315,16 @@ public class TopologyEventPublisher {
}
public static void sendCompleteTopologyEvent(Topology topology) {
- CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Publishing complete topology event"));
+ TopologyHolder.acquireReadLock();
+ try {
+ CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
+ if (log.isDebugEnabled()) {
+ log.debug("Publishing complete topology event...");
+ }
+ publishEvent(completeTopologyEvent);
+ } finally {
+ TopologyHolder.releaseReadLock();
}
- publishEvent(completeTopologyEvent);
}
public static void sendClusterTerminatingEvent(ClusterInstanceTerminatingEvent clusterTerminatingEvent) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
index 832cae9..fcfd965 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
import org.apache.stratos.cloud.controller.context.CloudControllerContext;
-import org.apache.stratos.cloud.controller.messaging.topology.TopologyManager;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder;
/**
* Topology event synchronizer publishes complete topology event periodically.
@@ -53,10 +53,8 @@ public class TopologyEventSynchronizer implements Runnable {
try {
// Publish complete topology event
- if (TopologyManager.getTopology() != null) {
- CloudControllerContext.getInstance().setTopologySyncRunning(true);
- TopologyEventPublisher.sendCompleteTopologyEvent(TopologyManager.getTopology());
- }
+ CloudControllerContext.getInstance().setTopologySyncRunning(true);
+ TopologyEventPublisher.sendCompleteTopologyEvent(TopologyHolder.getTopology());
} finally {
CloudControllerContext.getInstance().setTopologySyncRunning(false);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
new file mode 100644
index 0000000..0f8538c
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.messaging.receiver.initializer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPublisher;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.listener.initializer.CompleteTopologyRequestEventListener;
+import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Wraps an {@link InitializerEventReceiver} and registers a listener on it that
+ * responds to complete topology request events by publishing the topology held
+ * by {@link TopologyHolder} through {@link TopologyEventPublisher}.
+ */
+public class InitializerTopicReceiver {
+ private static final Log log = LogFactory.getLog(InitializerTopicReceiver.class);
+ private InitializerEventReceiver initializerEventReceiver;
+ private ExecutorService executorService;
+
+ /**
+ * Creates the underlying initializer event receiver and registers the event
+ * listeners on it.
+ */
+ public InitializerTopicReceiver() {
+ this.initializerEventReceiver = new InitializerEventReceiver();
+ addEventListeners();
+ }
+
+ /**
+ * Passes the configured executor service to the underlying receiver and
+ * starts it.
+ * NOTE(review): the executor service is handed over as-is, so presumably
+ * {@link #setExecutorService(ExecutorService)} must be called first - confirm.
+ */
+ public void execute() {
+ initializerEventReceiver.setExecutorService(executorService);
+ initializerEventReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Cloud controller initializer topic receiver started");
+ }
+ }
+
+ /**
+ * Registers the listener that publishes a complete topology event whenever
+ * a complete topology request event is received.
+ */
+ private void addEventListeners() {
+ initializerEventReceiver.addEventListener(new CompleteTopologyRequestEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ if (log.isDebugEnabled()) {
+ log.debug("Handling CompleteTopologyRequestEvent");
+ }
+ try {
+ TopologyEventPublisher.sendCompleteTopologyEvent(TopologyHolder.getTopology());
+ } catch (Exception e) {
+ // Publishing failures are logged here and not rethrown to the caller of onEvent
+ log.error("Failed to process CompleteTopologyRequestEvent", e);
+ }
+ }
+ });
+ }
+
+ /**
+ * @return the executor service currently set on this receiver, or null if none has been set
+ */
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ /**
+ * @param executorService the executor service handed to the underlying receiver in {@link #execute()}
+ */
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index 09670e0..da38337 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -60,12 +60,12 @@ public class TopologyBuilder {
public static void handleServiceCreated(List<Cartridge> cartridgeList) throws RegistryException {
Service service;
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
if (cartridgeList == null) {
throw new RuntimeException("Cartridge list is empty");
}
try {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
for (Cartridge cartridge : cartridgeList) {
if (!topology.serviceExists(cartridge.getType())) {
ServiceType serviceType = cartridge.isMultiTenant() ?
@@ -104,17 +104,17 @@ public class TopologyBuilder {
}
}
topology.addService(service);
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
}
}
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
TopologyEventPublisher.sendServiceCreateEvent(cartridgeList);
}
public static void handleServiceRemoved(List<Cartridge> cartridgeList) throws RegistryException {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
for (Cartridge cartridge : cartridgeList) {
Service service = topology.getService(cartridge.getType());
if (service == null) {
@@ -122,11 +122,11 @@ public class TopologyBuilder {
}
if (service.getClusters().size() == 0) {
try {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
topology.removeService(cartridge.getType());
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList);
} else {
@@ -138,9 +138,9 @@ public class TopologyBuilder {
public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters)
throws RegistryException {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
try {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
for (Cluster cluster : appClusters) {
Service service = topology.getService(cluster.getServiceName());
if (service == null) {
@@ -150,9 +150,9 @@ public class TopologyBuilder {
service.addCluster(cluster);
log.info("Cluster created: [cluster] " + cluster.getClusterId());
}
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
log.debug("Creating cluster port mappings: [application-id] " + appId);
@@ -184,10 +184,10 @@ public class TopologyBuilder {
public static void handleApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusterData)
throws RegistryException {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
CloudControllerContext context = CloudControllerContext.getInstance();
try {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
if (clusterData != null) {
// remove clusters from CC topology model and remove runtime information
@@ -208,9 +208,9 @@ public class TopologyBuilder {
} else {
log.info("No cluster data found for application " + appId + " to remove");
}
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
// Remove cluster port mappings of application
@@ -220,9 +220,9 @@ public class TopologyBuilder {
}
public static void handleClusterReset(ClusterStatusClusterResetEvent event) throws RegistryException {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
try {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(event.getServiceName());
if (service == null) {
throw new RuntimeException("Service " + event.getServiceName() +
@@ -246,7 +246,7 @@ public class TopologyBuilder {
if (context.isStateTransitionValid(status)) {
context.setStatus(status);
log.info("Cluster Created adding status started for" + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
//publishing data
TopologyEventPublisher
.sendClusterResetEvent(event.getAppId(), event.getServiceName(), event.getClusterId(),
@@ -258,16 +258,16 @@ public class TopologyBuilder {
}
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
}
public static void handleClusterInstanceCreated(String serviceType, String clusterId, String alias,
String instanceId, String partitionId, String networkPartitionId) throws RegistryException {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
try {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(serviceType);
if (service == null) {
throw new RuntimeException("Service " + serviceType +
@@ -286,18 +286,18 @@ public class TopologyBuilder {
clusterInstance.setNetworkPartitionId(networkPartitionId);
clusterInstance.setPartitionId(partitionId);
cluster.addInstanceContext(instanceId, clusterInstance);
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = new ClusterInstanceCreatedEvent(serviceType,
clusterId, clusterInstance);
clusterInstanceCreatedEvent.setPartitionId(partitionId);
TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
}
public static void handleClusterRemoved(ClusterContext ctxt) throws RegistryException {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(ctxt.getCartridgeType());
String deploymentPolicy;
if (service == null) {
@@ -308,12 +308,12 @@ public class TopologyBuilder {
ctxt.getCartridgeType()));
}
try {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
Cluster cluster = service.removeCluster(ctxt.getClusterId());
deploymentPolicy = cluster.getDeploymentPolicyName();
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
}
@@ -324,7 +324,7 @@ public class TopologyBuilder {
* @param memberContext
*/
public static void handleMemberCreatedEvent(MemberContext memberContext) throws RegistryException {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(memberContext.getCartridgeType());
String clusterId = memberContext.getClusterId();
Cluster cluster = service.getCluster(clusterId);
@@ -340,14 +340,14 @@ public class TopologyBuilder {
throw new RuntimeException(String.format("Member %s already exists", memberId));
}
try {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId,
networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime);
member.setStatus(MemberStatus.Created);
member.setLbClusterId(lbClusterId);
member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
cluster.addMember(member);
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
//member created time
Long timestamp = System.currentTimeMillis();
@@ -368,7 +368,7 @@ public class TopologyBuilder {
}
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
}
@@ -379,7 +379,7 @@ public class TopologyBuilder {
* @param memberContext
*/
public static void handleMemberInitializedEvent(MemberContext memberContext) throws RegistryException {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(memberContext.getCartridgeType());
if (service == null) {
throw new RuntimeException(String.format("Service %s does not exist", memberContext.getCartridgeType()));
@@ -397,7 +397,7 @@ public class TopologyBuilder {
throw new RuntimeException(String.format("Member %s does not exist", memberContext.getMemberId()));
}
try {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
// Set instance id returned by the IaaS
member.setInstanceId(memberContext.getInstanceId());
@@ -430,7 +430,7 @@ public class TopologyBuilder {
member.setStatus(MemberStatus.Initialized);
log.info("Member status updated to initialized");
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
//member intialized time
Long timestamp = System.currentTimeMillis();
TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
@@ -465,7 +465,7 @@ public class TopologyBuilder {
}
}
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
}
@@ -482,7 +482,7 @@ public class TopologyBuilder {
public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) {
try {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(instanceStartedEvent.getServiceName());
if (service == null) {
throw new RuntimeException(
@@ -504,7 +504,7 @@ public class TopologyBuilder {
}
try {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
// try update lifecycle state
if (!member.isStateTransitionValid(MemberStatus.Starting)) {
log.error("Invalid State Transition from " + member.getStatus() + " to " +
@@ -513,7 +513,7 @@ public class TopologyBuilder {
member.setStatus(MemberStatus.Starting);
log.info("member started event adding status started");
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
//member started time
Long timestamp = System.currentTimeMillis();
//memberStartedEvent.
@@ -538,7 +538,7 @@ public class TopologyBuilder {
}
}
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
} catch (Exception e) {
String message = String.format("Could not handle member started event: [application-id] %s "
@@ -549,7 +549,7 @@ public class TopologyBuilder {
}
public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) throws RegistryException {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(instanceActivatedEvent.getServiceName());
if (service == null) {
throw new RuntimeException(
@@ -579,7 +579,7 @@ public class TopologyBuilder {
//TODO
memberActivatedEvent.setApplicationId(null);
try {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
// try update lifecycle state
if (!member.isStateTransitionValid(MemberStatus.Active)) {
log.error("Invalid state transition from [" + member.getStatus() + "] to [" +
@@ -624,7 +624,7 @@ public class TopologyBuilder {
memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
//member activated time
Long timestamp = System.currentTimeMillis();
@@ -648,13 +648,13 @@ public class TopologyBuilder {
}
}
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
}
public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent)
throws InvalidMemberException, InvalidCartridgeTypeException, RegistryException {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName());
//update the status of the member
@@ -684,7 +684,7 @@ public class TopologyBuilder {
//member ReadyToShutDown state change time
Long timestamp = null;
try {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
throw new RuntimeException("Invalid State Transition from " + member.getStatus() + " to " +
@@ -693,10 +693,10 @@ public class TopologyBuilder {
member.setStatus(MemberStatus.ReadyToShutDown);
log.info("Member Ready to shut down event adding status started");
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
timestamp = System.currentTimeMillis();
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
//publishing member status to DAS.
@@ -721,7 +721,7 @@ public class TopologyBuilder {
public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent)
throws InvalidMemberException, InvalidCartridgeTypeException, RegistryException {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName());
//update the status of the member
if (service == null) {
@@ -746,7 +746,7 @@ public class TopologyBuilder {
instanceMaintenanceModeEvent.getClusterInstanceId(), instanceMaintenanceModeEvent.getMemberId(),
instanceMaintenanceModeEvent.getNetworkPartitionId(), instanceMaintenanceModeEvent.getPartitionId());
try {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
// try update lifecycle state
if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
throw new RuntimeException(
@@ -755,9 +755,9 @@ public class TopologyBuilder {
member.setStatus(MemberStatus.In_Maintenance);
log.info("member maintenance mode event adding status started");
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
//publishing data
TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
@@ -775,7 +775,7 @@ public class TopologyBuilder {
*/
public static void handleMemberTerminated(String serviceName, String clusterId, String networkPartitionId,
String partitionId, String memberId) throws RegistryException {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(serviceName);
Properties properties;
if (service == null) {
@@ -799,12 +799,12 @@ public class TopologyBuilder {
//member terminated time
Long timestamp = null;
try {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
properties = member.getProperties();
cluster.removeMember(member);
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
timestamp = System.currentTimeMillis();
}
/* @TODO leftover from grouping_poc*/
@@ -831,7 +831,7 @@ public class TopologyBuilder {
public static void handleClusterActivatedEvent(
ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) throws RegistryException {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
//update the status of the cluster
if (service == null) {
@@ -857,7 +857,7 @@ public class TopologyBuilder {
clusterStatusClusterActivatedEvent.getAppId(), clusterStatusClusterActivatedEvent.getServiceName(),
clusterStatusClusterActivatedEvent.getClusterId(), clusterStatusClusterActivatedEvent.getInstanceId());
try {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
Collection<KubernetesService> kubernetesServices = clusterContext
.getKubernetesServices(clusterStatusClusterActivatedEvent.getInstanceId());
@@ -932,7 +932,7 @@ public class TopologyBuilder {
if (context.isStateTransitionValid(status)) {
context.setStatus(status);
log.info("Cluster activated adding status started for " + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
// publish event
TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
} else {
@@ -942,7 +942,7 @@ public class TopologyBuilder {
clusterStatusClusterActivatedEvent.getInstanceId(), context.getStatus(), status));
}
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
}
@@ -961,7 +961,7 @@ public class TopologyBuilder {
public static void handleClusterInactivateEvent(ClusterStatusClusterInactivateEvent clusterInactivateEvent)
throws RegistryException {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(clusterInactivateEvent.getServiceName());
//update the status of the cluster
if (service == null) {
@@ -980,7 +980,7 @@ public class TopologyBuilder {
clusterInactivateEvent.getAppId(), clusterInactivateEvent.getServiceName(),
clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId());
try {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
if (context == null) {
throw new RuntimeException("Cluster Instance Context is not found for [cluster] " +
@@ -991,7 +991,7 @@ public class TopologyBuilder {
if (context.isStateTransitionValid(status)) {
context.setStatus(status);
log.info("Cluster Inactive adding status started for" + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
//publishing data
TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
} else {
@@ -1001,15 +1001,15 @@ public class TopologyBuilder {
context.getStatus(), status));
}
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
}
public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event)
throws RegistryException {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
try {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Service service = topology.getService(event.getServiceName());
//update the status of the cluster
@@ -1036,7 +1036,7 @@ public class TopologyBuilder {
log.info("Cluster Terminated adding status started for and removing the cluster instance" + cluster
.getClusterId());
cluster.removeInstanceContext(event.getInstanceId());
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
//publishing data
ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent(
event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId());
@@ -1048,7 +1048,7 @@ public class TopologyBuilder {
event.getInstanceId(), context.getStatus(), status));
}
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
}
@@ -1056,10 +1056,10 @@ public class TopologyBuilder {
public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event)
throws RegistryException {
- TopologyManager.acquireWriteLock();
+ TopologyHolder.acquireWriteLock();
try {
- Topology topology = TopologyManager.getTopology();
+ Topology topology = TopologyHolder.getTopology();
Cluster cluster = topology.getService(event.getServiceName()).
getCluster(event.getClusterId());
@@ -1077,7 +1077,7 @@ public class TopologyBuilder {
if (context.isStateTransitionValid(status)) {
context.setStatus(status);
log.info("Cluster Terminating started for " + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
+ TopologyHolder.updateTopology(topology);
//publishing data
ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent(
event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId());
@@ -1096,7 +1096,7 @@ public class TopologyBuilder {
event.getInstanceId(), context.getStatus(), status));
}
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyHolder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyHolder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyHolder.java
new file mode 100644
index 0000000..d183ca0
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyHolder.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.messaging.topology;
+
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.registry.RegistryManager;
+import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
+import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
+import org.apache.stratos.common.concurrent.locks.ReadWriteLock;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
+
+/**
+ * Static holder for the cached {@link Topology}: loads it via CloudControllerUtil on first access, persists updates through RegistryManager, and wraps the shared read/write lock.
+ */
+public class TopologyHolder {
+ private static final Log log = LogFactory.getLog(TopologyHolder.class);
+
+ private static volatile ReadWriteLock lock = new ReadWriteLock("topology-manager");
+ private static volatile Topology topology;
+
+ private TopologyHolder() {
+ }
+
+ public static void acquireReadLock() {
+ lock.acquireReadLock();
+ if (log.isDebugEnabled()) {
+ log.debug("Read lock acquired");
+ }
+ }
+
+ public static void releaseReadLock() {
+ lock.releaseReadLock();
+ if (log.isDebugEnabled()) {
+ log.debug("Read lock released");
+ }
+ }
+
+ public static void acquireWriteLock() {
+ lock.acquireWriteLock();
+ if (log.isDebugEnabled()) {
+ log.debug("Write lock acquired");
+ }
+ }
+
+ public static void releaseWriteLock() {
+ lock.releaseWriteLock();
+ if (log.isDebugEnabled()) {
+ log.debug("Write lock released");
+ }
+ }
+
+ public static Topology getTopology() {
+ if (topology == null) {
+ synchronized (TopologyHolder.class) {
+ if (topology == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Trying to retrieve topology from registry");
+ }
+ topology = CloudControllerUtil.retrieveTopology();
+ if (topology == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Topology not found in registry, creating new");
+ }
+ topology = new Topology();
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Topology initialized");
+ }
+ }
+ }
+ }
+ return topology;
+ }
+
+ /**
+ * Replaces the in-memory topology with the given instance, then persists it under CloudControllerConstants.TOPOLOGY_RESOURCE.
+ * The in-memory reference is assigned before the persist call, and this method does not take the write lock itself.
+ * @param updatedTopology instance that becomes the in-memory topology; @throws RegistryException presumably when persisting fails (confirm in RegistryManager)
+ */
+ public static void updateTopology(Topology updatedTopology) throws RegistryException {
+ synchronized (TopologyHolder.class) {
+ if (log.isDebugEnabled()) {
+ log.debug("Updating topology");
+ }
+ topology = updatedTopology;
+ RegistryManager.getInstance().persist(CloudControllerConstants.TOPOLOGY_RESOURCE, topology);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Topology updated: %s", toJson(topology)));
+ }
+ }
+
+ }
+
+ private static String toJson(Object object) {
+ Gson gson = new Gson();
+ return gson.toJson(object);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java
deleted file mode 100644
index f6f6036..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.messaging.topology;
-
-import com.google.gson.Gson;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.registry.RegistryManager;
-import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
-import org.apache.stratos.common.concurrent.locks.ReadWriteLock;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.wso2.carbon.registry.core.exceptions.RegistryException;
-
-/**
- * Persistence and retrieval of Topology from Registry
- */
-public class TopologyManager {
- private static final Log log = LogFactory.getLog(TopologyManager.class);
-
- private static volatile ReadWriteLock lock = new ReadWriteLock("topology-manager");
- private static volatile Topology topology;
-
- private TopologyManager() {
- }
-
- public static void acquireReadLock() {
- lock.acquireReadLock();
- if (log.isDebugEnabled()) {
- log.debug("Read lock acquired");
- }
- }
-
- public static void releaseReadLock() {
- lock.releaseReadLock();
- if (log.isDebugEnabled()) {
- log.debug("Read lock released");
- }
- }
-
- public static void acquireWriteLock() {
- lock.acquireWriteLock();
- if (log.isDebugEnabled()) {
- log.debug("Write lock acquired");
- }
- }
-
- public static void releaseWriteLock() {
- lock.releaseWriteLock();
- if (log.isDebugEnabled()) {
- log.debug("Write lock released");
- }
- }
-
- public static Topology getTopology() {
- if (topology == null) {
- synchronized (TopologyManager.class) {
- if (topology == null) {
- if (log.isDebugEnabled()) {
- log.debug("Trying to retrieve topology from registry");
- }
- topology = CloudControllerUtil.retrieveTopology();
- if (topology == null) {
- if (log.isDebugEnabled()) {
- log.debug("Topology not found in registry, creating new");
- }
- topology = new Topology();
- }
- if (log.isDebugEnabled()) {
- log.debug("Topology initialized");
- }
- }
- }
- }
- return topology;
- }
-
- /**
- * Update in-memory topology and persist it in registry.
- *
- * @param updatedTopology
- */
- public static void updateTopology(Topology updatedTopology) throws RegistryException {
- synchronized (TopologyManager.class) {
- if (log.isDebugEnabled()) {
- log.debug("Updating topology");
- }
- topology = updatedTopology;
- RegistryManager.getInstance().persist(CloudControllerConstants.TOPOLOGY_RESOURCE, topology);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Topology updated: %s", toJson(topology)));
- }
- }
-
- }
-
- private static String toJson(Object object) {
- Gson gson = new Gson();
- return gson.toJson(object);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 582e78f..d3fa92d 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -30,7 +30,7 @@ import org.apache.stratos.cloud.controller.domain.kubernetes.KubernetesMaster;
import org.apache.stratos.cloud.controller.exception.*;
import org.apache.stratos.cloud.controller.iaases.Iaas;
import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
-import org.apache.stratos.cloud.controller.messaging.topology.TopologyManager;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder;
import org.apache.stratos.cloud.controller.services.CloudControllerService;
import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
@@ -640,8 +640,8 @@ public class CloudControllerServiceImpl implements CloudControllerService {
}
// check if status == active, if true, then this is a termination on member faulty
- TopologyManager.acquireWriteLock();
- Topology topology = TopologyManager.getTopology();
+ TopologyHolder.acquireWriteLock();
+ Topology topology = TopologyHolder.getTopology();
org.apache.stratos.messaging.domain.topology.Service service = topology
.getService(memberContext.getCartridgeType());
@@ -679,7 +679,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
log.error(message, e);
throw new CloudControllerException(message, e);
} finally {
- TopologyManager.releaseWriteLock();
+ TopologyHolder.releaseWriteLock();
}
return true;
}
@@ -826,7 +826,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
log.error(msg);
return;
}
- Collection<Member> members = TopologyManager.getTopology().
+ Collection<Member> members = TopologyHolder.getTopology().
getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers();
//finding the responding members from the existing members in the topology.
int sizeOfRespondingMembers = 0;
@@ -872,7 +872,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
log.error(msg);
return;
}
- Collection<Member> members = TopologyManager.getTopology().
+ Collection<Member> members = TopologyHolder.getTopology().
getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers();
while (members.size() > 0) {