You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2015/03/22 20:23:28 UTC

stratos git commit: Changing task based event synchronizers to scheduled executor services

Repository: stratos
Updated Branches:
  refs/heads/master a76320b13 -> bf8bd349e


Changing task based event synchronizers to scheduled executor services


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/bf8bd349
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/bf8bd349
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/bf8bd349

Branch: refs/heads/master
Commit: bf8bd349ef5d8c650928ff847e9b578a9d6a6ddd
Parents: a76320b
Author: Imesh Gunaratne <im...@apache.org>
Authored: Mon Mar 23 00:53:13 2015 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Mon Mar 23 00:53:13 2015 +0530

----------------------------------------------------------------------
 .../ApplicationEventSynchronizer.java           | 40 +++++++++
 .../ApplicationSynchronizeTask.java             | 54 -----------
 .../ApplicationSynchronizerTaskScheduler.java   | 87 ------------------
 .../internal/AutoscalerServiceComponent.java    | 24 +++--
 .../autoscaler/util/AutoscalerConstants.java    |  7 +-
 .../CloudControllerServiceComponent.java        | 57 ++++++++----
 .../publisher/TopologyEventSynchronizer.java    | 64 +++++++++++++
 .../publisher/TopologySynchronizerTask.java     | 71 ---------------
 .../TopologySynchronizerTaskScheduler.java      | 83 -----------------
 .../StratosManagerServiceComponent.java         | 74 +++++++++------
 .../ApplicationSignUpEventSynchronizer.java     | 56 ++++++++++++
 .../ApplicationSignUpSynchronizerTask.java      | 66 --------------
 .../synchronizer/SynchronizerTaskScheduler.java | 72 ---------------
 .../synchronizer/TenantEventSynchronizer.java   | 84 +++++++++++++++++
 .../synchronizer/TenantSynzhronizerTask.java    | 94 --------------------
 .../tests/SampleApplicationsTest.java           |  5 +-
 16 files changed, 354 insertions(+), 584 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
new file mode 100644
index 0000000..fc7a528
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder;
+
+public class ApplicationEventSynchronizer implements Runnable {
+
+    private static final Log log = LogFactory.getLog(ApplicationEventSynchronizer.class);
+
+    @Override
+    public void run() {
+        if (log.isDebugEnabled()) {
+            log.debug("Executing topology synchronization task");
+        }
+        // publish to the topic
+        if (ApplicationHolder.getApplications() != null) {
+            ApplicationBuilder.handleCompleteApplication(ApplicationHolder.getApplications());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
deleted file mode 100644
index 81e4fa2..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder;
-import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher;
-import org.wso2.carbon.ntask.core.Task;
-
-import java.util.Map;
-
-public class ApplicationSynchronizeTask implements Task {
-    private static final Log log = LogFactory.getLog(ApplicationSynchronizeTask.class);
-
-    @Override
-    public void execute() {
-        if (log.isDebugEnabled()) {
-            log.debug("Executing topology synchronization task");
-        }
-        // publish to the topic
-        if (ApplicationHolder.getApplications() != null) {
-            ApplicationBuilder.handleCompleteApplication(ApplicationHolder.getApplications());
-        }
-    }
-
-    @Override
-    public void init() {
-        if(log.isDebugEnabled()) {
-            log.debug("Applications Complete Event publisher task has been started...");
-        }
-
-    }
-
-    @Override
-    public void setProperties(Map<String, String> arg0) {}
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java
deleted file mode 100644
index ea692c2..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.util.AutoscalerConstants;
-import org.wso2.carbon.ntask.common.TaskException;
-import org.wso2.carbon.ntask.core.TaskInfo;
-import org.wso2.carbon.ntask.core.TaskManager;
-import org.wso2.carbon.ntask.core.service.TaskService;
-
-import java.util.HashMap;
-
-/**
- * Topology synchronizer task scheduler for scheduling the topology synchronizer task
- * using carbon task service.
- */
-public class ApplicationSynchronizerTaskScheduler {
-
-    private static final Log log = LogFactory.getLog(ApplicationSynchronizerTaskScheduler.class);
-
-    public static void schedule(TaskService taskService) {
-        TaskManager taskManager = null;
-        try {
-
-            //if (!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE)) {
-            // Register task type
-            taskService.registerTaskType(AutoscalerConstants.APPLICATION_SYNC_TASK_TYPE);
-
-                /*// Register task
-                taskManager = taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
-                String cronProp = dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.CRON_PROPERTY);
-				String cron = cronProp != null ?  cronProp :CloudControllerConstants.TOPOLOGY_SYNC_CRON ;
-                TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cron);
-                TaskInfo taskInfo = new TaskInfo(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME,
-                        TopologySynchronizerTask.class.getName(),
-                        new HashMap<String, String>(), triggerInfo);
-                taskManager.registerTask(taskInfo);*/
-
-            taskManager = taskService.getTaskManager(AutoscalerConstants.APPLICATION_SYNC_TASK_TYPE);
-            String cronProp = AutoscalerConstants.APPLICATION_SYNC_CRON;
-            TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cronProp);
-            TaskInfo taskInfo = new TaskInfo(AutoscalerConstants.APPLICATION_SYNC_TASK_NAME,
-                    ApplicationSynchronizeTask.class.getName(),
-                    new HashMap<String, String>(), triggerInfo);
-            taskManager.registerTask(taskInfo);
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Topology synchronization task scheduled: %s", AutoscalerConstants.APPLICATION_SYNC_TASK_NAME));
-            }
-            //}
-
-        } catch (Exception e) {
-            if (taskManager != null) {
-                try {
-                    taskManager.deleteTask(AutoscalerConstants.APPLICATION_SYNC_TASK_NAME);
-                } catch (TaskException te) {
-                    if (log.isErrorEnabled()) {
-                        log.error(te);
-                    }
-                }
-            }
-
-            String msg = String.format("Could not schedule topology synchronization task: %s",
-                    AutoscalerConstants.APPLICATION_SYNC_TASK_NAME);
-            log.error(msg, e);
-            throw new RuntimeException(msg, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
index 3c03807..81608ec 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
@@ -23,8 +23,7 @@ import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.algorithms.networkpartition.NetworkPartitionAlgorithmContext;
-import org.apache.stratos.autoscaler.applications.ApplicationSynchronizeTask;
-import org.apache.stratos.autoscaler.applications.ApplicationSynchronizerTaskScheduler;
+import org.apache.stratos.autoscaler.applications.ApplicationEventSynchronizer;
 import org.apache.stratos.autoscaler.context.AutoscalerContext;
 import org.apache.stratos.autoscaler.event.receiver.health.AutoscalerHealthStatEventReceiver;
 import org.apache.stratos.autoscaler.event.receiver.topology.AutoscalerTopologyEventReceiver;
@@ -53,6 +52,8 @@ import org.wso2.carbon.utils.ConfigurationContextService;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @scr.component name=org.apache.stratos.autoscaler.internal.AutoscalerServiceComponent" immediate="true"
@@ -79,6 +80,7 @@ public class AutoscalerServiceComponent {
 	private AutoscalerTopologyEventReceiver asTopologyReceiver;
 	private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
 	private ExecutorService executorService;
+    private ScheduledExecutorService scheduler;
 
 	protected void activate(ComponentContext componentContext) throws Exception {
 		try {
@@ -88,6 +90,11 @@ public class AutoscalerServiceComponent {
             executorService = StratosThreadPool.getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID,
                     threadPoolSize);
 
+            int schedulerThreadPoolSize = conf.getInt(AutoscalerConstants.SCHEDULER_THREAD_POOL_SIZE_KEY,
+                    AutoscalerConstants.AUTOSCALER_SCHEDULER_THREAD_POOL_SIZE);
+            scheduler = StratosThreadPool.getScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID,
+                    schedulerThreadPoolSize);
+
             Runnable autoscalerActivator = new Runnable() {
                 @Override
                 public void run() {
@@ -196,16 +203,14 @@ public class AutoscalerServiceComponent {
 			log.info("Scheduling tasks to publish applications");
 		}
 
-		ApplicationSynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService());
-
         ComponentStartUpSynchronizer componentStartUpSynchronizer =
                 ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
         componentStartUpSynchronizer.addEventListener(new ComponentActivationEventListener() {
             @Override
             public void activated(Component component) {
                 if(component == Component.StratosManager) {
-                    ApplicationSynchronizeTask applicationSynchronizeTask = new ApplicationSynchronizeTask();
-                    applicationSynchronizeTask.execute();
+                    Runnable applicationSynchronizer = new ApplicationEventSynchronizer();
+                    scheduler.scheduleAtFixedRate(applicationSynchronizer, 0, 1, TimeUnit.MINUTES);
                 }
             }
         });
@@ -229,9 +234,10 @@ public class AutoscalerServiceComponent {
         }
 
         // Shutdown executor service
-        if(executorService != null) {
-            shutdownExecutorService(executorService);
-        }
+        shutdownExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID);
+
+        // Shutdown scheduler
+        shutdownScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID);
 
         // Shutdown application monitor executor service
         shutdownExecutorService(AutoscalerConstants.APPLICATION_MONITOR_THREAD_POOL_ID);

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
index f5d74fc..1423cc4 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
@@ -35,6 +35,9 @@ public final class AutoscalerConstants {
     public static final String PROPERTY_VALUE_ATTR = "value";
     public static final String AUTOSCALER_THREAD_POOL_ID = "autoscaler.thread.pool";
     public static final String THREAD_POOL_SIZE_KEY = "autoscaler.thread.pool.size";
+    public static final String AUTOSCALER_SCHEDULER_ID = "autoscaler.scheduler.thread.pool";
+    public static final String SCHEDULER_THREAD_POOL_SIZE_KEY = "autoscaler.scheduler.thread.pool.size";
+    public static final int AUTOSCALER_SCHEDULER_THREAD_POOL_SIZE = 5;
     public static final int AUTOSCALER_THREAD_POOL_SIZE = 50;
     public static final String COMPONENTS_CONFIG = CarbonUtils.getCarbonConfigDirPath() +
             File.separator + "stratos-config.xml";
@@ -74,7 +77,7 @@ public final class AutoscalerConstants {
     public static final String CLUSTER_MONITOR_SCHEDULER_ID = "cluster.monitor.scheduler";
     public static final String CLUSTER_MONITOR_THREAD_POOL_ID = "cluster.monitor.thread.pool";
     public static final String CLUSTER_MONITOR_THREAD_POOL_SIZE = "cluster.monitor.thread.pool.size";
-	public static final String MEMBER_FAULT_EVENT_NAME = "member_fault";
+    public static final String MEMBER_FAULT_EVENT_NAME = "member_fault";
 	//scheduler
 	public static final int SCHEDULE_DEFAULT_INITIAL_DELAY = 30;
 	public static final int SCHEDULE_DEFAULT_PERIOD = 15;
@@ -117,5 +120,5 @@ public final class AutoscalerConstants {
 	public static final String OAUTH_SERVICE_SFX = "services/OAuthAdminService";
 	public static final String IDENTITY_APPLICATION_SERVICE_SFX = "services/IdentityApplicationManagementService";
 	public static final String TOKEN_ENDPOINT_SFX = "oauth2/token";
-	public static final String TERMINATE_DEPENDENTS = "terminate-dependents";	
+	public static final String TERMINATE_DEPENDENTS = "terminate-dependents";
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index bf8e92c..83973e0 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -26,8 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.context.CloudControllerContext;
 import org.apache.stratos.cloud.controller.exception.CloudControllerException;
-import org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTask;
-import org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTaskScheduler;
+import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventSynchronizer;
 import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationEventReceiver;
 import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
 import org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver;
@@ -49,6 +48,8 @@ import org.wso2.carbon.registry.core.session.UserRegistry;
 import org.wso2.carbon.utils.ConfigurationContextService;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Registering Cloud Controller Service.
@@ -71,18 +72,24 @@ public class CloudControllerServiceComponent {
 
 	private static final Log log = LogFactory.getLog(CloudControllerServiceComponent.class);
 
+    private static final String CLOUD_CONTROLLER_COORDINATOR_LOCK = "cloud.controller.coordinator.lock";
+    private static final String THREAD_POOL_ID = "cloud.controller.thread.pool";
+    private static final String SCHEDULER_THREAD_POOL_ID = "cloud.controller.scheduler.thread.pool";
+    private static final int THREAD_POOL_SIZE = 10;
+    private static final int SCHEDULER_THREAD_POOL_SIZE = 5;
+
 	private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
 	private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
 	private ApplicationEventReceiver applicationEventReceiver;
     private ExecutorService executorService;
-
-	private static final String DEFAULT_IDENTIFIER = "Cloud-Controller";
-	private static final int THREAD_POOL_SIZE = 10;
-    private static final String CLOUD_CONTROLLER_COORDINATOR_LOCK = "CLOUD_CONTROLLER_COORDINATOR_LOCK";
+    private ScheduledExecutorService scheduler;
 
     protected void activate(final ComponentContext context) {
 		try {
-            executorService = StratosThreadPool.getExecutorService(DEFAULT_IDENTIFIER, THREAD_POOL_SIZE);
+            executorService = StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE);
+            scheduler = StratosThreadPool.getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID,
+                    SCHEDULER_THREAD_POOL_SIZE);
+
             Runnable cloudControllerActivator = new Runnable() {
                 @Override
                 public void run() {
@@ -160,7 +167,6 @@ public class CloudControllerServiceComponent {
         if (log.isInfoEnabled()) {
             log.info("Scheduling topology synchronizer task");
         }
-        TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService());
 
         ComponentStartUpSynchronizer componentStartUpSynchronizer =
                 ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
@@ -168,8 +174,8 @@ public class CloudControllerServiceComponent {
             @Override
             public void activated(Component component) {
                 if(component == Component.StratosManager) {
-                    TopologySynchronizerTask topologySynchronizerTask = new TopologySynchronizerTask();
-                    topologySynchronizerTask.execute();
+                    Runnable topologySynchronizer = new TopologyEventSynchronizer();
+                    scheduler.scheduleAtFixedRate(topologySynchronizer, 0, 1, TimeUnit.MINUTES);
                 }
             }
         });
@@ -253,12 +259,31 @@ public class CloudControllerServiceComponent {
         }
 
         // Shutdown executor service
+        shutdownExecutorService(THREAD_POOL_ID);
+
+        // Shutdown scheduler
+        shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID);
+	}
+
+    private void shutdownExecutorService(String executorServiceId) {
+        ExecutorService executorService = StratosThreadPool.getExecutorService(executorServiceId, 1);
         if(executorService != null) {
-            try {
-                executorService.shutdownNow();
-            } catch (Exception e) {
-                log.warn("An error occurred while shutting down cloud controller executor service", e);
-            }
+            shutdownExecutorService(executorService);
         }
-	}
+    }
+
+    private void shutdownScheduledExecutorService(String executorServiceId) {
+        ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1);
+        if(executorService != null) {
+            shutdownExecutorService(executorService);
+        }
+    }
+
+    private void shutdownExecutorService(ExecutorService executorService) {
+        try {
+            executorService.shutdownNow();
+        } catch (Exception e) {
+            log.warn("An error occurred while shutting down executor service", e);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
new file mode 100644
index 0000000..d465f36
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.messaging.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
+import org.apache.stratos.cloud.controller.context.CloudControllerContext;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyManager;
+
+/**
+ * Topology event synchronizer publishes complete topology event periodically.
+ */
+public class TopologyEventSynchronizer implements Runnable {
+
+    private static final Log log = LogFactory.getLog(TopologyEventSynchronizer.class);
+
+    @Override
+    public void run() {
+        if (log.isDebugEnabled()) {
+            log.debug("Executing topology synchronizer");
+        }
+        
+        if(!CloudControllerConfig.getInstance().isTopologySyncEnabled()) {
+            if(log.isWarnEnabled()) {
+                log.warn("Topology synchronization is disabled");
+            }
+            return;
+        }
+
+        if(CloudControllerContext.getInstance().isTopologySyncRunning()) {
+            if(log.isWarnEnabled()) {
+                log.warn("Topology synchronization is already running");
+            }
+            return;
+        }
+
+        try {
+            // Publish complete topology event
+            if (TopologyManager.getTopology() != null) {
+                CloudControllerContext.getInstance().setTopologySyncRunning(true);
+                TopologyEventPublisher.sendCompleteTopologyEvent(TopologyManager.getTopology());
+            }
+        } finally {
+            CloudControllerContext.getInstance().setTopologySyncRunning(false);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTask.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTask.java
deleted file mode 100644
index aa0ee67..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTask.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.messaging.publisher;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
-import org.apache.stratos.cloud.controller.context.CloudControllerContext;
-import org.apache.stratos.cloud.controller.messaging.topology.TopologyManager;
-import org.wso2.carbon.ntask.core.Task;
-
-import java.util.Map;
-
-public class TopologySynchronizerTask implements Task {
-
-    private static final Log log = LogFactory.getLog(TopologySynchronizerTask.class);
-
-    @Override
-    public void execute() {
-        if (log.isDebugEnabled()) {
-            log.debug("Executing topology synchronization task");
-        }
-        
-        if(!CloudControllerConfig.getInstance().isTopologySyncEnabled()) {
-            if(log.isWarnEnabled()) {
-                log.warn("Topology synchronization is disabled");
-            }
-            return;
-        }
-
-        if(CloudControllerContext.getInstance().isTopologySyncRunning()) {
-            if(log.isWarnEnabled()) {
-                log.warn("Topology synchronization is already running");
-            }
-            return;
-        }
-
-        try {
-            // Publish complete topology event
-            if (TopologyManager.getTopology() != null) {
-                CloudControllerContext.getInstance().setTopologySyncRunning(true);
-                TopologyEventPublisher.sendCompleteTopologyEvent(TopologyManager.getTopology());
-            }
-        } finally {
-            CloudControllerContext.getInstance().setTopologySyncRunning(false);
-        }
-    }
-    
-    @Override
-    public void init() {
-    }
-
-    @Override
-    public void setProperties(Map<String, String> arg0) {}
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
deleted file mode 100644
index 9575ce4..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.cloud.controller.messaging.publisher;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
-import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.wso2.carbon.ntask.common.TaskException;
-import org.wso2.carbon.ntask.core.TaskInfo;
-import org.wso2.carbon.ntask.core.TaskManager;
-import org.wso2.carbon.ntask.core.service.TaskService;
-
-import java.util.HashMap;
-
-/**
- * Topology synchronizer task scheduler for scheduling the topology synchronizer task
- * using carbon task service.
- */
-public class TopologySynchronizerTaskScheduler {
-
-    private static final Log log = LogFactory.getLog(TopologySynchronizerTaskScheduler.class);
-
-    public static void schedule(TaskService taskService) {
-        TaskManager taskManager = null;
-        try {
-
-            if (!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE)) {
-                // Register task type
-                taskService.registerTaskType(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
-
-                // Register task
-                taskManager = taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
-                String cronProp = CloudControllerConfig.getInstance().
-                        getTopologyConfig().getProperty(CloudControllerConstants.CRON_PROPERTY);
-				String cron = cronProp != null ?  cronProp :CloudControllerConstants.TOPOLOGY_SYNC_CRON ;
-                TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cron);
-
-                TaskInfo taskInfo = new TaskInfo(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME,
-                        TopologySynchronizerTask.class.getName(),
-                        new HashMap<String, String>(), triggerInfo);
-                taskManager.registerTask(taskInfo);
-                if(log.isInfoEnabled()) {
-                    log.info(String.format("Synchronization task scheduled: [task] %s [cron] %s",
-                            CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME, cron));
-                }
-            }
-
-        } catch (Exception e) {
-            if (taskManager != null) {
-                try {
-                    taskManager.deleteTask(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME);
-                } catch (TaskException te) {
-                    if (log.isErrorEnabled()) {
-                        log.error(te);
-                    }
-                }
-            }
-            
-            String msg = String.format("Could not schedule synchronization task: %s",
-                    CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME);
-            log.error(msg, e);
-			throw new RuntimeException(msg, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 4c1dbf4..00fc68d 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -28,16 +28,14 @@ import org.apache.stratos.common.services.DistributedObjectProvider;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.manager.context.StratosManagerContext;
 import org.apache.stratos.manager.messaging.publisher.TenantEventPublisher;
-import org.apache.stratos.manager.messaging.publisher.synchronizer.ApplicationSignUpSynchronizerTask;
-import org.apache.stratos.manager.messaging.publisher.synchronizer.SynchronizerTaskScheduler;
-import org.apache.stratos.manager.messaging.publisher.synchronizer.TenantSynzhronizerTask;
+import org.apache.stratos.manager.messaging.publisher.synchronizer.ApplicationSignUpEventSynchronizer;
+import org.apache.stratos.manager.messaging.publisher.synchronizer.TenantEventSynchronizer;
 import org.apache.stratos.manager.messaging.receiver.StratosManagerApplicationEventReceiver;
 import org.apache.stratos.manager.messaging.receiver.StratosManagerInstanceStatusEventReceiver;
 import org.apache.stratos.manager.messaging.receiver.StratosManagerTopologyEventReceiver;
 import org.apache.stratos.manager.user.management.TenantUserRoleManager;
 import org.apache.stratos.manager.user.management.exception.UserManagerException;
 import org.apache.stratos.manager.utils.CartridgeConfigFileReader;
-import org.apache.stratos.manager.utils.StratosManagerConstants;
 import org.apache.stratos.manager.utils.UserRoleCreator;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.util.MessagingUtil;
@@ -51,6 +49,8 @@ import org.wso2.carbon.user.core.service.RealmService;
 import org.wso2.carbon.utils.ConfigurationContextService;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @scr.component name="org.wso2.carbon.hosting.mgt.internal.StratosManagerServiceComponent"
@@ -81,18 +81,25 @@ import java.util.concurrent.ExecutorService;
 public class StratosManagerServiceComponent {
 
 	private static final Log log = LogFactory.getLog(StratosManagerServiceComponent.class);
-	private static final String THREAD_EXECUTOR_ID = "stratos.manager.thread.pool";
-    private static final String STRATOS_MANAGER_COORDINATOR_LOCK = "STRATOS_MANAGER_COORDINATOR_LOCK";
+
+    private static final String THREAD_POOL_ID = "stratos.manager.thread.pool";
+    private static final String SCHEDULER_THREAD_POOL_ID = "stratos.manager.scheduler.thread.pool";
+    private static final String STRATOS_MANAGER_COORDINATOR_LOCK = "stratos.manager.coordinator.lock";
     private static final int THREAD_POOL_SIZE = 20;
+    private static final int SCHEDULER_THREAD_POOL_SIZE = 5;
 
     private StratosManagerTopologyEventReceiver topologyEventReceiver;
     private StratosManagerInstanceStatusEventReceiver instanceStatusEventReceiver;
     private StratosManagerApplicationEventReceiver applicationEventReceiver;
 	private ExecutorService executorService;
+    private ScheduledExecutorService scheduler;
 
     protected void activate(final ComponentContext componentContext) throws Exception {
 		try {
-            executorService = StratosThreadPool.getExecutorService(THREAD_EXECUTOR_ID, THREAD_POOL_SIZE);
+            executorService = StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE);
+            scheduler = StratosThreadPool.getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID,
+                    SCHEDULER_THREAD_POOL_SIZE);
+
             Runnable stratosManagerActivator = new Runnable() {
                 @Override
                 public void run() {
@@ -167,11 +174,9 @@ public class StratosManagerServiceComponent {
     private void executeCoordinatorTasks(ComponentContext componentContext) throws UserStoreException,
             UserManagerException {
 
-        // Initialize tenant event publisher
         initializeTenantEventPublisher(componentContext);
-
-        // Initialize instance status receiver
         initializeInstanceStatusEventReceiver();
+        registerComponentStartUpEventListeners();
 
         // Create internal/user Role at server start-up
         createInternalUserRole(componentContext);
@@ -227,20 +232,6 @@ public class StratosManagerServiceComponent {
      * @param componentContext
      */
     private void initializeTenantEventPublisher(ComponentContext componentContext) {
-        // Schedule complete tenant event synchronizer
-        if(log.isDebugEnabled()) {
-            log.debug("Scheduling tenant synchronizer task...");
-        }
-        SynchronizerTaskScheduler.schedule(StratosManagerConstants.TENANT_SYNC_TASK_TYPE,
-                StratosManagerConstants.TENANT_SYNC_TASK_NAME, TenantSynzhronizerTask.class);
-
-        // Schedule complete application signup event synchronizer
-        if(log.isDebugEnabled()) {
-            log.debug("Scheduling application signup synchronizer task...");
-        }
-        SynchronizerTaskScheduler.schedule(StratosManagerConstants.APPLICATION_SIGNUP_SYNC_TASK_TYPE,
-                StratosManagerConstants.APPLICATION_SIGNUP_SYNC_TASK_NAME, ApplicationSignUpSynchronizerTask.class);
-
         // Register tenant event publisher
         if(log.isDebugEnabled()) {
             log.debug("Initializing tenant event publisher...");
@@ -252,18 +243,20 @@ public class StratosManagerServiceComponent {
         if(log.isInfoEnabled()) {
             log.info("Tenant event publisher initialized");
         }
+    }
 
+    private void registerComponentStartUpEventListeners() {
         ComponentStartUpSynchronizer componentStartUpSynchronizer =
                 ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
         componentStartUpSynchronizer.addEventListener(new ComponentActivationEventListener() {
             @Override
             public void activated(Component component) {
                 if(component == Component.StratosManager) {
-                    TenantSynzhronizerTask tenantSynzhronizerTask = new TenantSynzhronizerTask();
-                    tenantSynzhronizerTask.execute();
+                    Runnable tenantSynchronizer = new TenantEventSynchronizer();
+                    scheduler.scheduleAtFixedRate(tenantSynchronizer, 0, 1, TimeUnit.MINUTES);
 
-                    ApplicationSignUpSynchronizerTask applicationSignUpSynchronizerTask = new ApplicationSignUpSynchronizerTask();
-                    applicationSignUpSynchronizerTask.execute();
+                    Runnable applicationSignUpSynchronizer = new ApplicationSignUpEventSynchronizer();
+                    scheduler.scheduleAtFixedRate(applicationSignUpSynchronizer, 0, 1, TimeUnit.MINUTES);
                 }
             }
         });
@@ -343,6 +336,29 @@ public class StratosManagerServiceComponent {
         EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
         EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName());
 
-	    executorService.shutdownNow();
+        shutdownExecutorService(THREAD_POOL_ID);
+        shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID);
+    }
+
+    private void shutdownExecutorService(String executorServiceId) {
+        ExecutorService executorService = StratosThreadPool.getExecutorService(executorServiceId, 1);
+        if(executorService != null) {
+            shutdownExecutorService(executorService);
+        }
+    }
+
+    private void shutdownScheduledExecutorService(String executorServiceId) {
+        ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1);
+        if(executorService != null) {
+            shutdownExecutorService(executorService);
+        }
+    }
+
+    private void shutdownExecutorService(ExecutorService executorService) {
+        try {
+            executorService.shutdownNow();
+        } catch (Exception e) {
+            log.warn("An error occurred while shutting down executor service", e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpEventSynchronizer.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpEventSynchronizer.java
new file mode 100644
index 0000000..c64c5e8
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpEventSynchronizer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.messaging.publisher.synchronizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.components.ApplicationSignUpHandler;
+import org.apache.stratos.manager.messaging.publisher.ApplicationSignUpEventPublisher;
+import org.apache.stratos.messaging.domain.application.signup.ApplicationSignUp;
+
+import java.util.List;
+
+/**
+ * Application signup synchronizer publishes complete application signup event periodically.
+ */
+public class ApplicationSignUpEventSynchronizer implements Runnable {
+
+    private static final Log log = LogFactory.getLog(ApplicationSignUpEventSynchronizer.class);
+
+    private ApplicationSignUpHandler applicationSignUpHandler;
+
+    public ApplicationSignUpEventSynchronizer() {
+        applicationSignUpHandler = new ApplicationSignUpHandler();
+    }
+
+    @Override
+    public void run() {
+        try {
+            List<ApplicationSignUp> applicationSignUps = applicationSignUpHandler.getApplicationSignUps();
+            if((applicationSignUps != null) && (applicationSignUps.size() > 0)) {
+                log.debug("Publishing complete application signup event");
+                ApplicationSignUpEventPublisher.publishCompleteApplicationSignUpsEvent(applicationSignUps);
+            }
+        } catch (Exception e) {
+            String message = "Could not publish complete application signup event";
+            log.error(message, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpSynchronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpSynchronizerTask.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpSynchronizerTask.java
deleted file mode 100644
index adf6756..0000000
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpSynchronizerTask.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.manager.messaging.publisher.synchronizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.manager.components.ApplicationSignUpHandler;
-import org.apache.stratos.manager.messaging.publisher.ApplicationSignUpEventPublisher;
-import org.apache.stratos.messaging.domain.application.signup.ApplicationSignUp;
-import org.wso2.carbon.ntask.core.Task;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Application signup synchronizer task.
- */
-public class ApplicationSignUpSynchronizerTask implements Task {
-
-    private static final Log log = LogFactory.getLog(ApplicationSignUpSynchronizerTask.class);
-
-    private ApplicationSignUpHandler applicationSignUpHandler;
-
-    public ApplicationSignUpSynchronizerTask() {
-        init();
-    }
-
-    @Override
-    public void setProperties(Map<String, String> map) {
-    }
-
-    @Override
-    public void init() {
-        applicationSignUpHandler = new ApplicationSignUpHandler();
-    }
-
-    @Override
-    public void execute() {
-        try {
-            List<ApplicationSignUp> applicationSignUps = applicationSignUpHandler.getApplicationSignUps();
-            if((applicationSignUps != null) && (applicationSignUps.size() > 0)) {
-                ApplicationSignUpEventPublisher.publishCompleteApplicationSignUpsEvent(applicationSignUps);
-            }
-        } catch (Exception e) {
-            String message = "Could not publish complete application signup event";
-            log.error(message, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/SynchronizerTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/SynchronizerTaskScheduler.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/SynchronizerTaskScheduler.java
deleted file mode 100644
index 41976cd..0000000
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/SynchronizerTaskScheduler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.manager.messaging.publisher.synchronizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.manager.internal.ServiceReferenceHolder;
-import org.apache.stratos.manager.utils.StratosManagerConstants;
-import org.wso2.carbon.ntask.common.TaskException;
-import org.wso2.carbon.ntask.core.TaskInfo;
-import org.wso2.carbon.ntask.core.TaskManager;
-import org.wso2.carbon.ntask.core.service.TaskService;
-
-import java.util.HashMap;
-
-/**
- * Synchronizer task scheduler for scheduling tasks using carbon task service.
- */
-public class SynchronizerTaskScheduler {
-
-    private static final Log log = LogFactory.getLog(SynchronizerTaskScheduler.class);
-
-    public static void schedule(String taskType, String taskName, Class taskClass) {
-        TaskManager taskManager = null;
-        try {
-            TaskService taskService = ServiceReferenceHolder.getInstance().getTaskService();
-
-            if (!taskService.getRegisteredTaskTypes().contains(taskType)) {
-                // Register task type
-                taskService.registerTaskType(taskType);
-
-                // Register task
-                taskManager = taskService.getTaskManager(taskType);
-                String cron = StratosManagerConstants.DEFAULT_CRON;
-                TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cron);
-                TaskInfo taskInfo = new TaskInfo(taskName, taskClass.getName(), new HashMap<String, String>(), triggerInfo);
-                taskManager.registerTask(taskInfo);
-                if(log.isInfoEnabled()) {
-                    log.info(String.format("Synchronization task scheduled: [task] %s [cron] %s", taskName, cron));
-                }
-            }
-        } catch (Exception e) {
-            if (taskManager != null) {
-                try {
-                    taskManager.deleteTask(taskName);
-                } catch (TaskException te) {
-                    if (log.isErrorEnabled()) {
-                        log.error(te);
-                    }
-                }
-            }
-            throw new RuntimeException(String.format("Could not schedule synchronization task: %s", taskName), e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
new file mode 100644
index 0000000..380bf69
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.messaging.publisher.synchronizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.internal.ServiceReferenceHolder;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
+import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
+import org.apache.stratos.messaging.util.MessagingUtil;
+import org.wso2.carbon.stratos.common.beans.TenantInfoBean;
+import org.wso2.carbon.user.core.tenant.TenantManager;
+
+import java.util.*;
+
+/**
+ * Tenant event synchronizer publishes complete tenant event periodically.
+ */
+public class TenantEventSynchronizer implements Runnable {
+
+	private static final Log log = LogFactory.getLog(TenantEventSynchronizer.class);
+
+	@Override
+	public void run() {
+		try {
+			if (log.isDebugEnabled()) {
+				log.debug(String.format("Publishing complete tenant event"));
+			}
+			Tenant tenant;
+			List<Tenant> tenants = new ArrayList<Tenant>();
+			TenantManager tenantManager = ServiceReferenceHolder.getRealmService().getTenantManager();
+			org.wso2.carbon.user.api.Tenant[] carbonTenants = tenantManager.getAllTenants();
+			for (org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) {
+				// Create tenant
+				if (log.isDebugEnabled()) {
+					log.debug(String.format("Tenant found: [tenant-id] %d [tenant-domain] %s",
+					                        carbonTenant.getId(), carbonTenant.getDomain()));
+				}
+				tenant = new Tenant(carbonTenant.getId(), carbonTenant.getDomain());
+
+				if (!org.apache.stratos.messaging.message.receiver.tenant.TenantManager.getInstance()
+				                                                                       .tenantExists(carbonTenant.getId())) {
+					// if the tenant is not already there in TenantManager,
+					// trigger TenantCreatedEvent
+					TenantInfoBean tenantBean = new TenantInfoBean();
+					tenantBean.setTenantId(carbonTenant.getId());
+					tenantBean.setTenantDomain(carbonTenant.getDomain());
+
+					// Add tenant to Tenant Manager
+					org.apache.stratos.messaging.message.receiver.tenant.TenantManager.getInstance()
+					                                                                  .addTenant(tenant);
+				}
+				tenants.add(tenant);
+			}
+			CompleteTenantEvent event = new CompleteTenantEvent(tenants);
+			String topic = MessagingUtil.getMessageTopicName(event);
+			EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
+			eventPublisher.publish(event);
+		} catch (Exception e) {
+			if (log.isErrorEnabled()) {
+				log.error("Could not publish complete tenant event", e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantSynzhronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantSynzhronizerTask.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantSynzhronizerTask.java
deleted file mode 100644
index f38e6e1..0000000
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantSynzhronizerTask.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.manager.messaging.publisher.synchronizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.manager.internal.ServiceReferenceHolder;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
-import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
-import org.apache.stratos.messaging.domain.tenant.Tenant;
-import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
-import org.apache.stratos.messaging.util.MessagingUtil;
-import org.wso2.carbon.ntask.core.Task;
-import org.wso2.carbon.stratos.common.beans.TenantInfoBean;
-import org.wso2.carbon.user.core.tenant.TenantManager;
-
-import java.util.*;
-
-/**
- * Tenant synchronizer task for publishing complete tenant event periodically
- * to message broker.
- */
-public class TenantSynzhronizerTask implements Task {
-
-	private static final Log log = LogFactory.getLog(TenantSynzhronizerTask.class);
-
-	@Override
-	public void init() {
-	}
-
-	@Override
-	public void execute() {
-		try {
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Publishing complete tenant event"));
-			}
-			Tenant tenant;
-			List<Tenant> tenants = new ArrayList<Tenant>();
-			TenantManager tenantManager = ServiceReferenceHolder.getRealmService().getTenantManager();
-			org.wso2.carbon.user.api.Tenant[] carbonTenants = tenantManager.getAllTenants();
-			for (org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) {
-				// Create tenant
-				if (log.isDebugEnabled()) {
-					log.debug(String.format("Tenant found: [tenant-id] %d [tenant-domain] %s",
-					                        carbonTenant.getId(), carbonTenant.getDomain()));
-				}
-				tenant = new Tenant(carbonTenant.getId(), carbonTenant.getDomain());
-
-				if (!org.apache.stratos.messaging.message.receiver.tenant.TenantManager.getInstance()
-				                                                                       .tenantExists(carbonTenant.getId())) {
-					// if the tenant is not already there in TenantManager,
-					// trigger TenantCreatedEvent
-					TenantInfoBean tenantBean = new TenantInfoBean();
-					tenantBean.setTenantId(carbonTenant.getId());
-					tenantBean.setTenantDomain(carbonTenant.getDomain());
-
-					// Add tenant to Tenant Manager
-					org.apache.stratos.messaging.message.receiver.tenant.TenantManager.getInstance()
-					                                                                  .addTenant(tenant);
-				}
-				tenants.add(tenant);
-			}
-			CompleteTenantEvent event = new CompleteTenantEvent(tenants);
-			String topic = MessagingUtil.getMessageTopicName(event);
-			EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
-			eventPublisher.publish(event);
-		} catch (Exception e) {
-			if (log.isErrorEnabled()) {
-				log.error("Could not publish complete tenant event", e);
-			}
-		}
-	}
-
-	@Override
-	public void setProperties(Map<String, String> stringStringMap) {
-	}
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/products/stratos/modules/integration/src/test/java/org/apache/stratos/integration/tests/SampleApplicationsTest.java
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/src/test/java/org/apache/stratos/integration/tests/SampleApplicationsTest.java b/products/stratos/modules/integration/src/test/java/org/apache/stratos/integration/tests/SampleApplicationsTest.java
index 6ff13d8..08f6b91 100644
--- a/products/stratos/modules/integration/src/test/java/org/apache/stratos/integration/tests/SampleApplicationsTest.java
+++ b/products/stratos/modules/integration/src/test/java/org/apache/stratos/integration/tests/SampleApplicationsTest.java
@@ -59,7 +59,6 @@ public class SampleApplicationsTest extends StratosTestServerManager {
     public void testSingleCartridgeApplication() {
         try {
             initializeApplicationEventReceiver();
-
             runApplicationTest("single-cartridge", "single-cartridge-app");
         } catch (Exception e) {
             log.error(e);
@@ -117,6 +116,10 @@ public class SampleApplicationsTest extends StratosTestServerManager {
         long startTime = System.currentTimeMillis();
         Application application = ApplicationManager.getApplications().getApplication(applicationName);
         while(!((application != null) && (application.getStatus() == ApplicationStatus.Active))) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
             application = ApplicationManager.getApplications().getApplication(applicationName);
             if((System.currentTimeMillis() - startTime) > APPLICATION_ACTIVATION_TIMEOUT) {
                 break;