You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2015/03/22 20:23:28 UTC
stratos git commit: Changing task based event synchronizers to scheduled executor services
Repository: stratos
Updated Branches:
refs/heads/master a76320b13 -> bf8bd349e
Changing task based event synchronizers to scheduled executor services
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/bf8bd349
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/bf8bd349
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/bf8bd349
Branch: refs/heads/master
Commit: bf8bd349ef5d8c650928ff847e9b578a9d6a6ddd
Parents: a76320b
Author: Imesh Gunaratne <im...@apache.org>
Authored: Mon Mar 23 00:53:13 2015 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Mon Mar 23 00:53:13 2015 +0530
----------------------------------------------------------------------
.../ApplicationEventSynchronizer.java | 40 +++++++++
.../ApplicationSynchronizeTask.java | 54 -----------
.../ApplicationSynchronizerTaskScheduler.java | 87 ------------------
.../internal/AutoscalerServiceComponent.java | 24 +++--
.../autoscaler/util/AutoscalerConstants.java | 7 +-
.../CloudControllerServiceComponent.java | 57 ++++++++----
.../publisher/TopologyEventSynchronizer.java | 64 +++++++++++++
.../publisher/TopologySynchronizerTask.java | 71 ---------------
.../TopologySynchronizerTaskScheduler.java | 83 -----------------
.../StratosManagerServiceComponent.java | 74 +++++++++------
.../ApplicationSignUpEventSynchronizer.java | 56 ++++++++++++
.../ApplicationSignUpSynchronizerTask.java | 66 --------------
.../synchronizer/SynchronizerTaskScheduler.java | 72 ---------------
.../synchronizer/TenantEventSynchronizer.java | 84 +++++++++++++++++
.../synchronizer/TenantSynzhronizerTask.java | 94 --------------------
.../tests/SampleApplicationsTest.java | 5 +-
16 files changed, 354 insertions(+), 584 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
new file mode 100644
index 0000000..fc7a528
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder;
+
+public class ApplicationEventSynchronizer implements Runnable {
+
+ private static final Log log = LogFactory.getLog(ApplicationEventSynchronizer.class);
+
+ @Override
+ public void run() {
+ if (log.isDebugEnabled()) {
+ log.debug("Executing topology synchronization task");
+ }
+ // publish to the topic
+ if (ApplicationHolder.getApplications() != null) {
+ ApplicationBuilder.handleCompleteApplication(ApplicationHolder.getApplications());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
deleted file mode 100644
index 81e4fa2..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder;
-import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher;
-import org.wso2.carbon.ntask.core.Task;
-
-import java.util.Map;
-
-public class ApplicationSynchronizeTask implements Task {
- private static final Log log = LogFactory.getLog(ApplicationSynchronizeTask.class);
-
- @Override
- public void execute() {
- if (log.isDebugEnabled()) {
- log.debug("Executing topology synchronization task");
- }
- // publish to the topic
- if (ApplicationHolder.getApplications() != null) {
- ApplicationBuilder.handleCompleteApplication(ApplicationHolder.getApplications());
- }
- }
-
- @Override
- public void init() {
- if(log.isDebugEnabled()) {
- log.debug("Applications Complete Event publisher task has been started...");
- }
-
- }
-
- @Override
- public void setProperties(Map<String, String> arg0) {}
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java
deleted file mode 100644
index ea692c2..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.applications;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.util.AutoscalerConstants;
-import org.wso2.carbon.ntask.common.TaskException;
-import org.wso2.carbon.ntask.core.TaskInfo;
-import org.wso2.carbon.ntask.core.TaskManager;
-import org.wso2.carbon.ntask.core.service.TaskService;
-
-import java.util.HashMap;
-
-/**
- * Topology synchronizer task scheduler for scheduling the topology synchronizer task
- * using carbon task service.
- */
-public class ApplicationSynchronizerTaskScheduler {
-
- private static final Log log = LogFactory.getLog(ApplicationSynchronizerTaskScheduler.class);
-
- public static void schedule(TaskService taskService) {
- TaskManager taskManager = null;
- try {
-
- //if (!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE)) {
- // Register task type
- taskService.registerTaskType(AutoscalerConstants.APPLICATION_SYNC_TASK_TYPE);
-
- /*// Register task
- taskManager = taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
- String cronProp = dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.CRON_PROPERTY);
- String cron = cronProp != null ? cronProp :CloudControllerConstants.TOPOLOGY_SYNC_CRON ;
- TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cron);
- TaskInfo taskInfo = new TaskInfo(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME,
- TopologySynchronizerTask.class.getName(),
- new HashMap<String, String>(), triggerInfo);
- taskManager.registerTask(taskInfo);*/
-
- taskManager = taskService.getTaskManager(AutoscalerConstants.APPLICATION_SYNC_TASK_TYPE);
- String cronProp = AutoscalerConstants.APPLICATION_SYNC_CRON;
- TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cronProp);
- TaskInfo taskInfo = new TaskInfo(AutoscalerConstants.APPLICATION_SYNC_TASK_NAME,
- ApplicationSynchronizeTask.class.getName(),
- new HashMap<String, String>(), triggerInfo);
- taskManager.registerTask(taskInfo);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Topology synchronization task scheduled: %s", AutoscalerConstants.APPLICATION_SYNC_TASK_NAME));
- }
- //}
-
- } catch (Exception e) {
- if (taskManager != null) {
- try {
- taskManager.deleteTask(AutoscalerConstants.APPLICATION_SYNC_TASK_NAME);
- } catch (TaskException te) {
- if (log.isErrorEnabled()) {
- log.error(te);
- }
- }
- }
-
- String msg = String.format("Could not schedule topology synchronization task: %s",
- AutoscalerConstants.APPLICATION_SYNC_TASK_NAME);
- log.error(msg, e);
- throw new RuntimeException(msg, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
index 3c03807..81608ec 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
@@ -23,8 +23,7 @@ import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.algorithms.networkpartition.NetworkPartitionAlgorithmContext;
-import org.apache.stratos.autoscaler.applications.ApplicationSynchronizeTask;
-import org.apache.stratos.autoscaler.applications.ApplicationSynchronizerTaskScheduler;
+import org.apache.stratos.autoscaler.applications.ApplicationEventSynchronizer;
import org.apache.stratos.autoscaler.context.AutoscalerContext;
import org.apache.stratos.autoscaler.event.receiver.health.AutoscalerHealthStatEventReceiver;
import org.apache.stratos.autoscaler.event.receiver.topology.AutoscalerTopologyEventReceiver;
@@ -53,6 +52,8 @@ import org.wso2.carbon.utils.ConfigurationContextService;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* @scr.component name=org.apache.stratos.autoscaler.internal.AutoscalerServiceComponent" immediate="true"
@@ -79,6 +80,7 @@ public class AutoscalerServiceComponent {
private AutoscalerTopologyEventReceiver asTopologyReceiver;
private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
private ExecutorService executorService;
+ private ScheduledExecutorService scheduler;
protected void activate(ComponentContext componentContext) throws Exception {
try {
@@ -88,6 +90,11 @@ public class AutoscalerServiceComponent {
executorService = StratosThreadPool.getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID,
threadPoolSize);
+ int schedulerThreadPoolSize = conf.getInt(AutoscalerConstants.SCHEDULER_THREAD_POOL_SIZE_KEY,
+ AutoscalerConstants.AUTOSCALER_SCHEDULER_THREAD_POOL_SIZE);
+ scheduler = StratosThreadPool.getScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID,
+ schedulerThreadPoolSize);
+
Runnable autoscalerActivator = new Runnable() {
@Override
public void run() {
@@ -196,16 +203,14 @@ public class AutoscalerServiceComponent {
log.info("Scheduling tasks to publish applications");
}
- ApplicationSynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService());
-
ComponentStartUpSynchronizer componentStartUpSynchronizer =
ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
componentStartUpSynchronizer.addEventListener(new ComponentActivationEventListener() {
@Override
public void activated(Component component) {
if(component == Component.StratosManager) {
- ApplicationSynchronizeTask applicationSynchronizeTask = new ApplicationSynchronizeTask();
- applicationSynchronizeTask.execute();
+ Runnable applicationSynchronizer = new ApplicationEventSynchronizer();
+ scheduler.scheduleAtFixedRate(applicationSynchronizer, 0, 1, TimeUnit.MINUTES);
}
}
});
@@ -229,9 +234,10 @@ public class AutoscalerServiceComponent {
}
// Shutdown executor service
- if(executorService != null) {
- shutdownExecutorService(executorService);
- }
+ shutdownExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID);
+
+ // Shutdown scheduler
+ shutdownScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID);
// Shutdown application monitor executor service
shutdownExecutorService(AutoscalerConstants.APPLICATION_MONITOR_THREAD_POOL_ID);
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
index f5d74fc..1423cc4 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
@@ -35,6 +35,9 @@ public final class AutoscalerConstants {
public static final String PROPERTY_VALUE_ATTR = "value";
public static final String AUTOSCALER_THREAD_POOL_ID = "autoscaler.thread.pool";
public static final String THREAD_POOL_SIZE_KEY = "autoscaler.thread.pool.size";
+ public static final String AUTOSCALER_SCHEDULER_ID = "autoscaler.scheduler.thread.pool";
+ public static final String SCHEDULER_THREAD_POOL_SIZE_KEY = "autoscaler.scheduler.thread.pool.size";
+ public static final int AUTOSCALER_SCHEDULER_THREAD_POOL_SIZE = 5;
public static final int AUTOSCALER_THREAD_POOL_SIZE = 50;
public static final String COMPONENTS_CONFIG = CarbonUtils.getCarbonConfigDirPath() +
File.separator + "stratos-config.xml";
@@ -74,7 +77,7 @@ public final class AutoscalerConstants {
public static final String CLUSTER_MONITOR_SCHEDULER_ID = "cluster.monitor.scheduler";
public static final String CLUSTER_MONITOR_THREAD_POOL_ID = "cluster.monitor.thread.pool";
public static final String CLUSTER_MONITOR_THREAD_POOL_SIZE = "cluster.monitor.thread.pool.size";
- public static final String MEMBER_FAULT_EVENT_NAME = "member_fault";
+ public static final String MEMBER_FAULT_EVENT_NAME = "member_fault";
//scheduler
public static final int SCHEDULE_DEFAULT_INITIAL_DELAY = 30;
public static final int SCHEDULE_DEFAULT_PERIOD = 15;
@@ -117,5 +120,5 @@ public final class AutoscalerConstants {
public static final String OAUTH_SERVICE_SFX = "services/OAuthAdminService";
public static final String IDENTITY_APPLICATION_SERVICE_SFX = "services/IdentityApplicationManagementService";
public static final String TOKEN_ENDPOINT_SFX = "oauth2/token";
- public static final String TERMINATE_DEPENDENTS = "terminate-dependents";
+ public static final String TERMINATE_DEPENDENTS = "terminate-dependents";
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index bf8e92c..83973e0 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -26,8 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cloud.controller.context.CloudControllerContext;
import org.apache.stratos.cloud.controller.exception.CloudControllerException;
-import org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTask;
-import org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTaskScheduler;
+import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventSynchronizer;
import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationEventReceiver;
import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
import org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver;
@@ -49,6 +48,8 @@ import org.wso2.carbon.registry.core.session.UserRegistry;
import org.wso2.carbon.utils.ConfigurationContextService;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Registering Cloud Controller Service.
@@ -71,18 +72,24 @@ public class CloudControllerServiceComponent {
private static final Log log = LogFactory.getLog(CloudControllerServiceComponent.class);
+ private static final String CLOUD_CONTROLLER_COORDINATOR_LOCK = "cloud.controller.coordinator.lock";
+ private static final String THREAD_POOL_ID = "cloud.controller.thread.pool";
+ private static final String SCHEDULER_THREAD_POOL_ID = "cloud.controller.scheduler.thread.pool";
+ private static final int THREAD_POOL_SIZE = 10;
+ private static final int SCHEDULER_THREAD_POOL_SIZE = 5;
+
private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
private ApplicationEventReceiver applicationEventReceiver;
private ExecutorService executorService;
-
- private static final String DEFAULT_IDENTIFIER = "Cloud-Controller";
- private static final int THREAD_POOL_SIZE = 10;
- private static final String CLOUD_CONTROLLER_COORDINATOR_LOCK = "CLOUD_CONTROLLER_COORDINATOR_LOCK";
+ private ScheduledExecutorService scheduler;
protected void activate(final ComponentContext context) {
try {
- executorService = StratosThreadPool.getExecutorService(DEFAULT_IDENTIFIER, THREAD_POOL_SIZE);
+ executorService = StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE);
+ scheduler = StratosThreadPool.getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID,
+ SCHEDULER_THREAD_POOL_SIZE);
+
Runnable cloudControllerActivator = new Runnable() {
@Override
public void run() {
@@ -160,7 +167,6 @@ public class CloudControllerServiceComponent {
if (log.isInfoEnabled()) {
log.info("Scheduling topology synchronizer task");
}
- TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService());
ComponentStartUpSynchronizer componentStartUpSynchronizer =
ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
@@ -168,8 +174,8 @@ public class CloudControllerServiceComponent {
@Override
public void activated(Component component) {
if(component == Component.StratosManager) {
- TopologySynchronizerTask topologySynchronizerTask = new TopologySynchronizerTask();
- topologySynchronizerTask.execute();
+ Runnable topologySynchronizer = new TopologyEventSynchronizer();
+ scheduler.scheduleAtFixedRate(topologySynchronizer, 0, 1, TimeUnit.MINUTES);
}
}
});
@@ -253,12 +259,31 @@ public class CloudControllerServiceComponent {
}
// Shutdown executor service
+ shutdownExecutorService(THREAD_POOL_ID);
+
+ // Shutdown scheduler
+ shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID);
+ }
+
+ private void shutdownExecutorService(String executorServiceId) {
+ ExecutorService executorService = StratosThreadPool.getExecutorService(executorServiceId, 1);
if(executorService != null) {
- try {
- executorService.shutdownNow();
- } catch (Exception e) {
- log.warn("An error occurred while shutting down cloud controller executor service", e);
- }
+ shutdownExecutorService(executorService);
}
- }
+ }
+
+ private void shutdownScheduledExecutorService(String executorServiceId) {
+ ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1);
+ if(executorService != null) {
+ shutdownExecutorService(executorService);
+ }
+ }
+
+ private void shutdownExecutorService(ExecutorService executorService) {
+ try {
+ executorService.shutdownNow();
+ } catch (Exception e) {
+ log.warn("An error occurred while shutting down executor service", e);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
new file mode 100644
index 0000000..d465f36
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.messaging.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
+import org.apache.stratos.cloud.controller.context.CloudControllerContext;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyManager;
+
+/**
+ * Topology event synchronizer publishes complete topology event periodically.
+ */
+public class TopologyEventSynchronizer implements Runnable {
+
+ private static final Log log = LogFactory.getLog(TopologyEventSynchronizer.class);
+
+ @Override
+ public void run() {
+ if (log.isDebugEnabled()) {
+ log.debug("Executing topology synchronizer");
+ }
+
+ if(!CloudControllerConfig.getInstance().isTopologySyncEnabled()) {
+ if(log.isWarnEnabled()) {
+ log.warn("Topology synchronization is disabled");
+ }
+ return;
+ }
+
+ if(CloudControllerContext.getInstance().isTopologySyncRunning()) {
+ if(log.isWarnEnabled()) {
+ log.warn("Topology synchronization is already running");
+ }
+ return;
+ }
+
+ try {
+ // Publish complete topology event
+ if (TopologyManager.getTopology() != null) {
+ CloudControllerContext.getInstance().setTopologySyncRunning(true);
+ TopologyEventPublisher.sendCompleteTopologyEvent(TopologyManager.getTopology());
+ }
+ } finally {
+ CloudControllerContext.getInstance().setTopologySyncRunning(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTask.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTask.java
deleted file mode 100644
index aa0ee67..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTask.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.messaging.publisher;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
-import org.apache.stratos.cloud.controller.context.CloudControllerContext;
-import org.apache.stratos.cloud.controller.messaging.topology.TopologyManager;
-import org.wso2.carbon.ntask.core.Task;
-
-import java.util.Map;
-
-public class TopologySynchronizerTask implements Task {
-
- private static final Log log = LogFactory.getLog(TopologySynchronizerTask.class);
-
- @Override
- public void execute() {
- if (log.isDebugEnabled()) {
- log.debug("Executing topology synchronization task");
- }
-
- if(!CloudControllerConfig.getInstance().isTopologySyncEnabled()) {
- if(log.isWarnEnabled()) {
- log.warn("Topology synchronization is disabled");
- }
- return;
- }
-
- if(CloudControllerContext.getInstance().isTopologySyncRunning()) {
- if(log.isWarnEnabled()) {
- log.warn("Topology synchronization is already running");
- }
- return;
- }
-
- try {
- // Publish complete topology event
- if (TopologyManager.getTopology() != null) {
- CloudControllerContext.getInstance().setTopologySyncRunning(true);
- TopologyEventPublisher.sendCompleteTopologyEvent(TopologyManager.getTopology());
- }
- } finally {
- CloudControllerContext.getInstance().setTopologySyncRunning(false);
- }
- }
-
- @Override
- public void init() {
- }
-
- @Override
- public void setProperties(Map<String, String> arg0) {}
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
deleted file mode 100644
index 9575ce4..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.cloud.controller.messaging.publisher;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
-import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.wso2.carbon.ntask.common.TaskException;
-import org.wso2.carbon.ntask.core.TaskInfo;
-import org.wso2.carbon.ntask.core.TaskManager;
-import org.wso2.carbon.ntask.core.service.TaskService;
-
-import java.util.HashMap;
-
-/**
- * Topology synchronizer task scheduler for scheduling the topology synchronizer task
- * using carbon task service.
- */
-public class TopologySynchronizerTaskScheduler {
-
- private static final Log log = LogFactory.getLog(TopologySynchronizerTaskScheduler.class);
-
- public static void schedule(TaskService taskService) {
- TaskManager taskManager = null;
- try {
-
- if (!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE)) {
- // Register task type
- taskService.registerTaskType(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
-
- // Register task
- taskManager = taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
- String cronProp = CloudControllerConfig.getInstance().
- getTopologyConfig().getProperty(CloudControllerConstants.CRON_PROPERTY);
- String cron = cronProp != null ? cronProp :CloudControllerConstants.TOPOLOGY_SYNC_CRON ;
- TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cron);
-
- TaskInfo taskInfo = new TaskInfo(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME,
- TopologySynchronizerTask.class.getName(),
- new HashMap<String, String>(), triggerInfo);
- taskManager.registerTask(taskInfo);
- if(log.isInfoEnabled()) {
- log.info(String.format("Synchronization task scheduled: [task] %s [cron] %s",
- CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME, cron));
- }
- }
-
- } catch (Exception e) {
- if (taskManager != null) {
- try {
- taskManager.deleteTask(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME);
- } catch (TaskException te) {
- if (log.isErrorEnabled()) {
- log.error(te);
- }
- }
- }
-
- String msg = String.format("Could not schedule synchronization task: %s",
- CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME);
- log.error(msg, e);
- throw new RuntimeException(msg, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 4c1dbf4..00fc68d 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -28,16 +28,14 @@ import org.apache.stratos.common.services.DistributedObjectProvider;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.manager.context.StratosManagerContext;
import org.apache.stratos.manager.messaging.publisher.TenantEventPublisher;
-import org.apache.stratos.manager.messaging.publisher.synchronizer.ApplicationSignUpSynchronizerTask;
-import org.apache.stratos.manager.messaging.publisher.synchronizer.SynchronizerTaskScheduler;
-import org.apache.stratos.manager.messaging.publisher.synchronizer.TenantSynzhronizerTask;
+import org.apache.stratos.manager.messaging.publisher.synchronizer.ApplicationSignUpEventSynchronizer;
+import org.apache.stratos.manager.messaging.publisher.synchronizer.TenantEventSynchronizer;
import org.apache.stratos.manager.messaging.receiver.StratosManagerApplicationEventReceiver;
import org.apache.stratos.manager.messaging.receiver.StratosManagerInstanceStatusEventReceiver;
import org.apache.stratos.manager.messaging.receiver.StratosManagerTopologyEventReceiver;
import org.apache.stratos.manager.user.management.TenantUserRoleManager;
import org.apache.stratos.manager.user.management.exception.UserManagerException;
import org.apache.stratos.manager.utils.CartridgeConfigFileReader;
-import org.apache.stratos.manager.utils.StratosManagerConstants;
import org.apache.stratos.manager.utils.UserRoleCreator;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.util.MessagingUtil;
@@ -51,6 +49,8 @@ import org.wso2.carbon.user.core.service.RealmService;
import org.wso2.carbon.utils.ConfigurationContextService;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* @scr.component name="org.wso2.carbon.hosting.mgt.internal.StratosManagerServiceComponent"
@@ -81,18 +81,25 @@ import java.util.concurrent.ExecutorService;
public class StratosManagerServiceComponent {
private static final Log log = LogFactory.getLog(StratosManagerServiceComponent.class);
- private static final String THREAD_EXECUTOR_ID = "stratos.manager.thread.pool";
- private static final String STRATOS_MANAGER_COORDINATOR_LOCK = "STRATOS_MANAGER_COORDINATOR_LOCK";
+
+ private static final String THREAD_POOL_ID = "stratos.manager.thread.pool";
+ private static final String SCHEDULER_THREAD_POOL_ID = "stratos.manager.scheduler.thread.pool";
+ private static final String STRATOS_MANAGER_COORDINATOR_LOCK = "stratos.manager.coordinator.lock";
private static final int THREAD_POOL_SIZE = 20;
+ private static final int SCHEDULER_THREAD_POOL_SIZE = 5;
private StratosManagerTopologyEventReceiver topologyEventReceiver;
private StratosManagerInstanceStatusEventReceiver instanceStatusEventReceiver;
private StratosManagerApplicationEventReceiver applicationEventReceiver;
private ExecutorService executorService;
+ private ScheduledExecutorService scheduler;
protected void activate(final ComponentContext componentContext) throws Exception {
try {
- executorService = StratosThreadPool.getExecutorService(THREAD_EXECUTOR_ID, THREAD_POOL_SIZE);
+ executorService = StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE);
+ scheduler = StratosThreadPool.getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID,
+ SCHEDULER_THREAD_POOL_SIZE);
+
Runnable stratosManagerActivator = new Runnable() {
@Override
public void run() {
@@ -167,11 +174,9 @@ public class StratosManagerServiceComponent {
private void executeCoordinatorTasks(ComponentContext componentContext) throws UserStoreException,
UserManagerException {
- // Initialize tenant event publisher
initializeTenantEventPublisher(componentContext);
-
- // Initialize instance status receiver
initializeInstanceStatusEventReceiver();
+ registerComponentStartUpEventListeners();
// Create internal/user Role at server start-up
createInternalUserRole(componentContext);
@@ -227,20 +232,6 @@ public class StratosManagerServiceComponent {
* @param componentContext
*/
private void initializeTenantEventPublisher(ComponentContext componentContext) {
- // Schedule complete tenant event synchronizer
- if(log.isDebugEnabled()) {
- log.debug("Scheduling tenant synchronizer task...");
- }
- SynchronizerTaskScheduler.schedule(StratosManagerConstants.TENANT_SYNC_TASK_TYPE,
- StratosManagerConstants.TENANT_SYNC_TASK_NAME, TenantSynzhronizerTask.class);
-
- // Schedule complete application signup event synchronizer
- if(log.isDebugEnabled()) {
- log.debug("Scheduling application signup synchronizer task...");
- }
- SynchronizerTaskScheduler.schedule(StratosManagerConstants.APPLICATION_SIGNUP_SYNC_TASK_TYPE,
- StratosManagerConstants.APPLICATION_SIGNUP_SYNC_TASK_NAME, ApplicationSignUpSynchronizerTask.class);
-
// Register tenant event publisher
if(log.isDebugEnabled()) {
log.debug("Initializing tenant event publisher...");
@@ -252,18 +243,20 @@ public class StratosManagerServiceComponent {
if(log.isInfoEnabled()) {
log.info("Tenant event publisher initialized");
}
+ }
+ private void registerComponentStartUpEventListeners() {
ComponentStartUpSynchronizer componentStartUpSynchronizer =
ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer();
componentStartUpSynchronizer.addEventListener(new ComponentActivationEventListener() {
@Override
public void activated(Component component) {
if(component == Component.StratosManager) {
- TenantSynzhronizerTask tenantSynzhronizerTask = new TenantSynzhronizerTask();
- tenantSynzhronizerTask.execute();
+ Runnable tenantSynchronizer = new TenantEventSynchronizer();
+ scheduler.scheduleAtFixedRate(tenantSynchronizer, 0, 1, TimeUnit.MINUTES);
- ApplicationSignUpSynchronizerTask applicationSignUpSynchronizerTask = new ApplicationSignUpSynchronizerTask();
- applicationSignUpSynchronizerTask.execute();
+ Runnable applicationSignUpSynchronizer = new ApplicationSignUpEventSynchronizer();
+ scheduler.scheduleAtFixedRate(applicationSignUpSynchronizer, 0, 1, TimeUnit.MINUTES);
}
}
});
@@ -343,6 +336,29 @@ public class StratosManagerServiceComponent {
EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName());
- executorService.shutdownNow();
+ shutdownExecutorService(THREAD_POOL_ID);
+ shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID);
+ }
+
+ private void shutdownExecutorService(String executorServiceId) { // Shuts down the executor StratosThreadPool returns for executorServiceId.
+ ExecutorService executorService = StratosThreadPool.getExecutorService(executorServiceId, 1); // local hides the executorService field; NOTE(review): the size 1 presumably only matters if the lookup has to create a pool - confirm
+ if(executorService != null) {
+ shutdownExecutorService(executorService); // delegates to the ExecutorService overload, which logs failures instead of throwing
+ }
+ }
+
+ private void shutdownScheduledExecutorService(String executorServiceId) { // Shuts down the scheduled executor StratosThreadPool returns for executorServiceId.
+ ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1); // local hides the executorService field; only passed on to shutdownExecutorService(ExecutorService), so the plain ExecutorService type is enough
+ if(executorService != null) {
+ shutdownExecutorService(executorService);
+ }
+ }
+
+ private void shutdownExecutorService(ExecutorService executorService) { // Best-effort shutdown: any failure is logged as a warning, never rethrown.
+ try {
+ executorService.shutdownNow(); // per the ExecutorService contract: attempts to stop running tasks and does not wait for them to terminate; the returned list of pending tasks is ignored
+ } catch (Exception e) {
+ log.warn("An error occurred while shutting down executor service", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpEventSynchronizer.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpEventSynchronizer.java
new file mode 100644
index 0000000..c64c5e8
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpEventSynchronizer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.messaging.publisher.synchronizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.components.ApplicationSignUpHandler;
+import org.apache.stratos.manager.messaging.publisher.ApplicationSignUpEventPublisher;
+import org.apache.stratos.messaging.domain.application.signup.ApplicationSignUp;
+
+import java.util.List;
+
+/**
+ * Application signup synchronizer publishes complete application signup event periodically.
+ * <p>
+ * Each {@link #run()} reads the current application signups from an
+ * {@link ApplicationSignUpHandler} and, when the returned list is neither null nor empty,
+ * passes it to {@code ApplicationSignUpEventPublisher.publishCompleteApplicationSignUpsEvent}.
+ * Nothing is published when there are no signups.
+ * <p>
+ * Every exception raised during a run is caught and logged at error level, so no exception
+ * propagates to whatever executes this runnable. NOTE(review): presumably deliberate, since
+ * {@code ScheduledExecutorService.scheduleAtFixedRate} suppresses all subsequent executions
+ * once a run throws.
+ */
+public class ApplicationSignUpEventSynchronizer implements Runnable {
+
+ private static final Log log = LogFactory.getLog(ApplicationSignUpEventSynchronizer.class);
+
+ private ApplicationSignUpHandler applicationSignUpHandler; // created once in the constructor and reused by every run
+
+ public ApplicationSignUpEventSynchronizer() {
+ applicationSignUpHandler = new ApplicationSignUpHandler();
+ }
+
+ @Override
+ public void run() {
+ try {
+ List<ApplicationSignUp> applicationSignUps = applicationSignUpHandler.getApplicationSignUps();
+ if((applicationSignUps != null) && (applicationSignUps.size() > 0)) { // skip publishing when there is nothing to report
+ log.debug("Publishing complete application signup event");
+ ApplicationSignUpEventPublisher.publishCompleteApplicationSignUpsEvent(applicationSignUps);
+ }
+ } catch (Exception e) { // logged and swallowed; see the class comment
+ String message = "Could not publish complete application signup event";
+ log.error(message, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpSynchronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpSynchronizerTask.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpSynchronizerTask.java
deleted file mode 100644
index adf6756..0000000
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/ApplicationSignUpSynchronizerTask.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.manager.messaging.publisher.synchronizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.manager.components.ApplicationSignUpHandler;
-import org.apache.stratos.manager.messaging.publisher.ApplicationSignUpEventPublisher;
-import org.apache.stratos.messaging.domain.application.signup.ApplicationSignUp;
-import org.wso2.carbon.ntask.core.Task;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Application signup synchronizer task.
- */
-public class ApplicationSignUpSynchronizerTask implements Task {
-
- private static final Log log = LogFactory.getLog(ApplicationSignUpSynchronizerTask.class);
-
- private ApplicationSignUpHandler applicationSignUpHandler;
-
- public ApplicationSignUpSynchronizerTask() {
- init();
- }
-
- @Override
- public void setProperties(Map<String, String> map) {
- }
-
- @Override
- public void init() {
- applicationSignUpHandler = new ApplicationSignUpHandler();
- }
-
- @Override
- public void execute() {
- try {
- List<ApplicationSignUp> applicationSignUps = applicationSignUpHandler.getApplicationSignUps();
- if((applicationSignUps != null) && (applicationSignUps.size() > 0)) {
- ApplicationSignUpEventPublisher.publishCompleteApplicationSignUpsEvent(applicationSignUps);
- }
- } catch (Exception e) {
- String message = "Could not publish complete application signup event";
- log.error(message, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/SynchronizerTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/SynchronizerTaskScheduler.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/SynchronizerTaskScheduler.java
deleted file mode 100644
index 41976cd..0000000
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/SynchronizerTaskScheduler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.manager.messaging.publisher.synchronizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.manager.internal.ServiceReferenceHolder;
-import org.apache.stratos.manager.utils.StratosManagerConstants;
-import org.wso2.carbon.ntask.common.TaskException;
-import org.wso2.carbon.ntask.core.TaskInfo;
-import org.wso2.carbon.ntask.core.TaskManager;
-import org.wso2.carbon.ntask.core.service.TaskService;
-
-import java.util.HashMap;
-
-/**
- * Synchronizer task scheduler for scheduling tasks using carbon task service.
- */
-public class SynchronizerTaskScheduler {
-
- private static final Log log = LogFactory.getLog(SynchronizerTaskScheduler.class);
-
- public static void schedule(String taskType, String taskName, Class taskClass) {
- TaskManager taskManager = null;
- try {
- TaskService taskService = ServiceReferenceHolder.getInstance().getTaskService();
-
- if (!taskService.getRegisteredTaskTypes().contains(taskType)) {
- // Register task type
- taskService.registerTaskType(taskType);
-
- // Register task
- taskManager = taskService.getTaskManager(taskType);
- String cron = StratosManagerConstants.DEFAULT_CRON;
- TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cron);
- TaskInfo taskInfo = new TaskInfo(taskName, taskClass.getName(), new HashMap<String, String>(), triggerInfo);
- taskManager.registerTask(taskInfo);
- if(log.isInfoEnabled()) {
- log.info(String.format("Synchronization task scheduled: [task] %s [cron] %s", taskName, cron));
- }
- }
- } catch (Exception e) {
- if (taskManager != null) {
- try {
- taskManager.deleteTask(taskName);
- } catch (TaskException te) {
- if (log.isErrorEnabled()) {
- log.error(te);
- }
- }
- }
- throw new RuntimeException(String.format("Could not schedule synchronization task: %s", taskName), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
new file mode 100644
index 0000000..380bf69
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.messaging.publisher.synchronizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.internal.ServiceReferenceHolder;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
+import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
+import org.apache.stratos.messaging.util.MessagingUtil;
+import org.wso2.carbon.stratos.common.beans.TenantInfoBean;
+import org.wso2.carbon.user.core.tenant.TenantManager;
+
+import java.util.*;
+
+/**
+ * Tenant event synchronizer publishes complete tenant event periodically.
+ * <p>
+ * Each {@link #run()}:
+ * <ol>
+ * <li>reads all tenants from the carbon tenant manager obtained through
+ * {@code ServiceReferenceHolder.getRealmService()};</li>
+ * <li>builds a messaging {@link Tenant} from the id and domain of each one, and adds it to the
+ * messaging {@code TenantManager} when {@code tenantExists} reports it is not there yet;</li>
+ * <li>publishes a {@link CompleteTenantEvent} holding every tenant found, using the publisher
+ * that {@code EventPublisherPool} returns for the topic name derived from the event.</li>
+ * </ol>
+ * The event is still published when the tenant array is empty (the list is then empty).
+ * Any exception, including a NullPointerException should {@code getAllTenants()} return null,
+ * is caught and logged at error level; nothing is rethrown.
+ */
+public class TenantEventSynchronizer implements Runnable {
+
+ private static final Log log = LogFactory.getLog(TenantEventSynchronizer.class);
+
+ @Override
+ public void run() {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Publishing complete tenant event"));
+ }
+ Tenant tenant;
+ List<Tenant> tenants = new ArrayList<Tenant>();
+ TenantManager tenantManager = ServiceReferenceHolder.getRealmService().getTenantManager();
+ org.wso2.carbon.user.api.Tenant[] carbonTenants = tenantManager.getAllTenants();
+ for (org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) {
+ // Convert the carbon tenant into a messaging-domain Tenant (id and domain only)
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Tenant found: [tenant-id] %d [tenant-domain] %s",
+ carbonTenant.getId(), carbonTenant.getDomain()));
+ }
+ tenant = new Tenant(carbonTenant.getId(), carbonTenant.getDomain());
+
+ if (!org.apache.stratos.messaging.message.receiver.tenant.TenantManager.getInstance()
+ .tenantExists(carbonTenant.getId())) {
+ // Tenant is not yet known to the messaging TenantManager. NOTE(review): tenantBean below
+ // is populated but never read in this method and no event is sent here - confirm it is dead code
+ TenantInfoBean tenantBean = new TenantInfoBean();
+ tenantBean.setTenantId(carbonTenant.getId());
+ tenantBean.setTenantDomain(carbonTenant.getDomain());
+
+ // Register the tenant with the messaging TenantManager instance
+ org.apache.stratos.messaging.message.receiver.tenant.TenantManager.getInstance()
+ .addTenant(tenant);
+ }
+ tenants.add(tenant);
+ }
+ CompleteTenantEvent event = new CompleteTenantEvent(tenants);
+ String topic = MessagingUtil.getMessageTopicName(event);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
+ eventPublisher.publish(event);
+ } catch (Exception e) { // logged and swallowed so that nothing propagates to the caller of run()
+ if (log.isErrorEnabled()) {
+ log.error("Could not publish complete tenant event", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantSynzhronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantSynzhronizerTask.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantSynzhronizerTask.java
deleted file mode 100644
index f38e6e1..0000000
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantSynzhronizerTask.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.manager.messaging.publisher.synchronizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.manager.internal.ServiceReferenceHolder;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
-import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
-import org.apache.stratos.messaging.domain.tenant.Tenant;
-import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
-import org.apache.stratos.messaging.util.MessagingUtil;
-import org.wso2.carbon.ntask.core.Task;
-import org.wso2.carbon.stratos.common.beans.TenantInfoBean;
-import org.wso2.carbon.user.core.tenant.TenantManager;
-
-import java.util.*;
-
-/**
- * Tenant synchronizer task for publishing complete tenant event periodically
- * to message broker.
- */
-public class TenantSynzhronizerTask implements Task {
-
- private static final Log log = LogFactory.getLog(TenantSynzhronizerTask.class);
-
- @Override
- public void init() {
- }
-
- @Override
- public void execute() {
- try {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Publishing complete tenant event"));
- }
- Tenant tenant;
- List<Tenant> tenants = new ArrayList<Tenant>();
- TenantManager tenantManager = ServiceReferenceHolder.getRealmService().getTenantManager();
- org.wso2.carbon.user.api.Tenant[] carbonTenants = tenantManager.getAllTenants();
- for (org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) {
- // Create tenant
- if (log.isDebugEnabled()) {
- log.debug(String.format("Tenant found: [tenant-id] %d [tenant-domain] %s",
- carbonTenant.getId(), carbonTenant.getDomain()));
- }
- tenant = new Tenant(carbonTenant.getId(), carbonTenant.getDomain());
-
- if (!org.apache.stratos.messaging.message.receiver.tenant.TenantManager.getInstance()
- .tenantExists(carbonTenant.getId())) {
- // if the tenant is not already there in TenantManager,
- // trigger TenantCreatedEvent
- TenantInfoBean tenantBean = new TenantInfoBean();
- tenantBean.setTenantId(carbonTenant.getId());
- tenantBean.setTenantDomain(carbonTenant.getDomain());
-
- // Add tenant to Tenant Manager
- org.apache.stratos.messaging.message.receiver.tenant.TenantManager.getInstance()
- .addTenant(tenant);
- }
- tenants.add(tenant);
- }
- CompleteTenantEvent event = new CompleteTenantEvent(tenants);
- String topic = MessagingUtil.getMessageTopicName(event);
- EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
- eventPublisher.publish(event);
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("Could not publish complete tenant event", e);
- }
- }
- }
-
- @Override
- public void setProperties(Map<String, String> stringStringMap) {
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf8bd349/products/stratos/modules/integration/src/test/java/org/apache/stratos/integration/tests/SampleApplicationsTest.java
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/src/test/java/org/apache/stratos/integration/tests/SampleApplicationsTest.java b/products/stratos/modules/integration/src/test/java/org/apache/stratos/integration/tests/SampleApplicationsTest.java
index 6ff13d8..08f6b91 100644
--- a/products/stratos/modules/integration/src/test/java/org/apache/stratos/integration/tests/SampleApplicationsTest.java
+++ b/products/stratos/modules/integration/src/test/java/org/apache/stratos/integration/tests/SampleApplicationsTest.java
@@ -59,7 +59,6 @@ public class SampleApplicationsTest extends StratosTestServerManager {
public void testSingleCartridgeApplication() {
try {
initializeApplicationEventReceiver();
-
runApplicationTest("single-cartridge", "single-cartridge-app");
} catch (Exception e) {
log.error(e);
@@ -117,6 +116,10 @@ public class SampleApplicationsTest extends StratosTestServerManager {
long startTime = System.currentTimeMillis();
Application application = ApplicationManager.getApplications().getApplication(applicationName);
while(!((application != null) && (application.getStatus() == ApplicationStatus.Active))) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
application = ApplicationManager.getApplications().getApplication(applicationName);
if((System.currentTimeMillis() - startTime) > APPLICATION_ACTIVATION_TIMEOUT) {
break;