You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ra...@apache.org on 2015/11/11 10:45:42 UTC

[1/3] stratos git commit: Closing STRATOS-1544, STRATOS-1612, STRATOS-1611: topology, tenant, application model initialize optimization

Repository: stratos
Updated Branches:
  refs/heads/stratos-4.1.x 00f624b48 -> 60b801144


http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
index f78f460..0994048 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
@@ -17,12 +17,12 @@
 
 import paho.mqtt.publish as publish
 
-from modules.event.instance.status.events import *
-from modules.util.log import *
-from modules.util import cartridgeagentutils
-import healthstats
 import constants
+import healthstats
 from config import Config
+from modules.event.instance.status.events import *
+from modules.util import cartridgeagentutils
+from modules.util.log import *
 
 log = LogFactory().get_log(__name__)
 publishers = {}
@@ -183,6 +183,20 @@ def publish_instance_ready_to_shutdown_event():
         log.warn("Instance already in a ReadyToShutDown event...")
 
 
+def publish_complete_topology_request_event():
+    complete_topology_request_event = CompleteTopologyRequestEvent()
+    publisher = get_publisher(constants.INITIALIZER_TOPIC + constants.COMPLETE_TOPOLOGY_REQUEST_EVENT)
+    publisher.publish(complete_topology_request_event)
+    log.info("Complete topology request event published")
+
+
+def publish_complete_tenant_request_event():
+    complete_tenant_request_event = CompleteTenantRequestEvent()
+    publisher = get_publisher(constants.INITIALIZER_TOPIC + constants.COMPLETE_TENANT_REQUEST_EVENT)
+    publisher.publish(complete_tenant_request_event)
+    log.info("Complete tenant request event published")
+
+
 def get_publisher(topic):
     if topic not in publishers:
         publishers[topic] = EventPublisher(topic)

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
index 8f80013..f0a70d4 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentStartupTestCase.java
@@ -22,10 +22,14 @@ package org.apache.stratos.python.cartridge.agent.integration.tests;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.domain.LoadBalancingIPType;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
 import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
 import org.apache.stratos.messaging.event.topology.MemberInitializedEvent;
+import org.apache.stratos.messaging.listener.initializer.CompleteTenantRequestEventListener;
+import org.apache.stratos.messaging.listener.initializer.CompleteTopologyRequestEventListener;
 import org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -55,6 +59,7 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
     private static final String SERVICE_NAME = "php";
     private boolean startupTestCompleted = false;
     private boolean topologyContextTestCompleted = false;
+    private boolean completeTenantInitialized = false;
     private boolean thriftTestCompleted = false;
     private Topology topology = createTestTopology();
 
@@ -74,7 +79,6 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
         startServerSocket(8080);
     }
 
-
     /**
      * TearDown method for test method testPythonCartridgeAgent
      */
@@ -83,8 +87,9 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
         tearDown();
     }
 
-    @Test(timeOut = STARTUP_TIMEOUT, description = "Test PCA initialization, activation, health stat publishing and " +
-            "topology context update", groups = {"smoke"})
+    @Test(timeOut = STARTUP_TIMEOUT,
+          description = "Test PCA initialization, activation, health stat publishing and " + "topology context update",
+          groups = { "smoke" })
     public void testPythonCartridgeAgent() {
         startCommunicatorThread();
         subscribeToThriftDatabridge();
@@ -99,24 +104,6 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
                     List<String> newLines = getNewLines(outputLines, outputStream.toString());
                     if (newLines.size() > 0) {
                         for (String line : newLines) {
-                            if (line.contains("Subscribed to 'topology/#'")) {
-                                sleep(2000);
-                                // Send complete topology event
-                                log.info("Publishing complete topology event...");
-                                CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
-                                publishEvent(completeTopologyEvent);
-                                log.info("Complete topology event published");
-
-                                // Publish member initialized event
-                                log.info("Publishing member initialized event...");
-                                MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent(
-                                        SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID,
-                                        PARTITION_ID, INSTANCE_ID
-                                );
-                                publishEvent(memberInitializedEvent);
-                                log.info("Member initialized event published");
-                            }
-
                             if (line.contains("Published event to thrift stream")) {
                                 startupTestCompleted = true;
                             }
@@ -125,6 +112,11 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
                             if (line.contains("Topology context update test passed!")) {
                                 topologyContextTestCompleted = true;
                             }
+
+                            // assert complete tenant initialization
+                            if (line.contains("Tenant context updated with")){
+                                completeTenantInitialized = true;
+                            }
                         }
                     }
                     sleep(1000);
@@ -134,6 +126,35 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
 
         startupTestThread.start();
 
+        initializerEventReceiver.addEventListener(new CompleteTopologyRequestEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                // Send complete topology event
+                log.info("CompleteTopologyRequestEvent received. Publishing complete topology event...");
+                CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
+                publishEvent(completeTopologyEvent);
+                log.info("Complete topology event published");
+
+                // Publish member initialized event
+                log.info("Publishing member initialized event...");
+                MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent(SERVICE_NAME, CLUSTER_ID,
+                        CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID, PARTITION_ID, INSTANCE_ID);
+                publishEvent(memberInitializedEvent);
+                log.info("Member initialized event published");
+            }
+        });
+
+        initializerEventReceiver.addEventListener(new CompleteTenantRequestEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                // Send complete tenant event
+                log.info("CompleteTenantRequestEvent received. Publishing complete tenant event...");
+                CompleteTenantEvent completeTenantEvent = new CompleteTenantEvent(createTestTenantList());
+                publishEvent(completeTenantEvent);
+                log.info("Complete tenant event published");
+            }
+        });
+
         instanceStatusEventReceiver.addEventListener(new InstanceActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
@@ -148,7 +169,7 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
         });
 
         while (!instanceStarted || !instanceActivated || !startupTestCompleted || !topologyContextTestCompleted ||
-                !thriftTestCompleted) {
+                !thriftTestCompleted || !completeTenantInitialized) {
             // wait until the instance activated event is received.
             // this will assert whether instance got activated within timeout period; no need for explicit assertions
             sleep(2000);
@@ -180,6 +201,19 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
     }
 
     /**
+     * Create test tenant list
+     *
+     * @return List of tenant objects with mock information
+     */
+    private List<Tenant> createTestTenantList() {
+        List<Tenant> tenantList = new ArrayList<>();
+        tenantList.add(new Tenant(1, "test.one.domain"));
+        tenantList.add(new Tenant(2, "test.two.domain"));
+        tenantList.add(new Tenant(3, "test.three.domain"));
+        return tenantList;
+    }
+
+    /**
      * Create test topology
      *
      * @return Topology object with mock information
@@ -193,9 +227,8 @@ public class AgentStartupTestCase extends PythonAgentIntegrationTest {
                 AUTOSCALING_POLICY_NAME, APP_ID);
         service.addCluster(cluster);
 
-        Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID,
-                CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private,
-                System.currentTimeMillis());
+        Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, CLUSTER_INSTANCE_ID,
+                NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, System.currentTimeMillis());
 
         member.setDefaultPrivateIP("10.0.0.1");
         member.setDefaultPublicIP("20.0.0.1");

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index 66b9290..d441c1e 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener;
 import org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener;
+import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver;
 import org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.messaging.util.MessagingUtil;
@@ -72,6 +73,7 @@ public class PythonAgentIntegrationTest {
     protected boolean eventReceiverInitiated = false;
     protected TopologyEventReceiver topologyEventReceiver;
     protected InstanceStatusEventReceiver instanceStatusEventReceiver;
+    protected InitializerEventReceiver initializerEventReceiver;
     protected boolean instanceStarted;
     protected boolean instanceActivated;
     protected ByteArrayOutputStreamLocal outputStream;
@@ -112,14 +114,17 @@ public class PythonAgentIntegrationTest {
                 }
             });
 
+            initializerEventReceiver = new InitializerEventReceiver();
+            initializerEventReceiver.setExecutorService(executorService);
+            initializerEventReceiver.execute();
+
             this.eventReceiverInitiated = true;
         }
 
         // Start CEP Thrift test server
         thriftTestServer = new ThriftTestServer();
 
-        File file =
-                new File(getResourcesPath() + PATH_SEP + "common" + PATH_SEP + "stratos-health-stream-def.json");
+        File file = new File(getResourcesPath() + PATH_SEP + "common" + PATH_SEP + "stratos-health-stream-def.json");
         FileInputStream fis = new FileInputStream(file);
         byte[] data = new byte[(int) file.length()];
         fis.read(data);
@@ -139,7 +144,6 @@ public class PythonAgentIntegrationTest {
         this.outputStream = executeCommand("python " + agentPath + PATH_SEP + "agent.py", timeout);
     }
 
-
     protected void tearDown() {
         tearDown(null);
     }
@@ -155,8 +159,7 @@ public class PythonAgentIntegrationTest {
                 log.info("Terminating process: " + commandText);
                 executor.setExitValue(0);
                 executor.getWatchdog().destroyProcess();
-            }
-            catch (Exception ignore) {
+            } catch (Exception ignore) {
             }
         }
         // wait until everything cleans up to avoid connection errors
@@ -165,36 +168,34 @@ public class PythonAgentIntegrationTest {
             try {
                 log.info("Stopping socket server: " + serverSocket.getLocalSocketAddress());
                 serverSocket.close();
-            }
-            catch (IOException ignore) {
+            } catch (IOException ignore) {
             }
         }
         try {
             if (thriftTestServer != null) {
                 thriftTestServer.stop();
             }
-        }
-        catch (Exception ignore) {
+        } catch (Exception ignore) {
         }
 
         if (sourcePath != null) {
             try {
                 log.info("Deleting source checkout folder...");
                 FileUtils.deleteDirectory(new File(sourcePath));
-            }
-            catch (Exception ignore) {
+            } catch (Exception ignore) {
             }
         }
+        log.info("Terminating event receivers...");
         this.instanceStatusEventReceiver.terminate();
         this.topologyEventReceiver.terminate();
+        this.initializerEventReceiver.terminate();
 
         this.instanceActivated = false;
         this.instanceStarted = false;
         try {
             broker.stop();
             broker = null;
-        }
-        catch (Exception ignore) {
+        } catch (Exception ignore) {
         }
         // TODO: use thread synchronization and assert all connections are properly closed
         // leave some room to clear up active connections
@@ -203,8 +204,7 @@ public class PythonAgentIntegrationTest {
 
     public PythonAgentIntegrationTest() throws IOException {
         integrationProperties
-                .load(PythonAgentIntegrationTest.class
-                        .getResourceAsStream(PATH_SEP + "integration-test.properties"));
+                .load(PythonAgentIntegrationTest.class.getResourceAsStream(PATH_SEP + "integration-test.properties"));
         distributionName = integrationProperties.getProperty(DISTRIBUTION_NAME);
         amqpBindAddress = integrationProperties.getProperty(ACTIVEMQ_AMQP_BIND_ADDRESS);
         mqttBindAddress = integrationProperties.getProperty(ACTIVEMQ_MQTT_BIND_ADDRESS);
@@ -223,7 +223,7 @@ public class PythonAgentIntegrationTest {
         AuthenticationUser authenticationUser = new AuthenticationUser("system", "manager", "users,admins");
         List<AuthenticationUser> authUserList = new ArrayList<>();
         authUserList.add(authenticationUser);
-        broker.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(authUserList)});
+        broker.setPlugins(new BrokerPlugin[] { new SimpleAuthenticationPlugin(authUserList) });
         broker.setBrokerName("testBroker");
         broker.setDataDirectory(
                 PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." + PATH_SEP +
@@ -244,8 +244,7 @@ public class PythonAgentIntegrationTest {
                             if (line.contains("Exception in thread") || line.contains("ERROR")) {
                                 try {
                                     throw new RuntimeException(line);
-                                }
-                                catch (Exception e) {
+                                } catch (Exception e) {
                                     log.error("ERROR found in PCA log", e);
                                 }
                             }
@@ -284,8 +283,7 @@ public class PythonAgentIntegrationTest {
                             log.info("Message received for [port] " + port + ", [message] " + output);
                         }
                     }
-                }
-                catch (IOException e) {
+                } catch (IOException e) {
                     String message = "Could not start server socket: [port] " + port;
                     log.error(message, e);
                     throw new RuntimeException(message, e);
@@ -300,7 +298,6 @@ public class PythonAgentIntegrationTest {
                 ".." + PATH_SEP + "src" + PATH_SEP + "test" + PATH_SEP + "resources" + PATH_SEP + "common";
     }
 
-
     public static String getResourcesPath() {
         return PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." + PATH_SEP +
                 ".." + PATH_SEP + "src" + PATH_SEP + "test" + PATH_SEP + "resources";
@@ -320,13 +317,12 @@ public class PythonAgentIntegrationTest {
         try {
             log.info("Setting up python cartridge agent...");
 
-
             String srcAgentPath = PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() +
                     PATH_SEP + ".." + PATH_SEP + ".." + PATH_SEP + ".." + PATH_SEP + ".." + PATH_SEP + "distribution" +
                     PATH_SEP + "target" + PATH_SEP + distributionName + ".zip";
-            String unzipDestPath =
-                    PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." + PATH_SEP +
-                            PYTHON_AGENT_DIR_NAME + PATH_SEP;
+            String unzipDestPath = PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".."
+                    + PATH_SEP +
+                    PYTHON_AGENT_DIR_NAME + PATH_SEP;
             //FileUtils.copyFile(new File(srcAgentPath), new File(destAgentPath));
             unzip(srcAgentPath, unzipDestPath);
             String destAgentPath = PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." +
@@ -370,8 +366,7 @@ public class PythonAgentIntegrationTest {
             log.info("Python cartridge agent setup completed");
 
             return destAgentPath;
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             String message = "Could not copy cartridge agent distribution";
             log.error(message, e);
             throw new RuntimeException(message, e);
@@ -442,8 +437,7 @@ public class PythonAgentIntegrationTest {
             });
             executorList.put(commandText, exec);
             return outputStream;
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.error(outputStream.toString(), e);
             throw new RuntimeException(e);
         }
@@ -457,8 +451,7 @@ public class PythonAgentIntegrationTest {
     protected void sleep(long time) {
         try {
             Thread.sleep(time);
-        }
-        catch (InterruptedException ignore) {
+        } catch (InterruptedException ignore) {
         }
     }
 
@@ -495,7 +488,6 @@ public class PythonAgentIntegrationTest {
         eventPublisher.publish(event);
     }
 
-
     /**
      * Implements ByteArrayOutputStream.isClosed() method
      */

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
index 721a5c6..e4650e4 100644
--- a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
+++ b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
@@ -35,6 +35,10 @@ import org.apache.stratos.messaging.listener.application.*;
 import org.apache.stratos.messaging.listener.topology.*;
 import org.apache.stratos.messaging.message.receiver.application.ApplicationManager;
 import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
+import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
+import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpManager;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantEventReceiver;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.mock.iaas.client.MockIaasApiClient;
@@ -55,12 +59,18 @@ public class TopologyHandler {
     public static final int APPLICATION_ACTIVATION_TIMEOUT = 500000;
     public static final int APPLICATION_UNDEPLOYMENT_TIMEOUT = 500000;
     public static final int MEMBER_TERMINATION_TIMEOUT = 500000;
-    public static final int APPLICATION_TOPOLOGY_TIMEOUT = 120000;
+    public static final int APPLICATION_INIT_TIMEOUT = 20000;
+    public static final int TENANT_INIT_TIMEOUT = 20000;
+    public static final int APPLICATION_SIGNUP_INIT_TIMEOUT = 20000;
+    public static final int TOPOLOGY_INIT_TIMEOUT = 20000;
     public static final String APPLICATION_STATUS_CREATED = "Created";
     public static final String APPLICATION_STATUS_UNDEPLOYING = "Undeploying";
     private ApplicationsEventReceiver applicationsEventReceiver;
     private TopologyEventReceiver topologyEventReceiver;
+    private TenantEventReceiver tenantEventReceiver;
+    private ApplicationSignUpEventReceiver applicationSignUpEventReceiver;
     public static TopologyHandler topologyHandler;
+    private ExecutorService executorService = StratosThreadPool.getExecutorService("stratos.integration.test.pool", 10);
     private Map<String, Long> terminatedMembers = new ConcurrentHashMap<String, Long>();
     private Map<String, Long> terminatingMembers = new ConcurrentHashMap<String, Long>();
     private Map<String, Long> createdMembers = new ConcurrentHashMap<String, Long>();
@@ -70,12 +80,28 @@ public class TopologyHandler {
     private TopologyHandler() {
         initializeApplicationEventReceiver();
         initializeTopologyEventReceiver();
+        initializeTenantEventReceiver();
+        initializeApplicationSignUpEventReceiver();
         assertApplicationTopologyInitialized();
         assertTopologyInitialized();
+        assertTenantInitialized();
+        assertApplicationSignUpInitialized();
         addTopologyEventListeners();
         addApplicationEventListeners();
     }
 
+    private void initializeApplicationSignUpEventReceiver() {
+        applicationSignUpEventReceiver = new ApplicationSignUpEventReceiver();
+        applicationSignUpEventReceiver.setExecutorService(executorService);
+        applicationSignUpEventReceiver.execute();
+    }
+
+    private void initializeTenantEventReceiver() {
+        tenantEventReceiver = new TenantEventReceiver();
+        tenantEventReceiver.setExecutorService(executorService);
+        tenantEventReceiver.execute();
+    }
+
     public static TopologyHandler getInstance() {
         if (topologyHandler == null) {
             synchronized (TopologyHandler.class) {
@@ -91,30 +117,25 @@ public class TopologyHandler {
      * Initialize application event receiver
      */
     private void initializeApplicationEventReceiver() {
-        if (applicationsEventReceiver == null) {
-            applicationsEventReceiver = new ApplicationsEventReceiver();
-            ExecutorService executorService = StratosThreadPool.getExecutorService("STRATOS_TEST_SERVER", 1);
-            applicationsEventReceiver.setExecutorService(executorService);
-            applicationsEventReceiver.execute();
-        }
+        applicationsEventReceiver = new ApplicationsEventReceiver();
+        applicationsEventReceiver.setExecutorService(executorService);
+        applicationsEventReceiver.execute();
     }
 
     /**
      * Initialize Topology event receiver
      */
     private void initializeTopologyEventReceiver() {
-        if (topologyEventReceiver == null) {
-            topologyEventReceiver = new TopologyEventReceiver();
-            ExecutorService executorService = StratosThreadPool.getExecutorService("STRATOS_TEST_SERVER1", 1);
-            topologyEventReceiver.setExecutorService(executorService);
-            topologyEventReceiver.execute();
-        }
+        topologyEventReceiver = new TopologyEventReceiver();
+        topologyEventReceiver.setExecutorService(executorService);
+        topologyEventReceiver.execute();
     }
 
     /**
      * Assert application Topology initialization
      */
     private void assertApplicationTopologyInitialized() {
+        log.info(String.format("Asserting application topology initialization within %d ms", APPLICATION_INIT_TIMEOUT));
         long startTime = System.currentTimeMillis();
         boolean applicationTopologyInitialized = ApplicationManager.getApplications().isInitialized();
         while (!applicationTopologyInitialized) {
@@ -123,18 +144,24 @@ public class TopologyHandler {
             } catch (InterruptedException ignore) {
             }
             applicationTopologyInitialized = ApplicationManager.getApplications().isInitialized();
-            if ((System.currentTimeMillis() - startTime) > APPLICATION_TOPOLOGY_TIMEOUT) {
+            if ((System.currentTimeMillis() - startTime) > APPLICATION_INIT_TIMEOUT) {
                 break;
             }
         }
-        assertEquals(String.format("Application Topology didn't get initialized "), applicationTopologyInitialized,
-                true);
+        if (applicationTopologyInitialized) {
+            log.info(String.format("Application topology initialized under %d ms",
+                    (System.currentTimeMillis() - startTime)));
+        }
+        assertEquals(
+                String.format("Application topology didn't get initialized within %d ms", APPLICATION_INIT_TIMEOUT),
+                applicationTopologyInitialized, true);
     }
 
     /**
      * Assert Topology initialization
      */
     private void assertTopologyInitialized() {
+        log.info(String.format("Asserting topology initialization within %d ms", TOPOLOGY_INIT_TIMEOUT));
         long startTime = System.currentTimeMillis();
         boolean topologyInitialized = TopologyManager.getTopology().isInitialized();
         while (!topologyInitialized) {
@@ -143,11 +170,59 @@ public class TopologyHandler {
             } catch (InterruptedException ignore) {
             }
             topologyInitialized = TopologyManager.getTopology().isInitialized();
-            if ((System.currentTimeMillis() - startTime) > APPLICATION_TOPOLOGY_TIMEOUT) {
+            if ((System.currentTimeMillis() - startTime) > TOPOLOGY_INIT_TIMEOUT) {
                 break;
             }
         }
-        assertEquals(String.format("Topology didn't get initialized "), topologyInitialized, true);
+        if (topologyInitialized) {
+            log.info(String.format("Topology initialized under %d ms", (System.currentTimeMillis() - startTime)));
+        }
+        assertEquals(String.format("Topology didn't get initialized within %d ms", TOPOLOGY_INIT_TIMEOUT),
+                topologyInitialized, true);
+    }
+
+    private void assertTenantInitialized() {
+        log.info(String.format("Asserting tenant model initialization within %d ms", TENANT_INIT_TIMEOUT));
+        long startTime = System.currentTimeMillis();
+        boolean tenantInitialized = TenantManager.getInstance().isInitialized();
+        while (!tenantInitialized) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
+            tenantInitialized = TenantManager.getInstance().isInitialized();
+            if ((System.currentTimeMillis() - startTime) > TENANT_INIT_TIMEOUT) {
+                break;
+            }
+        }
+        if (tenantInitialized) {
+            log.info(String.format("Tenant model initialized under %d ms", (System.currentTimeMillis() - startTime)));
+        }
+        assertEquals(String.format("Tenant model didn't get initialized within %d ms", TENANT_INIT_TIMEOUT),
+                tenantInitialized, true);
+    }
+
+    private void assertApplicationSignUpInitialized() {
+        log.info(String.format("Asserting application signup initialization within %d ms",
+                APPLICATION_SIGNUP_INIT_TIMEOUT));
+        long startTime = System.currentTimeMillis();
+        boolean applicationSignUpInitialized = ApplicationSignUpManager.getInstance().isInitialized();
+        while (!applicationSignUpInitialized) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
+            applicationSignUpInitialized = ApplicationSignUpManager.getInstance().isInitialized();
+            if ((System.currentTimeMillis() - startTime) > APPLICATION_SIGNUP_INIT_TIMEOUT) {
+                break;
+            }
+        }
+        if (applicationSignUpInitialized) {
+            log.info(String.format("Application signup initialized under %d ms",
+                    (System.currentTimeMillis() - startTime)));
+        }
+        assertEquals(String.format("Application signup didn't get initialized within %d ms",
+                APPLICATION_SIGNUP_INIT_TIMEOUT), applicationSignUpInitialized, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/products/stratos/modules/integration/test-integration/src/test/resources/common/log4j.properties
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-integration/src/test/resources/common/log4j.properties b/products/stratos/modules/integration/test-integration/src/test/resources/common/log4j.properties
index 6fc6f45..b99dd63 100644
--- a/products/stratos/modules/integration/test-integration/src/test/resources/common/log4j.properties
+++ b/products/stratos/modules/integration/test-integration/src/test/resources/common/log4j.properties
@@ -59,7 +59,7 @@ log4j.logger.org.apache.stratos.cloud.controller=DEBUG
 log4j.logger.org.wso2.andes.client=ERROR
 # Autoscaler rule logs
 log4j.logger.org.apache.stratos.autoscaler.rule.RuleLog=DEBUG
-log4j.logger.org.apache.stratos.cloud.controller.messaging.topology.TopologyManager=INFO
+log4j.logger.org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder=INFO
 log4j.logger.org.apache.stratos.mock.iaas.client=DEBUG
 log4j.logger.org.apache.stratos.mock.iaas.services=DEBUG
 log4j.logger.org.apache.stratos.metadata.service=DEBUG


[2/3] stratos git commit: Closing STRATOS-1544, STRATOS-1612, STRATOS-1611: topology, tenant, application model initialize optimization

Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 5f7bd01..bd09d7e 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -22,7 +22,6 @@ import com.hazelcast.core.HazelcastInstance;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.Component;
-import org.apache.stratos.common.services.ComponentActivationEventListener;
 import org.apache.stratos.common.services.ComponentStartUpSynchronizer;
 import org.apache.stratos.common.services.DistributedObjectProvider;
 import org.apache.stratos.common.threading.StratosThreadPool;
@@ -31,6 +30,7 @@ import org.apache.stratos.manager.messaging.publisher.TenantEventPublisher;
 import org.apache.stratos.manager.messaging.publisher.synchronizer.ApplicationSignUpEventSynchronizer;
 import org.apache.stratos.manager.messaging.publisher.synchronizer.TenantEventSynchronizer;
 import org.apache.stratos.manager.messaging.receiver.StratosManagerApplicationEventReceiver;
+import org.apache.stratos.manager.messaging.receiver.StratosManagerInitializerTopicReceiver;
 import org.apache.stratos.manager.messaging.receiver.StratosManagerInstanceStatusEventReceiver;
 import org.apache.stratos.manager.messaging.receiver.StratosManagerTopologyEventReceiver;
 import org.apache.stratos.manager.user.management.TenantUserRoleManager;
@@ -73,9 +73,11 @@ import java.util.concurrent.TimeUnit;
  * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService"
  * cardinality="1..1" policy="dynamic" bind="setTaskService"
  * unbind="unsetTaskService"
- * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.services.DistributedObjectProvider"
+ * @scr.reference name="distributedObjectProvider"
+ * interface="org.apache.stratos.common.services.DistributedObjectProvider"
  * cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
- * @scr.reference name="componentStartUpSynchronizer" interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer"
+ * @scr.reference name="componentStartUpSynchronizer"
+ * interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer"
  * cardinality="1..1" policy="dynamic" bind="setComponentStartUpSynchronizer" unbind="unsetComponentStartUpSynchronizer"
  */
 public class StratosManagerServiceComponent {
@@ -89,6 +91,7 @@ public class StratosManagerServiceComponent {
     private StratosManagerTopologyEventReceiver topologyEventReceiver;
     private StratosManagerInstanceStatusEventReceiver instanceStatusEventReceiver;
     private StratosManagerApplicationEventReceiver applicationEventReceiver;
+    private StratosManagerInitializerTopicReceiver initializerTopicReceiver;
     private ExecutorService executorService;
     private ScheduledExecutorService scheduler;
 
@@ -98,21 +101,21 @@ public class StratosManagerServiceComponent {
         }
         try {
             executorService = StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE);
-            scheduler = StratosThreadPool.getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID,
-                    SCHEDULER_THREAD_POOL_SIZE);
+            scheduler = StratosThreadPool
+                    .getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID, SCHEDULER_THREAD_POOL_SIZE);
 
             Runnable stratosManagerActivator = new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        ComponentStartUpSynchronizer componentStartUpSynchronizer =
-                                ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
+                        ComponentStartUpSynchronizer componentStartUpSynchronizer = ServiceReferenceHolder.getInstance()
+                                .getComponentStartUpSynchronizer();
 
                         // Wait for cloud controller and autoscaler components to be activated
-                        componentStartUpSynchronizer.waitForComponentActivation(Component.StratosManager,
-                                Component.CloudController);
-                        componentStartUpSynchronizer.waitForComponentActivation(Component.StratosManager,
-                                Component.Autoscaler);
+                        componentStartUpSynchronizer
+                                .waitForComponentActivation(Component.StratosManager, Component.CloudController);
+                        componentStartUpSynchronizer
+                                .waitForComponentActivation(Component.StratosManager, Component.Autoscaler);
 
                         CartridgeConfigFileReader.readProperties();
                         if (StratosManagerContext.getInstance().isClustered()) {
@@ -123,8 +126,8 @@ public class StratosManagerServiceComponent {
                                         ServiceReferenceHolder.getInstance().getHazelcastInstance()
                                                 .getLock(STRATOS_MANAGER_COORDINATOR_LOCK).lock();
 
-                                        String localMemberId = ServiceReferenceHolder.getInstance().getHazelcastInstance()
-                                                .getCluster().getLocalMember().getUuid();
+                                        String localMemberId = ServiceReferenceHolder.getInstance()
+                                                .getHazelcastInstance().getCluster().getLocalMember().getUuid();
                                         log.info("Elected this member [" + localMemberId + "] " +
                                                 "as the stratos manager coordinator for the cluster");
 
@@ -149,8 +152,8 @@ public class StratosManagerServiceComponent {
                         // Initialize application event receiver
                         initializeApplicationEventReceiver();
 
-                        componentStartUpSynchronizer.waitForAxisServiceActivation(Component.StratosManager,
-                                "StratosManagerService");
+                        componentStartUpSynchronizer
+                                .waitForAxisServiceActivation(Component.StratosManager, "StratosManagerService");
                         componentStartUpSynchronizer.setComponentStatus(Component.StratosManager, true);
                         if (log.isInfoEnabled()) {
                             log.info("Stratos manager component is activated");
@@ -174,17 +177,25 @@ public class StratosManagerServiceComponent {
      * @throws UserStoreException
      * @throws UserManagerException
      */
-    private void executeCoordinatorTasks(ComponentContext componentContext) throws UserStoreException,
-            UserManagerException {
-
+    private void executeCoordinatorTasks(ComponentContext componentContext)
+            throws UserStoreException, UserManagerException {
         initializeTenantEventPublisher(componentContext);
         initializeInstanceStatusEventReceiver();
-        registerComponentStartUpEventListeners();
-
+        initializeInitializerEventReceiver();
+        Runnable tenantSynchronizer = new TenantEventSynchronizer();
+        scheduler.scheduleAtFixedRate(tenantSynchronizer, 0, 1, TimeUnit.MINUTES);
+        Runnable applicationSignUpSynchronizer = new ApplicationSignUpEventSynchronizer();
+        scheduler.scheduleAtFixedRate(applicationSignUpSynchronizer, 0, 1, TimeUnit.MINUTES);
         // Create internal/user Role at server start-up
         createInternalUserRole(componentContext);
     }
 
+    private void initializeInitializerEventReceiver() {
+        initializerTopicReceiver = new StratosManagerInitializerTopicReceiver();
+        initializerTopicReceiver.setExecutorService(executorService);
+        initializerTopicReceiver.execute();
+    }
+
     /**
      * Initialize instance status event receiver
      */
@@ -219,16 +230,17 @@ public class StratosManagerServiceComponent {
      * @throws UserStoreException
      * @throws UserManagerException
      */
-    private void createInternalUserRole(ComponentContext componentContext) throws UserStoreException, UserManagerException {
+    private void createInternalUserRole(ComponentContext componentContext)
+            throws UserStoreException, UserManagerException {
         RealmService realmService = ServiceReferenceHolder.getRealmService();
         UserRealm realm = realmService.getBootstrapRealm();
         UserStoreManager userStoreManager = realm.getUserStoreManager();
         UserRoleCreator.createInternalUserRole(userStoreManager);
 
         TenantUserRoleManager tenantUserRoleManager = new TenantUserRoleManager();
-        componentContext.getBundleContext().registerService(
-                org.wso2.carbon.stratos.common.listeners.TenantMgtListener.class.getName(),
-                tenantUserRoleManager, null);
+        componentContext.getBundleContext()
+                .registerService(org.wso2.carbon.stratos.common.listeners.TenantMgtListener.class.getName(),
+                        tenantUserRoleManager, null);
     }
 
     /**
@@ -242,44 +254,19 @@ public class StratosManagerServiceComponent {
             log.debug("Initializing tenant event publisher...");
         }
         final TenantEventPublisher tenantEventPublisher = new TenantEventPublisher();
-        componentContext.getBundleContext().registerService(
-                org.wso2.carbon.stratos.common.listeners.TenantMgtListener.class.getName(),
-                tenantEventPublisher, null);
+        componentContext.getBundleContext()
+                .registerService(org.wso2.carbon.stratos.common.listeners.TenantMgtListener.class.getName(),
+                        tenantEventPublisher, null);
         if (log.isInfoEnabled()) {
             log.info("Tenant event publisher initialized");
         }
     }
 
-    private void registerComponentStartUpEventListeners() {
-        ComponentStartUpSynchronizer componentStartUpSynchronizer =
-                ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
-        if (componentStartUpSynchronizer.isEnabled()) {
-            componentStartUpSynchronizer.addEventListener(new ComponentActivationEventListener() {
-                @Override
-                public void activated(Component component) {
-                    if (component == Component.StratosManager) {
-                        scheduleEventSynchronizers();
-                    }
-                }
-            });
-        } else {
-            scheduleEventSynchronizers();
-        }
-    }
-
-    private void scheduleEventSynchronizers() {
-        Runnable tenantSynchronizer = new TenantEventSynchronizer();
-        scheduler.scheduleAtFixedRate(tenantSynchronizer, 0, 1, TimeUnit.MINUTES);
-
-        Runnable applicationSignUpSynchronizer = new ApplicationSignUpEventSynchronizer();
-        scheduler.scheduleAtFixedRate(applicationSignUpSynchronizer, 0, 1, TimeUnit.MINUTES);
-    }
-
     protected void setConfigurationContextService(ConfigurationContextService contextService) {
         ServiceReferenceHolder.setClientConfigContext(contextService.getClientConfigContext());
         ServiceReferenceHolder.setServerConfigContext(contextService.getServerConfigContext());
-        ServiceReferenceHolder.getInstance().setAxisConfiguration(
-                contextService.getServerConfigContext().getAxisConfiguration());
+        ServiceReferenceHolder.getInstance()
+                .setAxisConfiguration(contextService.getServerConfigContext().getAxisConfiguration());
     }
 
     protected void unsetConfigurationContextService(ConfigurationContextService contextService) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
index d7d8ef4..f4cfbd8 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
@@ -42,9 +42,13 @@ public class TenantEventSynchronizer implements Runnable {
 
     @Override
     public void run() {
+        sendCompleteTenantEvent();
+    }
+
+    public static synchronized void sendCompleteTenantEvent(){
         try {
             if (log.isDebugEnabled()) {
-                log.debug(String.format("Publishing complete tenant event"));
+                log.debug("Publishing complete tenant event");
             }
             Tenant tenant;
             List<Tenant> tenants = new ArrayList<Tenant>();

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
new file mode 100644
index 0000000..ed526a6
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.manager.messaging.receiver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.components.ApplicationSignUpHandler;
+import org.apache.stratos.manager.messaging.publisher.ApplicationSignUpEventPublisher;
+import org.apache.stratos.manager.messaging.publisher.synchronizer.TenantEventSynchronizer;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.listener.initializer.CompleteApplicationSignUpsRequestEventListener;
+import org.apache.stratos.messaging.listener.initializer.CompleteTenantRequestEventListener;
+import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver;
+
+import java.util.concurrent.ExecutorService;
+
+public class StratosManagerInitializerTopicReceiver {
+    private static final Log log = LogFactory.getLog(StratosManagerInitializerTopicReceiver.class);
+    private InitializerEventReceiver initializerEventReceiver;
+    private ExecutorService executorService;
+    private ApplicationSignUpHandler applicationSignUpHandler;
+
+    public StratosManagerInitializerTopicReceiver() {
+        this.initializerEventReceiver = new InitializerEventReceiver();
+        applicationSignUpHandler = new ApplicationSignUpHandler();
+        addEventListeners();
+    }
+
+    public void execute() {
+        initializerEventReceiver.setExecutorService(executorService);
+        initializerEventReceiver.execute();
+        if (log.isInfoEnabled()) {
+            log.info("Cloud controller initializer topic receiver started");
+        }
+    }
+
+    private void addEventListeners() {
+        initializerEventReceiver.addEventListener(new CompleteTenantRequestEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Handling CompleteTenantRequestEvent");
+                }
+                try {
+                    TenantEventSynchronizer.sendCompleteTenantEvent();
+                } catch (Exception e) {
+                    log.error("Failed to process CompleteTenantRequestEvent", e);
+                }
+            }
+        });
+
+        initializerEventReceiver.addEventListener(new CompleteApplicationSignUpsRequestEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Handling CompleteApplicationSignUpsRequestEvent");
+                }
+                try {
+                    ApplicationSignUpEventPublisher
+                            .publishCompleteApplicationSignUpsEvent(applicationSignUpHandler.getApplicationSignUps());
+                } catch (Exception e) {
+                    log.error("Failed to process CompleteApplicationSignUpsRequestEvent", e);
+                }
+            }
+        });
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationSignUpsRequestEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationSignUpsRequestEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationSignUpsRequestEvent.java
new file mode 100644
index 0000000..ba912fd
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationSignUpsRequestEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.initializer;
+
+import java.io.Serializable;
+
+public class CompleteApplicationSignUpsRequestEvent extends InitializerEvent implements Serializable {
+    public CompleteApplicationSignUpsRequestEvent() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationsRequestEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationsRequestEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationsRequestEvent.java
new file mode 100644
index 0000000..621b8a6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationsRequestEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.initializer;
+
+import java.io.Serializable;
+
+public class CompleteApplicationsRequestEvent extends InitializerEvent implements Serializable {
+    public CompleteApplicationsRequestEvent() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTenantRequestEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTenantRequestEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTenantRequestEvent.java
new file mode 100644
index 0000000..32416a6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTenantRequestEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.initializer;
+
+import java.io.Serializable;
+
+public class CompleteTenantRequestEvent extends InitializerEvent implements Serializable {
+    public CompleteTenantRequestEvent() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTopologyRequestEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTopologyRequestEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTopologyRequestEvent.java
new file mode 100644
index 0000000..257bf44
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTopologyRequestEvent.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.initializer;
+
+import java.io.Serializable;
+
+public class CompleteTopologyRequestEvent extends InitializerEvent implements Serializable {
+    public CompleteTopologyRequestEvent() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/InitializerEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/InitializerEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/InitializerEvent.java
new file mode 100644
index 0000000..fa54bb2
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/InitializerEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.initializer;
+
+import org.apache.stratos.messaging.event.Event;
+
+import java.io.Serializable;
+
+public class InitializerEvent extends Event implements Serializable {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationSignUpsRequestEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationSignUpsRequestEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationSignUpsRequestEventListener.java
new file mode 100644
index 0000000..08e4c85
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationSignUpsRequestEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.initializer;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class CompleteApplicationSignUpsRequestEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationsRequestEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationsRequestEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationsRequestEventListener.java
new file mode 100644
index 0000000..f1da2da
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationsRequestEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.initializer;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class CompleteApplicationsRequestEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTenantRequestEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTenantRequestEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTenantRequestEventListener.java
new file mode 100644
index 0000000..20bdf13
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTenantRequestEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.initializer;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class CompleteTenantRequestEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTopologyRequestEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTopologyRequestEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTopologyRequestEventListener.java
new file mode 100644
index 0000000..cbac1c9
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTopologyRequestEventListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.listener.initializer;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class CompleteTopologyRequestEventListener extends EventListener {
+
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java
index 8346d84..17922dd 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java
@@ -85,6 +85,9 @@ public class CompleteApplicationsMessageProcessor extends MessageProcessor {
                 log.debug("No Application information found in Complete Applications event");
             }
         }
+        if (log.isInfoEnabled()) {
+            log.info("Application topology initialized");
+        }
 
         // Set topology initialized
         applications.setInitialized(true);

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationSignUpsRequestMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationSignUpsRequestMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationSignUpsRequestMessageProcessor.java
new file mode 100644
index 0000000..557de79
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationSignUpsRequestMessageProcessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.initializer;
+
+import org.apache.stratos.messaging.event.initializer.CompleteApplicationSignUpsRequestEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.MessagingUtil;
+
+public class CompleteApplicationSignUpsRequestMessageProcessor extends MessageProcessor {
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (CompleteApplicationSignUpsRequestEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            CompleteApplicationSignUpsRequestEvent event = (CompleteApplicationSignUpsRequestEvent) MessagingUtil
+                    .jsonToObject(message, CompleteApplicationSignUpsRequestEvent.class);
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format(
+                        "Failed to process message using available message processors: [type] %s [body] %s", type,
+                        message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationsRequestMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationsRequestMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationsRequestMessageProcessor.java
new file mode 100644
index 0000000..ae80a68
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationsRequestMessageProcessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.initializer;
+
+import org.apache.stratos.messaging.event.initializer.CompleteApplicationsRequestEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.MessagingUtil;
+
+public class CompleteApplicationsRequestMessageProcessor extends MessageProcessor {
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (CompleteApplicationsRequestEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            CompleteApplicationsRequestEvent event = (CompleteApplicationsRequestEvent) MessagingUtil
+                    .jsonToObject(message, CompleteApplicationsRequestEvent.class);
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format(
+                        "Failed to process message using available message processors: [type] %s [body] %s", type,
+                        message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTenantRequestMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTenantRequestMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTenantRequestMessageProcessor.java
new file mode 100644
index 0000000..a6af9fd
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTenantRequestMessageProcessor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.initializer;
+
+import org.apache.stratos.messaging.event.initializer.CompleteTenantRequestEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.MessagingUtil;
+
+public class CompleteTenantRequestMessageProcessor extends MessageProcessor {
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (CompleteTenantRequestEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            CompleteTenantRequestEvent event = (CompleteTenantRequestEvent) MessagingUtil
+                    .jsonToObject(message, CompleteTenantRequestEvent.class);
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format(
+                        "Failed to process message using available message processors: [type] %s [body] %s", type,
+                        message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTopologyRequestMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTopologyRequestMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTopologyRequestMessageProcessor.java
new file mode 100644
index 0000000..73e4bf7
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTopologyRequestMessageProcessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.initializer;
+
+import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.MessagingUtil;
+
+public class CompleteTopologyRequestMessageProcessor extends MessageProcessor {
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (CompleteTopologyRequestEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            CompleteTopologyRequestEvent event = (CompleteTopologyRequestEvent) MessagingUtil
+                    .jsonToObject(message, CompleteTopologyRequestEvent.class);
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format(
+                        "Failed to process message using available message processors: [type] %s [body] %s", type,
+                        message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/InitializerMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/InitializerMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/InitializerMessageProcessorChain.java
new file mode 100644
index 0000000..f3e292f
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/InitializerMessageProcessorChain.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.initializer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.initializer.CompleteApplicationSignUpsRequestEventListener;
+import org.apache.stratos.messaging.listener.initializer.CompleteApplicationsRequestEventListener;
+import org.apache.stratos.messaging.listener.initializer.CompleteTenantRequestEventListener;
+import org.apache.stratos.messaging.listener.initializer.CompleteTopologyRequestEventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+public class InitializerMessageProcessorChain extends MessageProcessorChain {
+    private static final Log log = LogFactory.getLog(InitializerMessageProcessorChain.class);
+    private CompleteTopologyRequestMessageProcessor completeTopologyRequestMessageProcessor;
+    private CompleteApplicationsRequestMessageProcessor completeApplicationsRequestMessageProcessor;
+    private CompleteTenantRequestMessageProcessor completeTenantRequestMessageProcessor;
+    private CompleteApplicationSignUpsRequestMessageProcessor completeApplicationSignUpsRequestMessageProcessor;
+
+    @Override
+    protected void initialize() {
+        completeTopologyRequestMessageProcessor = new CompleteTopologyRequestMessageProcessor();
+        add(completeTopologyRequestMessageProcessor);
+
+        completeApplicationsRequestMessageProcessor = new CompleteApplicationsRequestMessageProcessor();
+        add(completeApplicationsRequestMessageProcessor);
+
+        completeTenantRequestMessageProcessor = new CompleteTenantRequestMessageProcessor();
+        add(completeTenantRequestMessageProcessor);
+
+        completeApplicationSignUpsRequestMessageProcessor = new CompleteApplicationSignUpsRequestMessageProcessor();
+        add(completeApplicationSignUpsRequestMessageProcessor);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Initializer message processor chain initialized");
+        }
+    }
+
+    @Override
+    public void addEventListener(EventListener eventListener) {
+        if (eventListener instanceof CompleteTopologyRequestEventListener) {
+            completeTopologyRequestMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof CompleteApplicationsRequestEventListener) {
+            completeApplicationsRequestMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof CompleteTenantRequestEventListener) {
+            completeTenantRequestMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof CompleteApplicationSignUpsRequestEventListener) {
+            completeApplicationSignUpsRequestMessageProcessor.addEventListener(eventListener);
+        } else {
+            throw new RuntimeException("Unknown event listener");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
index 6b9085f..d7e7196 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
@@ -20,7 +20,10 @@ package org.apache.stratos.messaging.message.receiver.application;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
+import org.apache.stratos.messaging.event.initializer.CompleteApplicationsRequestEvent;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.util.MessagingUtil;
 
@@ -32,7 +35,6 @@ public class ApplicationsEventReceiver {
     private ApplicationsEventMessageDelegator messageDelegator;
     private ApplicationsEventMessageListener messageListener;
     private EventSubscriber eventSubscriber;
-    private boolean terminated;
     private ExecutorService executorService;
 
     public ApplicationsEventReceiver() {
@@ -45,11 +47,11 @@ public class ApplicationsEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
-
     public void execute() {
         try {
             // Start topic subscriber thread
-            eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(), messageListener);
+            eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(),
+                    messageListener);
             executorService.execute(eventSubscriber);
 
             if (log.isDebugEnabled()) {
@@ -62,8 +64,7 @@ public class ApplicationsEventReceiver {
             if (log.isDebugEnabled()) {
                 log.debug("Application status event message delegator thread started");
             }
-
-
+            initializeCompleteApplicationsModel();
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("Application status failed", e);
@@ -74,7 +75,26 @@ public class ApplicationsEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        terminated = true;
+    }
+
+    public void initializeCompleteApplicationsModel() {
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                while (!eventSubscriber.isSubscribed()) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException ignore) {
+                    }
+                }
+
+                CompleteApplicationsRequestEvent completeApplicationsRequestEvent
+                        = new CompleteApplicationsRequestEvent();
+                String topic = MessagingUtil.getMessageTopicName(completeApplicationsRequestEvent);
+                EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
+                eventPublisher.publish(completeApplicationsRequestEvent);
+            }
+        });
     }
 
     public ExecutorService getExecutorService() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index 7863ee4..55e3fd1 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -21,7 +21,10 @@ package org.apache.stratos.messaging.message.receiver.application.signup;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
+import org.apache.stratos.messaging.event.initializer.CompleteApplicationSignUpsRequestEvent;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.util.MessagingUtil;
 
@@ -49,15 +52,14 @@ public class ApplicationSignUpEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
-
     public void execute() {
         try {
             // Start topic subscriber thread
-            eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_SIGNUP_TOPIC.getTopicName(), messageListener);
+            eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_SIGNUP_TOPIC.getTopicName(),
+                    messageListener);
             // subscriber.setMessageListener(messageListener);
             executorService.execute(eventSubscriber);
 
-
             if (log.isDebugEnabled()) {
                 log.debug("Application signup event message receiver thread started");
             }
@@ -68,7 +70,7 @@ public class ApplicationSignUpEventReceiver {
                 log.debug("Application signup event message delegator thread started");
             }
 
-
+            initializeCompleteApplicationSignUps();
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("Application signup receiver failed", e);
@@ -76,6 +78,26 @@ public class ApplicationSignUpEventReceiver {
         }
     }
 
+    public void initializeCompleteApplicationSignUps() {
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                while (!eventSubscriber.isSubscribed()) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException ignore) {
+                    }
+                }
+
+                CompleteApplicationSignUpsRequestEvent completeApplicationSignUpsRequestEvent
+                        = new CompleteApplicationSignUpsRequestEvent();
+                String topic = MessagingUtil.getMessageTopicName(completeApplicationSignUpsRequestEvent);
+                EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
+                eventPublisher.publish(completeApplicationSignUpsRequestEvent);
+            }
+        });
+    }
+
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
new file mode 100644
index 0000000..ffd2ae4
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.initializer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.Message;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.initializer.InitializerMessageProcessorChain;
+
+public class InitializerEventMessageDelegator implements Runnable {
+    private static final Log log = LogFactory.getLog(InitializerEventMessageDelegator.class);
+
+    private MessageProcessorChain processorChain;
+    private InitializerEventMessageQueue messageQueue;
+    private boolean terminated;
+
+    public InitializerEventMessageDelegator(InitializerEventMessageQueue initializerEventMessageQueue) {
+        this.messageQueue = initializerEventMessageQueue;
+        this.processorChain = new InitializerMessageProcessorChain();
+    }
+
+    public void addEventListener(EventListener eventListener) {
+        processorChain.addEventListener(eventListener);
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (log.isInfoEnabled()) {
+                log.info("Initializer event message delegator started");
+            }
+
+            while (!terminated) {
+                try {
+                    Message message = messageQueue.take();
+                    String type = message.getEventClassName();
+
+                    // Retrieve the actual message
+                    String json = message.getText();
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Initializer event message [%s] received from queue: %s", type,
+                                messageQueue.getClass()));
+                    }
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Delegating initializer event message: %s", type));
+                    }
+                    processorChain.process(type, json, null);
+                } catch (InterruptedException ignore) {
+                    log.info("Shutting down initializer event message delegator...");
+                    terminate();
+                } catch (Exception e) {
+                    log.error("Failed to retrieve initializer event message", e);
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Initializer event message delegator failed", e);
+            }
+        }
+    }
+
+    /**
+     * Terminate initializer event message delegator thread.
+     */
+    public void terminate() {
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageListener.java
new file mode 100644
index 0000000..bdfe875
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageListener.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.initializer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.MessageListener;
+import org.apache.stratos.messaging.domain.Message;
+
+public class InitializerEventMessageListener implements MessageListener {
+    private static final Log log = LogFactory.getLog(InitializerEventMessageListener.class);
+
+    private final InitializerEventMessageQueue messageQueue;
+
+    public InitializerEventMessageListener(InitializerEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    @Override
+    public void messageReceived(Message message) {
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Initializer event message received: %s", message.getText()));
+            }
+            // Add received message to the queue
+            messageQueue.add(message);
+
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageQueue.java
new file mode 100644
index 0000000..326c5b8
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageQueue.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.initializer;
+
+import org.apache.stratos.messaging.domain.Message;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class InitializerEventMessageQueue extends LinkedBlockingQueue<Message> {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
new file mode 100644
index 0000000..90d358c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.initializer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.util.MessagingUtil;
+
+import java.util.concurrent.ExecutorService;
+
+public class InitializerEventReceiver {
+    private static final Log log = LogFactory.getLog(InitializerEventReceiver.class);
+
+    private InitializerEventMessageDelegator messageDelegator;
+    private InitializerEventMessageListener messageListener;
+    private EventSubscriber eventSubscriber;
+    private ExecutorService executorService;
+
+    public InitializerEventReceiver() {
+        InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue();
+        this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue);
+        this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue);
+    }
+
+    public void addEventListener(EventListener eventListener) {
+        messageDelegator.addEventListener(eventListener);
+    }
+
+    public void execute() {
+        try {
+            // Start topic subscriber thread
+            eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INITIALIZER_TOPIC.getTopicName(),
+                    messageListener);
+            executorService.execute(eventSubscriber);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Initializer event message delegator thread started");
+            }
+            // Start initializer event message delegator thread
+            executorService.execute(messageDelegator);
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Initializer receiver failed", e);
+            }
+        }
+    }
+
+    public void terminate() {
+        eventSubscriber.terminate();
+        messageDelegator.terminate();
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
index 1d0529a..988a2ce 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -21,7 +21,10 @@ package org.apache.stratos.messaging.message.receiver.tenant;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
+import org.apache.stratos.messaging.event.initializer.CompleteTenantRequestEvent;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.util.MessagingUtil;
 
@@ -68,7 +71,7 @@ public class TenantEventReceiver {
                 log.debug("Tenant event message delegator thread started");
             }
 
-
+            initializeCompleteTenant();
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("Tenant receiver failed", e);
@@ -76,6 +79,25 @@ public class TenantEventReceiver {
         }
     }
 
+    public void initializeCompleteTenant() {
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                while (!eventSubscriber.isSubscribed()) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException ignore) {
+                    }
+                }
+
+                CompleteTenantRequestEvent completeTenantRequestEvent = new CompleteTenantRequestEvent();
+                String topic = MessagingUtil.getMessageTopicName(completeTenantRequestEvent);
+                EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
+                eventPublisher.publish(completeTenantRequestEvent);
+            }
+        });
+    }
+
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index 58f2c36..b841d0a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -21,7 +21,10 @@ package org.apache.stratos.messaging.message.receiver.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
+import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.util.MessagingUtil;
 
@@ -50,15 +53,12 @@ public class TopologyEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
-
     public void execute() {
         try {
             // Start topic subscriber thread
             eventSubscriber = new EventSubscriber(MessagingUtil.Topics.TOPOLOGY_TOPIC.getTopicName(), messageListener);
-            // subscriber.setMessageListener(messageListener);
             executorService.execute(eventSubscriber);
 
-
             if (log.isDebugEnabled()) {
                 log.debug("Topology event message receiver thread started");
             }
@@ -69,7 +69,7 @@ public class TopologyEventReceiver {
                 log.debug("Topology event message delegator thread started");
             }
 
-
+            initializeCompleteTopology();
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("Topology receiver failed", e);
@@ -82,6 +82,25 @@ public class TopologyEventReceiver {
         messageDelegator.terminate();
     }
 
+    public void initializeCompleteTopology() {
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                while (!eventSubscriber.isSubscribed()) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException ignore) {
+                    }
+                }
+
+                CompleteTopologyRequestEvent completeTopologyRequestEvent = new CompleteTopologyRequestEvent();
+                String topic = MessagingUtil.getMessageTopicName(completeTopologyRequestEvent);
+                EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
+                eventPublisher.publish(completeTopologyRequestEvent);
+            }
+        });
+    }
+
     public ExecutorService getExecutorService() {
         return executorService;
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java
index c4bfb9d..b3efbfb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java
@@ -63,6 +63,7 @@ public class MessagingUtil {
      */
     public static enum Topics {
         TOPOLOGY_TOPIC("topology/#"),
+        INITIALIZER_TOPIC("initializer/#"),
         HEALTH_STAT_TOPIC("summarized-health-stats/#"),
         INSTANCE_STATUS_TOPIC("instance/status/#"),
         INSTANCE_NOTIFIER_TOPIC("instance/notifier/#"),
@@ -248,4 +249,4 @@ public class MessagingUtil {
         return UUID.randomUUID().toString().replace(HYPHEN_MINUS, EMPTY_SPACE).substring(START_INDEX, len);
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
index 54a7421..9c1159c 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
@@ -18,18 +18,15 @@
 
 import threading
 
-from subscriber import EventSubscriber
 import publisher
-from modules.event.instance.notifier.events import *
-from modules.event.tenant.events import *
-from modules.event.topology.events import *
+from logpublisher import *
 from modules.event.application.signup.events import *
 from modules.event.domain.mapping.events import *
-from entity import *
-from logpublisher import *
-from config import Config
 from modules.event.eventhandler import EventHandler
-import constants
+from modules.event.instance.notifier.events import *
+from modules.event.tenant.events import *
+from modules.event.topology.events import *
+from subscriber import EventSubscriber
 
 
 class CartridgeAgent(threading.Thread):
@@ -71,6 +68,9 @@ class CartridgeAgent(threading.Thread):
         else:
             self.__event_handler.create_dummy_interface()
 
+        # request complete topology event from CC by publishing CompleteTopologyRequestEvent
+        publisher.publish_complete_topology_request_event()
+
         # wait until complete topology message is received to get LB IP
         self.wait_for_complete_topology()
 
@@ -88,6 +88,9 @@ class CartridgeAgent(threading.Thread):
         # start application signup event listener
         self.register_application_signup_event_listeners()
 
+        # request complete tenant event from CC by publishing CompleteTenantRequestEvent
+        publisher.publish_complete_tenant_request_event()
+
         # Execute instance started shell script
         self.__event_handler.on_instance_started_event()
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
index 301cd47..8c7a7b0 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
@@ -82,6 +82,7 @@ TENANT_REPO_PATH = "tenant.repository.path"
 INSTANCE_NOTIFIER_TOPIC = "instance/#"
 HEALTH_STAT_TOPIC = "health/#"
 TOPOLOGY_TOPIC = "topology/#"
+INITIALIZER_TOPIC = "initializer/"
 TENANT_TOPIC = "tenant/#"
 INSTANCE_STATUS_TOPIC = "instance/status/"
 APPLICATION_SIGNUP = "application/signup/#"
@@ -98,6 +99,8 @@ INSTANCE_READY_TO_SHUTDOWN_EVENT = "InstanceReadyToShutdownEvent"
 INSTANCE_CLEANUP_CLUSTER_EVENT = "InstanceCleanupClusterEvent"
 INSTANCE_CLEANUP_MEMBER_EVENT = "InstanceCleanupMemberEvent"
 COMPLETE_TOPOLOGY_EVENT = "CompleteTopologyEvent"
+COMPLETE_TOPOLOGY_REQUEST_EVENT = "CompleteTopologyRequestEvent"
+COMPLETE_TENANT_REQUEST_EVENT = "CompleteTenantRequestEvent"
 COMPLETE_TENANT_EVENT = "CompleteTenantEvent"
 DOMAIN_MAPPING_ADDED_EVENT = "DomainMappingAddedEvent"
 DOMAIN_MAPPING_REMOVED_EVENT = "DomainMappingRemovedEvent"

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py
index db35987..f14e4af 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py
@@ -108,6 +108,22 @@ class InstanceReadyToShutdownEvent:
         return to_json(self)
 
 
+class CompleteTopologyRequestEvent:
+    def __init__(self):
+        pass
+
+    def to_json(self):
+        return to_json(self)
+
+
+class CompleteTenantRequestEvent:
+    def __init__(self):
+        pass
+
+    def to_json(self):
+        return to_json(self)
+
+
 def to_json(instance):
     """
     common function to serialize status event object
@@ -115,4 +131,4 @@ def to_json(instance):
     :return: serialized json string
     :rtype str
     """
-    return json.dumps(instance, default=lambda o: o.__dict__, sort_keys=True, indent=4)
\ No newline at end of file
+    return json.dumps(instance, default=lambda o: o.__dict__, sort_keys=True, indent=4)


[3/3] stratos git commit: Closing STRATOS-1544, STRATOS-1612, STRATOS-1611: topology, tenant, application model initialize optimization

Posted by ra...@apache.org.
Closing STRATOS-1544, STRATOS-1612, STRATOS-1611: topology, tenant, application model initialize optimization


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/60b80114
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/60b80114
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/60b80114

Branch: refs/heads/stratos-4.1.x
Commit: 60b801144dcb05e3664353c412bc1c3fffd7c55c
Parents: 00f624b
Author: Akila Perera <ra...@gmail.com>
Authored: Wed Nov 11 15:15:12 2015 +0530
Committer: Akila Perera <ra...@gmail.com>
Committed: Wed Nov 11 15:15:12 2015 +0530

----------------------------------------------------------------------
 .../ApplicationEventSynchronizer.java           |   9 +-
 .../applications/topic/ApplicationBuilder.java  |  13 --
 .../topic/ApplicationsEventPublisher.java       |  11 +-
 .../AutoscalerInitializerTopicReceiver.java     |  72 ++++++++++
 .../AutoscalerTopologyEventReceiver.java        |   5 +-
 .../internal/AutoscalerServiceComponent.java    |  81 +++++------
 .../CloudControllerServiceComponent.java        |  51 +++----
 .../publisher/TopologyEventPublisher.java       |  15 +-
 .../publisher/TopologyEventSynchronizer.java    |   8 +-
 .../initializer/InitializerTopicReceiver.java   |  72 ++++++++++
 .../messaging/topology/TopologyBuilder.java     | 144 +++++++++----------
 .../messaging/topology/TopologyHolder.java      | 118 +++++++++++++++
 .../messaging/topology/TopologyManager.java     | 118 ---------------
 .../impl/CloudControllerServiceImpl.java        |  12 +-
 .../StratosManagerServiceComponent.java         |  95 ++++++------
 .../synchronizer/TenantEventSynchronizer.java   |   6 +-
 .../StratosManagerInitializerTopicReceiver.java |  91 ++++++++++++
 .../CompleteApplicationSignUpsRequestEvent.java |  26 ++++
 .../CompleteApplicationsRequestEvent.java       |  26 ++++
 .../initializer/CompleteTenantRequestEvent.java |  26 ++++
 .../CompleteTopologyRequestEvent.java           |  27 ++++
 .../event/initializer/InitializerEvent.java     |  26 ++++
 ...eApplicationSignUpsRequestEventListener.java |  24 ++++
 ...ompleteApplicationsRequestEventListener.java |  24 ++++
 .../CompleteTenantRequestEventListener.java     |  24 ++++
 .../CompleteTopologyRequestEventListener.java   |  26 ++++
 .../CompleteApplicationsMessageProcessor.java   |   3 +
 ...plicationSignUpsRequestMessageProcessor.java |  54 +++++++
 ...leteApplicationsRequestMessageProcessor.java |  54 +++++++
 .../CompleteTenantRequestMessageProcessor.java  |  53 +++++++
 ...CompleteTopologyRequestMessageProcessor.java |  54 +++++++
 .../InitializerMessageProcessorChain.java       |  70 +++++++++
 .../application/ApplicationsEventReceiver.java  |  32 ++++-
 .../signup/ApplicationSignUpEventReceiver.java  |  30 +++-
 .../InitializerEventMessageDelegator.java       |  88 ++++++++++++
 .../InitializerEventMessageListener.java        |  48 +++++++
 .../InitializerEventMessageQueue.java           |  26 ++++
 .../initializer/InitializerEventReceiver.java   |  78 ++++++++++
 .../receiver/tenant/TenantEventReceiver.java    |  24 +++-
 .../topology/TopologyEventReceiver.java         |  27 +++-
 .../stratos/messaging/util/MessagingUtil.java   |   3 +-
 .../cartridge.agent/cartridge.agent/agent.py    |  19 +--
 .../cartridge.agent/constants.py                |   3 +
 .../modules/event/instance/status/events.py     |  18 ++-
 .../cartridge.agent/publisher.py                |  22 ++-
 .../integration/tests/AgentStartupTestCase.java |  83 +++++++----
 .../tests/PythonAgentIntegrationTest.java       |  56 ++++----
 .../integration/common/TopologyHandler.java     | 111 +++++++++++---
 .../src/test/resources/common/log4j.properties  |   2 +-
 49 files changed, 1648 insertions(+), 460 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
index fc7a528..562a6cb 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
@@ -21,7 +21,7 @@ package org.apache.stratos.autoscaler.applications;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder;
+import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher;
 
 public class ApplicationEventSynchronizer implements Runnable {
 
@@ -30,11 +30,8 @@ public class ApplicationEventSynchronizer implements Runnable {
     @Override
     public void run() {
         if (log.isDebugEnabled()) {
-            log.debug("Executing topology synchronization task");
-        }
-        // publish to the topic
-        if (ApplicationHolder.getApplications() != null) {
-            ApplicationBuilder.handleCompleteApplication(ApplicationHolder.getApplications());
+            log.debug("Executing applications synchronization task");
         }
+        ApplicationsEventPublisher.sendCompleteApplicationsEvent(ApplicationHolder.getApplications());
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
index 165c4f8..5fd4d5a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
@@ -57,19 +57,6 @@ import java.util.Set;
 public class ApplicationBuilder {
     private static final Log log = LogFactory.getLog(ApplicationBuilder.class);
 
-    public static synchronized void handleCompleteApplication(Applications applications) {
-        if (log.isDebugEnabled()) {
-            log.debug("Handling complete application event");
-        }
-
-        try {
-            ApplicationHolder.acquireReadLock();
-            ApplicationsEventPublisher.sendCompleteApplicationsEvent(applications);
-        } finally {
-            ApplicationHolder.releaseReadLock();
-        }
-    }
-
     /**
      * Create application clusters in cloud controller and send application created event.
      *

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
index f66e525..2ec6e78 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
@@ -20,6 +20,7 @@ package org.apache.stratos.autoscaler.applications.topic;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.ApplicationHolder;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.application.Application;
@@ -40,7 +41,15 @@ public class ApplicationsEventPublisher {
     private static final Log log = LogFactory.getLog(ApplicationsEventPublisher.class);
 
     public static void sendCompleteApplicationsEvent(Applications completeApplications) {
-        publishEvent(new CompleteApplicationsEvent(completeApplications));
+        ApplicationHolder.acquireReadLock();
+        try{
+            if (log.isDebugEnabled()) {
+                log.debug("Publishing complete applications event...");
+            }
+            publishEvent(new CompleteApplicationsEvent(completeApplications));
+        }finally {
+            ApplicationHolder.releaseReadLock();
+        }
     }
 
     public static void sendApplicationCreatedEvent(Application application) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
new file mode 100644
index 0000000..da6b270
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.event.receiver.initializer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.ApplicationHolder;
+import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.listener.initializer.CompleteApplicationsRequestEventListener;
+import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver;
+
+import java.util.concurrent.ExecutorService;
+
+public class AutoscalerInitializerTopicReceiver {
+    private static final Log log = LogFactory.getLog(AutoscalerInitializerTopicReceiver.class);
+    private InitializerEventReceiver initializerEventReceiver;
+    private ExecutorService executorService;
+
+    public AutoscalerInitializerTopicReceiver() {
+        this.initializerEventReceiver = new InitializerEventReceiver();
+        addEventListeners();
+    }
+
+    public void execute() {
+        initializerEventReceiver.setExecutorService(executorService);
+        initializerEventReceiver.execute();
+        if (log.isInfoEnabled()) {
+            log.info("Cloud controller initializer topic receiver started");
+        }
+    }
+
+    private void addEventListeners() {
+        initializerEventReceiver.addEventListener(new CompleteApplicationsRequestEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Handling CompleteApplicationsRequestEvent");
+                }
+                try {
+                    ApplicationsEventPublisher.sendCompleteApplicationsEvent(ApplicationHolder.getApplications());
+                } catch (Exception e) {
+                    log.error("Failed to process CompleteApplicationsRequestEvent", e);
+                }
+            }
+        });
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index ef31b97..500b95a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -35,6 +35,8 @@ import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
 import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent;
 import org.apache.stratos.autoscaler.util.AutoscalerUtil;
 import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.application.Application;
 import org.apache.stratos.messaging.domain.application.Applications;
 import org.apache.stratos.messaging.domain.instance.ClusterInstance;
@@ -42,10 +44,12 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.ClusterStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent;
 import org.apache.stratos.messaging.event.topology.*;
 import org.apache.stratos.messaging.listener.topology.*;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.util.MessagingUtil;
 
 import java.util.concurrent.ExecutorService;
 
@@ -75,7 +79,6 @@ public class AutoscalerTopologyEventReceiver {
         if (log.isInfoEnabled()) {
             log.info("Autoscaler topology receiver thread started");
         }
-
     }
 
     private void addEventListeners() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
index 48ee481..5011dd2 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
@@ -26,6 +26,7 @@ import org.apache.stratos.autoscaler.algorithms.networkpartition.NetworkPartitio
 import org.apache.stratos.autoscaler.applications.ApplicationEventSynchronizer;
 import org.apache.stratos.autoscaler.context.AutoscalerContext;
 import org.apache.stratos.autoscaler.event.receiver.health.AutoscalerHealthStatEventReceiver;
+import org.apache.stratos.autoscaler.event.receiver.initializer.AutoscalerInitializerTopicReceiver;
 import org.apache.stratos.autoscaler.event.receiver.topology.AutoscalerTopologyEventReceiver;
 import org.apache.stratos.autoscaler.exception.AutoScalerException;
 import org.apache.stratos.autoscaler.exception.AutoScalingPolicyAlreadyExistException;
@@ -44,7 +45,6 @@ import org.apache.stratos.autoscaler.util.AutoscalerUtil;
 import org.apache.stratos.autoscaler.util.ConfUtil;
 import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
 import org.apache.stratos.common.Component;
-import org.apache.stratos.common.services.ComponentActivationEventListener;
 import org.apache.stratos.common.services.ComponentStartUpSynchronizer;
 import org.apache.stratos.common.services.DistributedObjectProvider;
 import org.apache.stratos.common.threading.StratosThreadPool;
@@ -68,9 +68,11 @@ import java.util.concurrent.TimeUnit;
  * cardinality="1..1" policy="dynamic" bind="setTaskService" unbind="unsetTaskService"
  * @scr.reference name="hazelcast.instance.service" interface="com.hazelcast.core.HazelcastInstance"
  * cardinality="0..1"policy="dynamic" bind="setHazelcastInstance" unbind="unsetHazelcastInstance"
- * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.services.DistributedObjectProvider"
+ * @scr.reference name="distributedObjectProvider"
+ * interface="org.apache.stratos.common.services.DistributedObjectProvider"
  * cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
- * @scr.reference name="componentStartUpSynchronizer" interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer"
+ * @scr.reference name="componentStartUpSynchronizer"
+ * interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer"
  * cardinality="1..1" policy="dynamic" bind="setComponentStartUpSynchronizer" unbind="unsetComponentStartUpSynchronizer"
  * @scr.reference name="config.context.service" interface="org.wso2.carbon.utils.ConfigurationContextService"
  * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
@@ -81,6 +83,7 @@ public class AutoscalerServiceComponent {
     private static final String AUTOSCALER_COORDINATOR_LOCK = "AUTOSCALER_COORDINATOR_LOCK";
     private AutoscalerTopologyEventReceiver asTopologyReceiver;
     private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
+    private AutoscalerInitializerTopicReceiver autoscalerInitializerTopicReceiver;
     private ExecutorService executorService;
     private ScheduledExecutorService scheduler;
 
@@ -90,25 +93,25 @@ public class AutoscalerServiceComponent {
         }
         try {
             XMLConfiguration conf = ConfUtil.getInstance(AutoscalerConstants.COMPONENTS_CONFIG).getConfiguration();
-            int threadPoolSize = conf.getInt(AutoscalerConstants.THREAD_POOL_SIZE_KEY,
-                    AutoscalerConstants.AUTOSCALER_THREAD_POOL_SIZE);
-            executorService = StratosThreadPool.getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID,
-                    threadPoolSize);
+            int threadPoolSize = conf
+                    .getInt(AutoscalerConstants.THREAD_POOL_SIZE_KEY, AutoscalerConstants.AUTOSCALER_THREAD_POOL_SIZE);
+            executorService = StratosThreadPool
+                    .getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID, threadPoolSize);
 
             int schedulerThreadPoolSize = conf.getInt(AutoscalerConstants.SCHEDULER_THREAD_POOL_SIZE_KEY,
                     AutoscalerConstants.AUTOSCALER_SCHEDULER_THREAD_POOL_SIZE);
-            scheduler = StratosThreadPool.getScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID,
-                    schedulerThreadPoolSize);
+            scheduler = StratosThreadPool
+                    .getScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID, schedulerThreadPoolSize);
 
             Runnable autoscalerActivator = new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        ComponentStartUpSynchronizer componentStartUpSynchronizer =
-                                ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
+                        ComponentStartUpSynchronizer componentStartUpSynchronizer = ServiceReferenceHolder.getInstance()
+                                .getComponentStartUpSynchronizer();
                         // Wait for cloud controller component to be activated
-                        componentStartUpSynchronizer.waitForComponentActivation(Component.Autoscaler,
-                                Component.CloudController);
+                        componentStartUpSynchronizer
+                                .waitForComponentActivation(Component.Autoscaler, Component.CloudController);
 
                         ServiceReferenceHolder.getInstance().setExecutorService(executorService);
 
@@ -119,8 +122,8 @@ public class AutoscalerServiceComponent {
                                     ServiceReferenceHolder.getInstance().getHazelcastInstance()
                                             .getLock(AUTOSCALER_COORDINATOR_LOCK).lock();
 
-                                    log.info("Elected this member [" + ServiceReferenceHolder.getInstance().getHazelcastInstance()
-                                            .getCluster().getLocalMember().getUuid() + "] " +
+                                    log.info("Elected this member [" + ServiceReferenceHolder.getInstance()
+                                            .getHazelcastInstance().getCluster().getLocalMember().getUuid() + "] " +
                                             "as the autoscaler coordinator for the cluster");
 
                                     AutoscalerContext.getInstance().setCoordinator(true);
@@ -136,8 +139,8 @@ public class AutoscalerServiceComponent {
                         } else {
                             executeCoordinatorTasks();
                         }
-                        componentStartUpSynchronizer.waitForAxisServiceActivation(Component.Autoscaler,
-                                "AutoscalerService");
+                        componentStartUpSynchronizer
+                                .waitForAxisServiceActivation(Component.Autoscaler, "AutoscalerService");
                         componentStartUpSynchronizer.setComponentStatus(Component.Autoscaler, true);
                         if (log.isInfoEnabled()) {
                             log.info("Autoscaler service component activated");
@@ -154,14 +157,14 @@ public class AutoscalerServiceComponent {
         }
     }
 
-    private void executeCoordinatorTasks() throws InvalidPolicyException,
-            InvalidDeploymentPolicyException, InvalidApplicationPolicyException, AutoScalingPolicyAlreadyExistException {
+    private void executeCoordinatorTasks()
+            throws InvalidPolicyException, InvalidDeploymentPolicyException, InvalidApplicationPolicyException,
+                   AutoScalingPolicyAlreadyExistException {
 
         // Start topology receiver
         asTopologyReceiver = new AutoscalerTopologyEventReceiver();
         asTopologyReceiver.setExecutorService(executorService);
         asTopologyReceiver.execute();
-
         if (log.isDebugEnabled()) {
             log.debug("Topology receiver executor service started");
         }
@@ -174,6 +177,14 @@ public class AutoscalerServiceComponent {
             log.debug("Health statistics receiver thread started");
         }
 
+        // Start initializer receiver
+        autoscalerInitializerTopicReceiver = new AutoscalerInitializerTopicReceiver();
+        autoscalerInitializerTopicReceiver.setExecutorService(executorService);
+        autoscalerInitializerTopicReceiver.execute();
+        if (log.isDebugEnabled()) {
+            log.debug("Initializer receiver thread started");
+        }
+
         // Add AS policies to information model
         List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies();
         Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator();
@@ -191,7 +202,6 @@ public class AutoscalerServiceComponent {
             PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
         }
 
-
         // Add application policies to information model
         List<ApplicationPolicy> applicationPolicies = RegistryManager.getInstance().
                 retrieveApplicationPolicies();
@@ -202,9 +212,10 @@ public class AutoscalerServiceComponent {
         }
 
         // Add application policies to information model
-        List<NetworkPartitionAlgorithmContext> networkPartitionAlgorithmContexts =
-                RegistryManager.getInstance().retrieveNetworkPartitionAlgorithmContexts();
-        Iterator<NetworkPartitionAlgorithmContext> networkPartitionAlgoCtxtIterator = networkPartitionAlgorithmContexts.iterator();
+        List<NetworkPartitionAlgorithmContext> networkPartitionAlgorithmContexts = RegistryManager.getInstance()
+                .retrieveNetworkPartitionAlgorithmContexts();
+        Iterator<NetworkPartitionAlgorithmContext> networkPartitionAlgoCtxtIterator = networkPartitionAlgorithmContexts
+                .iterator();
         while (networkPartitionAlgoCtxtIterator.hasNext()) {
             NetworkPartitionAlgorithmContext algorithmContext = networkPartitionAlgoCtxtIterator.next();
             AutoscalerContext.getInstance().addNetworkPartitionAlgorithmContext(algorithmContext);
@@ -223,24 +234,6 @@ public class AutoscalerServiceComponent {
         if (log.isInfoEnabled()) {
             log.info("Scheduling tasks to publish applications");
         }
-
-        ComponentStartUpSynchronizer componentStartUpSynchronizer =
-                ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
-        if (componentStartUpSynchronizer.isEnabled()) {
-            componentStartUpSynchronizer.addEventListener(new ComponentActivationEventListener() {
-                @Override
-                public void activated(Component component) {
-                    if (component == Component.StratosManager) {
-                        scheduleEventSynchronizers();
-                    }
-                }
-            });
-        } else {
-            scheduleEventSynchronizers();
-        }
-    }
-
-    private void scheduleEventSynchronizers() {
         Runnable applicationSynchronizer = new ApplicationEventSynchronizer();
         scheduler.scheduleAtFixedRate(applicationSynchronizer, 0, 1, TimeUnit.MINUTES);
     }
@@ -332,8 +325,8 @@ public class AutoscalerServiceComponent {
     }
 
     protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) {
-        ServiceReferenceHolder.getInstance().setAxisConfiguration(
-                cfgCtxService.getServerConfigContext().getAxisConfiguration());
+        ServiceReferenceHolder.getInstance()
+                .setAxisConfiguration(cfgCtxService.getServerConfigContext().getAxisConfiguration());
     }
 
     protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 2e21e8e..808ac5c 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -29,11 +29,11 @@ import org.apache.stratos.cloud.controller.exception.CloudControllerException;
 import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventSynchronizer;
 import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationEventReceiver;
 import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
+import org.apache.stratos.cloud.controller.messaging.receiver.initializer.InitializerTopicReceiver;
 import org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver;
 import org.apache.stratos.cloud.controller.services.CloudControllerService;
 import org.apache.stratos.cloud.controller.services.impl.CloudControllerServiceImpl;
 import org.apache.stratos.common.Component;
-import org.apache.stratos.common.services.ComponentActivationEventListener;
 import org.apache.stratos.common.services.ComponentStartUpSynchronizer;
 import org.apache.stratos.common.services.DistributedObjectProvider;
 import org.apache.stratos.common.threading.StratosThreadPool;
@@ -57,9 +57,11 @@ import java.util.concurrent.TimeUnit;
  * @scr.component name="org.apache.stratos.cloud.controller" immediate="true"
  * @scr.reference name="hazelcast.instance.service" interface="com.hazelcast.core.HazelcastInstance"
  * cardinality="0..1"policy="dynamic" bind="setHazelcastInstance" unbind="unsetHazelcastInstance"
- * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.services.DistributedObjectProvider"
+ * @scr.reference name="distributedObjectProvider"
+ * interface="org.apache.stratos.common.services.DistributedObjectProvider"
  * cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
- * @scr.reference name="componentStartUpSynchronizer" interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer"
+ * @scr.reference name="componentStartUpSynchronizer"
+ * interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer"
  * cardinality="1..1" policy="dynamic" bind="setComponentStartUpSynchronizer" unbind="unsetComponentStartUpSynchronizer"
  * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService"
  * cardinality="1..1" policy="dynamic" bind="setTaskService" unbind="unsetTaskService"
@@ -79,6 +81,7 @@ public class CloudControllerServiceComponent {
     private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
     private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
     private ApplicationEventReceiver applicationEventReceiver;
+    private InitializerTopicReceiver initializerTopicReceiver;
     private ExecutorService executorService;
     private ScheduledExecutorService scheduler;
 
@@ -88,15 +91,15 @@ public class CloudControllerServiceComponent {
         }
         try {
             executorService = StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE);
-            scheduler = StratosThreadPool.getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID,
-                    SCHEDULER_THREAD_POOL_SIZE);
+            scheduler = StratosThreadPool
+                    .getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID, SCHEDULER_THREAD_POOL_SIZE);
 
             Runnable cloudControllerActivator = new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        ComponentStartUpSynchronizer componentStartUpSynchronizer =
-                                ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
+                        ComponentStartUpSynchronizer componentStartUpSynchronizer = ServiceReferenceHolder.getInstance()
+                                .getComponentStartUpSynchronizer();
 
                         // Register cloud controller service
                         BundleContext bundleContext = context.getBundleContext();
@@ -125,8 +128,8 @@ public class CloudControllerServiceComponent {
                             executeCoordinatorTasks();
                         }
 
-                        componentStartUpSynchronizer.waitForAxisServiceActivation(Component.CloudController,
-                                "CloudControllerService");
+                        componentStartUpSynchronizer
+                                .waitForAxisServiceActivation(Component.CloudController, "CloudControllerService");
                         componentStartUpSynchronizer.setComponentStatus(Component.CloudController, true);
                         log.info("Cloud controller service component activated");
                     } catch (Exception e) {
@@ -166,27 +169,17 @@ public class CloudControllerServiceComponent {
             log.info("Instance status event receiver thread started");
         }
 
+        initializerTopicReceiver = new InitializerTopicReceiver();
+        initializerTopicReceiver.setExecutorService(executorService);
+        initializerTopicReceiver.execute();
+
         if (log.isInfoEnabled()) {
-            log.info("Scheduling topology synchronizer task");
+            log.info("Initializer event receiver thread started");
         }
 
-        ComponentStartUpSynchronizer componentStartUpSynchronizer =
-                ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
-        if (componentStartUpSynchronizer.isEnabled()) {
-            componentStartUpSynchronizer.addEventListener(new ComponentActivationEventListener() {
-                @Override
-                public void activated(Component component) {
-                    if (component == Component.StratosManager) {
-                        scheduleEventSynchronizers();
-                    }
-                }
-            });
-        } else {
-            scheduleEventSynchronizers();
+        if (log.isInfoEnabled()) {
+            log.info("Scheduling topology synchronizer task");
         }
-    }
-
-    private void scheduleEventSynchronizers() {
         Runnable topologySynchronizer = new TopologyEventSynchronizer();
         scheduler.scheduleAtFixedRate(topologySynchronizer, 0, 1, TimeUnit.MINUTES);
     }
@@ -228,8 +221,8 @@ public class CloudControllerServiceComponent {
     }
 
     protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) {
-        ServiceReferenceHolder.getInstance().setAxisConfiguration(
-                cfgCtxService.getServerConfigContext().getAxisConfiguration());
+        ServiceReferenceHolder.getInstance()
+                .setAxisConfiguration(cfgCtxService.getServerConfigContext().getAxisConfiguration());
     }
 
     protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
@@ -296,4 +289,4 @@ public class CloudControllerServiceComponent {
             log.warn("An error occurred while shutting down executor service", e);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java
index b55d3a2..12f7685 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java
@@ -25,6 +25,7 @@ import org.apache.stratos.cloud.controller.domain.Cartridge;
 import org.apache.stratos.cloud.controller.domain.ClusterContext;
 import org.apache.stratos.cloud.controller.domain.MemberContext;
 import org.apache.stratos.cloud.controller.domain.PortMapping;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
@@ -314,12 +315,16 @@ public class TopologyEventPublisher {
     }
 
     public static void sendCompleteTopologyEvent(Topology topology) {
-        CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Publishing complete topology event"));
+        TopologyHolder.acquireReadLock();
+        try {
+            CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
+            if (log.isDebugEnabled()) {
+                log.debug("Publishing complete topology event...");
+            }
+            publishEvent(completeTopologyEvent);
+        } finally {
+            TopologyHolder.releaseReadLock();
         }
-        publishEvent(completeTopologyEvent);
     }
 
     public static void sendClusterTerminatingEvent(ClusterInstanceTerminatingEvent clusterTerminatingEvent) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
index 832cae9..fcfd965 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
 import org.apache.stratos.cloud.controller.context.CloudControllerContext;
-import org.apache.stratos.cloud.controller.messaging.topology.TopologyManager;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder;
 
 /**
  * Topology event synchronizer publishes complete topology event periodically.
@@ -53,10 +53,8 @@ public class TopologyEventSynchronizer implements Runnable {
 
         try {
             // Publish complete topology event
-            if (TopologyManager.getTopology() != null) {
-                CloudControllerContext.getInstance().setTopologySyncRunning(true);
-                TopologyEventPublisher.sendCompleteTopologyEvent(TopologyManager.getTopology());
-            }
+            CloudControllerContext.getInstance().setTopologySyncRunning(true);
+            TopologyEventPublisher.sendCompleteTopologyEvent(TopologyHolder.getTopology());
         } finally {
             CloudControllerContext.getInstance().setTopologySyncRunning(false);
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
new file mode 100644
index 0000000..0f8538c
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.messaging.receiver.initializer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPublisher;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.listener.initializer.CompleteTopologyRequestEventListener;
+import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver;
+
+import java.util.concurrent.ExecutorService;
+
+public class InitializerTopicReceiver {
+    private static final Log log = LogFactory.getLog(InitializerTopicReceiver.class);
+    private InitializerEventReceiver initializerEventReceiver;
+    private ExecutorService executorService;
+
+    public InitializerTopicReceiver() {
+        this.initializerEventReceiver = new InitializerEventReceiver();
+        addEventListeners();
+    }
+
+    public void execute() {
+        initializerEventReceiver.setExecutorService(executorService);
+        initializerEventReceiver.execute();
+        if (log.isInfoEnabled()) {
+            log.info("Cloud controller initializer topic receiver started");
+        }
+    }
+
+    private void addEventListeners() {
+        initializerEventReceiver.addEventListener(new CompleteTopologyRequestEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Handling CompleteTopologyRequestEvent");
+                }
+                try {
+                    TopologyEventPublisher.sendCompleteTopologyEvent(TopologyHolder.getTopology());
+                } catch (Exception e) {
+                    log.error("Failed to process CompleteTopologyRequestEvent", e);
+                }
+            }
+        });
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index 09670e0..da38337 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -60,12 +60,12 @@ public class TopologyBuilder {
 
     public static void handleServiceCreated(List<Cartridge> cartridgeList) throws RegistryException {
         Service service;
-        Topology topology = TopologyManager.getTopology();
+        Topology topology = TopologyHolder.getTopology();
         if (cartridgeList == null) {
             throw new RuntimeException("Cartridge list is empty");
         }
         try {
-            TopologyManager.acquireWriteLock();
+            TopologyHolder.acquireWriteLock();
             for (Cartridge cartridge : cartridgeList) {
                 if (!topology.serviceExists(cartridge.getType())) {
                     ServiceType serviceType = cartridge.isMultiTenant() ?
@@ -104,17 +104,17 @@ public class TopologyBuilder {
                         }
                     }
                     topology.addService(service);
-                    TopologyManager.updateTopology(topology);
+                    TopologyHolder.updateTopology(topology);
                 }
             }
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
         TopologyEventPublisher.sendServiceCreateEvent(cartridgeList);
     }
 
     public static void handleServiceRemoved(List<Cartridge> cartridgeList) throws RegistryException {
-        Topology topology = TopologyManager.getTopology();
+        Topology topology = TopologyHolder.getTopology();
         for (Cartridge cartridge : cartridgeList) {
             Service service = topology.getService(cartridge.getType());
             if (service == null) {
@@ -122,11 +122,11 @@ public class TopologyBuilder {
             }
             if (service.getClusters().size() == 0) {
                 try {
-                    TopologyManager.acquireWriteLock();
+                    TopologyHolder.acquireWriteLock();
                     topology.removeService(cartridge.getType());
-                    TopologyManager.updateTopology(topology);
+                    TopologyHolder.updateTopology(topology);
                 } finally {
-                    TopologyManager.releaseWriteLock();
+                    TopologyHolder.releaseWriteLock();
                 }
                 TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList);
             } else {
@@ -138,9 +138,9 @@ public class TopologyBuilder {
 
     public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters)
             throws RegistryException {
-        TopologyManager.acquireWriteLock();
+        TopologyHolder.acquireWriteLock();
         try {
-            Topology topology = TopologyManager.getTopology();
+            Topology topology = TopologyHolder.getTopology();
             for (Cluster cluster : appClusters) {
                 Service service = topology.getService(cluster.getServiceName());
                 if (service == null) {
@@ -150,9 +150,9 @@ public class TopologyBuilder {
                 service.addCluster(cluster);
                 log.info("Cluster created: [cluster] " + cluster.getClusterId());
             }
-            TopologyManager.updateTopology(topology);
+            TopologyHolder.updateTopology(topology);
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
 
         log.debug("Creating cluster port mappings: [application-id] " + appId);
@@ -184,10 +184,10 @@ public class TopologyBuilder {
 
     public static void handleApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusterData)
             throws RegistryException {
-        TopologyManager.acquireWriteLock();
+        TopologyHolder.acquireWriteLock();
         CloudControllerContext context = CloudControllerContext.getInstance();
         try {
-            Topology topology = TopologyManager.getTopology();
+            Topology topology = TopologyHolder.getTopology();
 
             if (clusterData != null) {
                 // remove clusters from CC topology model and remove runtime information
@@ -208,9 +208,9 @@ public class TopologyBuilder {
             } else {
                 log.info("No cluster data found for application " + appId + " to remove");
             }
-            TopologyManager.updateTopology(topology);
+            TopologyHolder.updateTopology(topology);
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
 
         // Remove cluster port mappings of application
@@ -220,9 +220,9 @@ public class TopologyBuilder {
     }
 
     public static void handleClusterReset(ClusterStatusClusterResetEvent event) throws RegistryException {
-        TopologyManager.acquireWriteLock();
+        TopologyHolder.acquireWriteLock();
         try {
-            Topology topology = TopologyManager.getTopology();
+            Topology topology = TopologyHolder.getTopology();
             Service service = topology.getService(event.getServiceName());
             if (service == null) {
                 throw new RuntimeException("Service " + event.getServiceName() +
@@ -246,7 +246,7 @@ public class TopologyBuilder {
             if (context.isStateTransitionValid(status)) {
                 context.setStatus(status);
                 log.info("Cluster Created adding status started for" + cluster.getClusterId());
-                TopologyManager.updateTopology(topology);
+                TopologyHolder.updateTopology(topology);
                 //publishing data
                 TopologyEventPublisher
                         .sendClusterResetEvent(event.getAppId(), event.getServiceName(), event.getClusterId(),
@@ -258,16 +258,16 @@ public class TopologyBuilder {
             }
 
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
 
     }
 
     public static void handleClusterInstanceCreated(String serviceType, String clusterId, String alias,
             String instanceId, String partitionId, String networkPartitionId) throws RegistryException {
-        TopologyManager.acquireWriteLock();
+        TopologyHolder.acquireWriteLock();
         try {
-            Topology topology = TopologyManager.getTopology();
+            Topology topology = TopologyHolder.getTopology();
             Service service = topology.getService(serviceType);
             if (service == null) {
                 throw new RuntimeException("Service " + serviceType +
@@ -286,18 +286,18 @@ public class TopologyBuilder {
             clusterInstance.setNetworkPartitionId(networkPartitionId);
             clusterInstance.setPartitionId(partitionId);
             cluster.addInstanceContext(instanceId, clusterInstance);
-            TopologyManager.updateTopology(topology);
+            TopologyHolder.updateTopology(topology);
             ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = new ClusterInstanceCreatedEvent(serviceType,
                     clusterId, clusterInstance);
             clusterInstanceCreatedEvent.setPartitionId(partitionId);
             TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
     }
 
     public static void handleClusterRemoved(ClusterContext ctxt) throws RegistryException {
-        Topology topology = TopologyManager.getTopology();
+        Topology topology = TopologyHolder.getTopology();
         Service service = topology.getService(ctxt.getCartridgeType());
         String deploymentPolicy;
         if (service == null) {
@@ -308,12 +308,12 @@ public class TopologyBuilder {
                     ctxt.getCartridgeType()));
         }
         try {
-            TopologyManager.acquireWriteLock();
+            TopologyHolder.acquireWriteLock();
             Cluster cluster = service.removeCluster(ctxt.getClusterId());
             deploymentPolicy = cluster.getDeploymentPolicyName();
-            TopologyManager.updateTopology(topology);
+            TopologyHolder.updateTopology(topology);
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
         TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
     }
@@ -324,7 +324,7 @@ public class TopologyBuilder {
      * @param memberContext
      */
     public static void handleMemberCreatedEvent(MemberContext memberContext) throws RegistryException {
-        Topology topology = TopologyManager.getTopology();
+        Topology topology = TopologyHolder.getTopology();
         Service service = topology.getService(memberContext.getCartridgeType());
         String clusterId = memberContext.getClusterId();
         Cluster cluster = service.getCluster(clusterId);
@@ -340,14 +340,14 @@ public class TopologyBuilder {
             throw new RuntimeException(String.format("Member %s already exists", memberId));
         }
         try {
-            TopologyManager.acquireWriteLock();
+            TopologyHolder.acquireWriteLock();
             Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId,
                     networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime);
             member.setStatus(MemberStatus.Created);
             member.setLbClusterId(lbClusterId);
             member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
             cluster.addMember(member);
-            TopologyManager.updateTopology(topology);
+            TopologyHolder.updateTopology(topology);
 
             //member created time
             Long timestamp = System.currentTimeMillis();
@@ -368,7 +368,7 @@ public class TopologyBuilder {
             }
 
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
         TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
     }
@@ -379,7 +379,7 @@ public class TopologyBuilder {
      * @param memberContext
      */
     public static void handleMemberInitializedEvent(MemberContext memberContext) throws RegistryException {
-        Topology topology = TopologyManager.getTopology();
+        Topology topology = TopologyHolder.getTopology();
         Service service = topology.getService(memberContext.getCartridgeType());
         if (service == null) {
             throw new RuntimeException(String.format("Service %s does not exist", memberContext.getCartridgeType()));
@@ -397,7 +397,7 @@ public class TopologyBuilder {
             throw new RuntimeException(String.format("Member %s does not exist", memberContext.getMemberId()));
         }
         try {
-            TopologyManager.acquireWriteLock();
+            TopologyHolder.acquireWriteLock();
 
             // Set instance id returned by the IaaS
             member.setInstanceId(memberContext.getInstanceId());
@@ -430,7 +430,7 @@ public class TopologyBuilder {
                 member.setStatus(MemberStatus.Initialized);
                 log.info("Member status updated to initialized");
 
-                TopologyManager.updateTopology(topology);
+                TopologyHolder.updateTopology(topology);
                 //member intialized time
                 Long timestamp = System.currentTimeMillis();
                 TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
@@ -465,7 +465,7 @@ public class TopologyBuilder {
                 }
             }
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
     }
 
@@ -482,7 +482,7 @@ public class TopologyBuilder {
 
     public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) {
         try {
-            Topology topology = TopologyManager.getTopology();
+            Topology topology = TopologyHolder.getTopology();
             Service service = topology.getService(instanceStartedEvent.getServiceName());
             if (service == null) {
                 throw new RuntimeException(
@@ -504,7 +504,7 @@ public class TopologyBuilder {
             }
 
             try {
-                TopologyManager.acquireWriteLock();
+                TopologyHolder.acquireWriteLock();
                 // try update lifecycle state
                 if (!member.isStateTransitionValid(MemberStatus.Starting)) {
                     log.error("Invalid State Transition from " + member.getStatus() + " to " +
@@ -513,7 +513,7 @@ public class TopologyBuilder {
                     member.setStatus(MemberStatus.Starting);
                     log.info("member started event adding status started");
 
-                    TopologyManager.updateTopology(topology);
+                    TopologyHolder.updateTopology(topology);
                     //member started time
                     Long timestamp = System.currentTimeMillis();
                     //memberStartedEvent.
@@ -538,7 +538,7 @@ public class TopologyBuilder {
                     }
                 }
             } finally {
-                TopologyManager.releaseWriteLock();
+                TopologyHolder.releaseWriteLock();
             }
         } catch (Exception e) {
             String message = String.format("Could not handle member started event: [application-id] %s "
@@ -549,7 +549,7 @@ public class TopologyBuilder {
     }
 
     public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) throws RegistryException {
-        Topology topology = TopologyManager.getTopology();
+        Topology topology = TopologyHolder.getTopology();
         Service service = topology.getService(instanceActivatedEvent.getServiceName());
         if (service == null) {
             throw new RuntimeException(
@@ -579,7 +579,7 @@ public class TopologyBuilder {
         //TODO
         memberActivatedEvent.setApplicationId(null);
         try {
-            TopologyManager.acquireWriteLock();
+            TopologyHolder.acquireWriteLock();
             // try update lifecycle state
             if (!member.isStateTransitionValid(MemberStatus.Active)) {
                 log.error("Invalid state transition from [" + member.getStatus() + "] to [" +
@@ -624,7 +624,7 @@ public class TopologyBuilder {
                 memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
                 memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
                 memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
-                TopologyManager.updateTopology(topology);
+                TopologyHolder.updateTopology(topology);
 
                 //member activated time
                 Long timestamp = System.currentTimeMillis();
@@ -648,13 +648,13 @@ public class TopologyBuilder {
                 }
             }
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
     }
 
     public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent)
             throws InvalidMemberException, InvalidCartridgeTypeException, RegistryException {
-        Topology topology = TopologyManager.getTopology();
+        Topology topology = TopologyHolder.getTopology();
         Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName());
 
         //update the status of the member
@@ -684,7 +684,7 @@ public class TopologyBuilder {
         //member ReadyToShutDown state change time
         Long timestamp = null;
         try {
-            TopologyManager.acquireWriteLock();
+            TopologyHolder.acquireWriteLock();
 
             if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
                 throw new RuntimeException("Invalid State Transition from " + member.getStatus() + " to " +
@@ -693,10 +693,10 @@ public class TopologyBuilder {
             member.setStatus(MemberStatus.ReadyToShutDown);
             log.info("Member Ready to shut down event adding status started");
 
-            TopologyManager.updateTopology(topology);
+            TopologyHolder.updateTopology(topology);
             timestamp = System.currentTimeMillis();
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
         TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
         //publishing member status to DAS.
@@ -721,7 +721,7 @@ public class TopologyBuilder {
 
     public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent)
             throws InvalidMemberException, InvalidCartridgeTypeException, RegistryException {
-        Topology topology = TopologyManager.getTopology();
+        Topology topology = TopologyHolder.getTopology();
         Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName());
         //update the status of the member
         if (service == null) {
@@ -746,7 +746,7 @@ public class TopologyBuilder {
                 instanceMaintenanceModeEvent.getClusterInstanceId(), instanceMaintenanceModeEvent.getMemberId(),
                 instanceMaintenanceModeEvent.getNetworkPartitionId(), instanceMaintenanceModeEvent.getPartitionId());
         try {
-            TopologyManager.acquireWriteLock();
+            TopologyHolder.acquireWriteLock();
             // try update lifecycle state
             if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
                 throw new RuntimeException(
@@ -755,9 +755,9 @@ public class TopologyBuilder {
             member.setStatus(MemberStatus.In_Maintenance);
             log.info("member maintenance mode event adding status started");
 
-            TopologyManager.updateTopology(topology);
+            TopologyHolder.updateTopology(topology);
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
         //publishing data
         TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
@@ -775,7 +775,7 @@ public class TopologyBuilder {
      */
     public static void handleMemberTerminated(String serviceName, String clusterId, String networkPartitionId,
             String partitionId, String memberId) throws RegistryException {
-        Topology topology = TopologyManager.getTopology();
+        Topology topology = TopologyHolder.getTopology();
         Service service = topology.getService(serviceName);
         Properties properties;
         if (service == null) {
@@ -799,12 +799,12 @@ public class TopologyBuilder {
         //member terminated time
         Long timestamp = null;
         try {
-            TopologyManager.acquireWriteLock();
+            TopologyHolder.acquireWriteLock();
             properties = member.getProperties();
             cluster.removeMember(member);
-            TopologyManager.updateTopology(topology);
+            TopologyHolder.updateTopology(topology);
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
             timestamp = System.currentTimeMillis();
         }
         /* @TODO leftover from grouping_poc*/
@@ -831,7 +831,7 @@ public class TopologyBuilder {
     public static void handleClusterActivatedEvent(
             ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) throws RegistryException {
 
-        Topology topology = TopologyManager.getTopology();
+        Topology topology = TopologyHolder.getTopology();
         Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
         //update the status of the cluster
         if (service == null) {
@@ -857,7 +857,7 @@ public class TopologyBuilder {
                 clusterStatusClusterActivatedEvent.getAppId(), clusterStatusClusterActivatedEvent.getServiceName(),
                 clusterStatusClusterActivatedEvent.getClusterId(), clusterStatusClusterActivatedEvent.getInstanceId());
         try {
-            TopologyManager.acquireWriteLock();
+            TopologyHolder.acquireWriteLock();
 
             Collection<KubernetesService> kubernetesServices = clusterContext
                     .getKubernetesServices(clusterStatusClusterActivatedEvent.getInstanceId());
@@ -932,7 +932,7 @@ public class TopologyBuilder {
             if (context.isStateTransitionValid(status)) {
                 context.setStatus(status);
                 log.info("Cluster activated adding status started for " + cluster.getClusterId());
-                TopologyManager.updateTopology(topology);
+                TopologyHolder.updateTopology(topology);
                 // publish event
                 TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
             } else {
@@ -942,7 +942,7 @@ public class TopologyBuilder {
                         clusterStatusClusterActivatedEvent.getInstanceId(), context.getStatus(), status));
             }
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
 
     }
@@ -961,7 +961,7 @@ public class TopologyBuilder {
 
     public static void handleClusterInactivateEvent(ClusterStatusClusterInactivateEvent clusterInactivateEvent)
             throws RegistryException {
-        Topology topology = TopologyManager.getTopology();
+        Topology topology = TopologyHolder.getTopology();
         Service service = topology.getService(clusterInactivateEvent.getServiceName());
         //update the status of the cluster
         if (service == null) {
@@ -980,7 +980,7 @@ public class TopologyBuilder {
                 clusterInactivateEvent.getAppId(), clusterInactivateEvent.getServiceName(),
                 clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId());
         try {
-            TopologyManager.acquireWriteLock();
+            TopologyHolder.acquireWriteLock();
             ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
             if (context == null) {
                 throw new RuntimeException("Cluster Instance Context is not found for [cluster] " +
@@ -991,7 +991,7 @@ public class TopologyBuilder {
             if (context.isStateTransitionValid(status)) {
                 context.setStatus(status);
                 log.info("Cluster Inactive adding status started for" + cluster.getClusterId());
-                TopologyManager.updateTopology(topology);
+                TopologyHolder.updateTopology(topology);
                 //publishing data
                 TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
             } else {
@@ -1001,15 +1001,15 @@ public class TopologyBuilder {
                         context.getStatus(), status));
             }
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
     }
 
     public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event)
             throws RegistryException {
-        TopologyManager.acquireWriteLock();
+        TopologyHolder.acquireWriteLock();
         try {
-            Topology topology = TopologyManager.getTopology();
+            Topology topology = TopologyHolder.getTopology();
             Service service = topology.getService(event.getServiceName());
 
             //update the status of the cluster
@@ -1036,7 +1036,7 @@ public class TopologyBuilder {
                 log.info("Cluster Terminated adding status started for and removing the cluster instance" + cluster
                         .getClusterId());
                 cluster.removeInstanceContext(event.getInstanceId());
-                TopologyManager.updateTopology(topology);
+                TopologyHolder.updateTopology(topology);
                 //publishing data
                 ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent(
                         event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId());
@@ -1048,7 +1048,7 @@ public class TopologyBuilder {
                         event.getInstanceId(), context.getStatus(), status));
             }
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
 
     }
@@ -1056,10 +1056,10 @@ public class TopologyBuilder {
     public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event)
             throws RegistryException {
 
-        TopologyManager.acquireWriteLock();
+        TopologyHolder.acquireWriteLock();
 
         try {
-            Topology topology = TopologyManager.getTopology();
+            Topology topology = TopologyHolder.getTopology();
             Cluster cluster = topology.getService(event.getServiceName()).
                     getCluster(event.getClusterId());
 
@@ -1077,7 +1077,7 @@ public class TopologyBuilder {
             if (context.isStateTransitionValid(status)) {
                 context.setStatus(status);
                 log.info("Cluster Terminating started for " + cluster.getClusterId());
-                TopologyManager.updateTopology(topology);
+                TopologyHolder.updateTopology(topology);
                 //publishing data
                 ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent(
                         event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId());
@@ -1096,7 +1096,7 @@ public class TopologyBuilder {
                         event.getInstanceId(), context.getStatus(), status));
             }
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyHolder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyHolder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyHolder.java
new file mode 100644
index 0000000..d183ca0
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyHolder.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.messaging.topology;
+
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.registry.RegistryManager;
+import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
+import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
+import org.apache.stratos.common.concurrent.locks.ReadWriteLock;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
+
+/**
+ * Persistence and retrieval of Topology from Registry
+ */
+public class TopologyHolder {
+    private static final Log log = LogFactory.getLog(TopologyHolder.class);
+
+    private static volatile ReadWriteLock lock = new ReadWriteLock("topology-manager");
+    private static volatile Topology topology;
+
+    private TopologyHolder() {
+    }
+
+    public static void acquireReadLock() {
+        lock.acquireReadLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Read lock acquired");
+        }
+    }
+
+    public static void releaseReadLock() {
+        lock.releaseReadLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Read lock released");
+        }
+    }
+
+    public static void acquireWriteLock() {
+        lock.acquireWriteLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Write lock acquired");
+        }
+    }
+
+    public static void releaseWriteLock() {
+        lock.releaseWriteLock();
+        if (log.isDebugEnabled()) {
+            log.debug("Write lock released");
+        }
+    }
+
+    public static Topology getTopology() {
+        if (topology == null) {
+            synchronized (TopologyHolder.class) {
+                if (topology == null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Trying to retrieve topology from registry");
+                    }
+                    topology = CloudControllerUtil.retrieveTopology();
+                    if (topology == null) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Topology not found in registry, creating new");
+                        }
+                        topology = new Topology();
+                    }
+                    if (log.isDebugEnabled()) {
+                        log.debug("Topology initialized");
+                    }
+                }
+            }
+        }
+        return topology;
+    }
+
+    /**
+     * Update in-memory topology and persist it in registry.
+     *
+     * @param updatedTopology
+     */
+    public static void updateTopology(Topology updatedTopology) throws RegistryException {
+        synchronized (TopologyHolder.class) {
+            if (log.isDebugEnabled()) {
+                log.debug("Updating topology");
+            }
+            topology = updatedTopology;
+            RegistryManager.getInstance().persist(CloudControllerConstants.TOPOLOGY_RESOURCE, topology);
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Topology updated: %s", toJson(topology)));
+            }
+        }
+
+    }
+
+    private static String toJson(Object object) {
+        Gson gson = new Gson();
+        return gson.toJson(object);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java
deleted file mode 100644
index f6f6036..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.messaging.topology;
-
-import com.google.gson.Gson;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.registry.RegistryManager;
-import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
-import org.apache.stratos.common.concurrent.locks.ReadWriteLock;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.wso2.carbon.registry.core.exceptions.RegistryException;
-
-/**
- * Persistence and retrieval of Topology from Registry
- */
-public class TopologyManager {
-    private static final Log log = LogFactory.getLog(TopologyManager.class);
-
-    private static volatile ReadWriteLock lock = new ReadWriteLock("topology-manager");
-    private static volatile Topology topology;
-
-    private TopologyManager() {
-    }
-
-    public static void acquireReadLock() {
-        lock.acquireReadLock();
-        if (log.isDebugEnabled()) {
-            log.debug("Read lock acquired");
-        }
-    }
-
-    public static void releaseReadLock() {
-        lock.releaseReadLock();
-        if (log.isDebugEnabled()) {
-            log.debug("Read lock released");
-        }
-    }
-
-    public static void acquireWriteLock() {
-        lock.acquireWriteLock();
-        if (log.isDebugEnabled()) {
-            log.debug("Write lock acquired");
-        }
-    }
-
-    public static void releaseWriteLock() {
-        lock.releaseWriteLock();
-        if (log.isDebugEnabled()) {
-            log.debug("Write lock released");
-        }
-    }
-
-    public static Topology getTopology() {
-        if (topology == null) {
-            synchronized (TopologyManager.class) {
-                if (topology == null) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Trying to retrieve topology from registry");
-                    }
-                    topology = CloudControllerUtil.retrieveTopology();
-                    if (topology == null) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Topology not found in registry, creating new");
-                        }
-                        topology = new Topology();
-                    }
-                    if (log.isDebugEnabled()) {
-                        log.debug("Topology initialized");
-                    }
-                }
-            }
-        }
-        return topology;
-    }
-
-    /**
-     * Update in-memory topology and persist it in registry.
-     *
-     * @param updatedTopology
-     */
-    public static void updateTopology(Topology updatedTopology) throws RegistryException {
-        synchronized (TopologyManager.class) {
-            if (log.isDebugEnabled()) {
-                log.debug("Updating topology");
-            }
-            topology = updatedTopology;
-            RegistryManager.getInstance().persist(CloudControllerConstants.TOPOLOGY_RESOURCE, topology);
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Topology updated: %s", toJson(topology)));
-            }
-        }
-
-    }
-
-    private static String toJson(Object object) {
-        Gson gson = new Gson();
-        return gson.toJson(object);
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 582e78f..d3fa92d 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -30,7 +30,7 @@ import org.apache.stratos.cloud.controller.domain.kubernetes.KubernetesMaster;
 import org.apache.stratos.cloud.controller.exception.*;
 import org.apache.stratos.cloud.controller.iaases.Iaas;
 import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
-import org.apache.stratos.cloud.controller.messaging.topology.TopologyManager;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder;
 import org.apache.stratos.cloud.controller.services.CloudControllerService;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
@@ -640,8 +640,8 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             }
 
             // check if status == active, if true, then this is a termination on member faulty
-            TopologyManager.acquireWriteLock();
-            Topology topology = TopologyManager.getTopology();
+            TopologyHolder.acquireWriteLock();
+            Topology topology = TopologyHolder.getTopology();
             org.apache.stratos.messaging.domain.topology.Service service = topology
                     .getService(memberContext.getCartridgeType());
 
@@ -679,7 +679,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             log.error(message, e);
             throw new CloudControllerException(message, e);
         } finally {
-            TopologyManager.releaseWriteLock();
+            TopologyHolder.releaseWriteLock();
         }
         return true;
     }
@@ -826,7 +826,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
                     log.error(msg);
                     return;
                 }
-                Collection<Member> members = TopologyManager.getTopology().
+                Collection<Member> members = TopologyHolder.getTopology().
                         getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers();
                 //finding the responding members from the existing members in the topology.
                 int sizeOfRespondingMembers = 0;
@@ -872,7 +872,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
                         log.error(msg);
                         return;
                     }
-                    Collection<Member> members = TopologyManager.getTopology().
+                    Collection<Member> members = TopologyHolder.getTopology().
                             getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers();
 
                     while (members.size() > 0) {