You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/11/03 08:32:51 UTC
[36/50] [abbrv] git commit: adding complete application publisher task
adding complete application publisher task
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0d726c8f
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0d726c8f
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0d726c8f
Branch: refs/heads/docker-grouping-merge
Commit: 0d726c8fe2ebc43576ed60d78997df2a5f921d9f
Parents: eb523d7
Author: reka <rt...@gmail.com>
Authored: Fri Oct 31 17:00:03 2014 +0530
Committer: reka <rt...@gmail.com>
Committed: Fri Oct 31 17:00:03 2014 +0530
----------------------------------------------------------------------
.../apache/stratos/autoscaler/Constants.java | 4 +
.../ApplicationSynchronizeTask.java | 51 ++++++++++++
.../ApplicationSynchronizerTask.java | 50 -----------
.../ApplicationSynchronizerTaskScheduler.java | 87 ++++++++++++++++++++
.../topic/ApplicationsEventPublisher.java | 9 ++
.../internal/AutoscalerServerComponent.java | 41 +++++++--
.../autoscaler/util/ServiceReferenceHolder.java | 12 ++-
7 files changed, 196 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
index 3e9e5e2..ef7be70 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
@@ -34,6 +34,10 @@ public class Constants {
public static final int SCHEDULE_DEFAULT_INITIAL_DELAY = 30;
public static final int SCHEDULE_DEFAULT_PERIOD = 15;
+    // In Quartz-style cron syntax this fires at second 1 of every minute.
+    public static final String APPLICATION_SYNC_CRON = "1 * * * * ? *";
+    // Identifiers are specific to the application synchronization task so that it cannot be
+    // confused with a topology synchronization task registered in the same task service.
+    // NOTE(review): confirm nothing persisted still refers to the old TOPOLOGY_SYNC_TASK name.
+    public static final String APPLICATION_SYNC_TASK_NAME = "APPLICATION_SYNC_TASK";
+    public static final String APPLICATION_SYNC_TASK_TYPE = "APPLICATION_SYNC_TASK_TYPE";
+
public static final String AUTOSCALER_CONFIG_FILE_NAME = "autoscaler.xml";
public static final String CLOUD_CONTROLLER_SERVICE_SFX = "services/CloudControllerService";
http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
new file mode 100644
index 0000000..e18cd12
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher;
+import org.wso2.carbon.ntask.core.Task;
+
+import java.util.Map;
+
+/**
+ * Scheduled task that publishes the complete applications model held by
+ * {@link ApplicationHolder} through {@link ApplicationsEventPublisher}.
+ */
+public class ApplicationSynchronizeTask implements Task {
+    private static final Log log = LogFactory.getLog(ApplicationSynchronizeTask.class);
+
+    /**
+     * Publishes a complete applications event if an applications model is currently available;
+     * does nothing otherwise.
+     */
+    @Override
+    public void execute() {
+        if (log.isDebugEnabled()) {
+            log.debug("Executing applications synchronization task");
+        }
+        // Read the holder once so the instance that was null-checked is the one published.
+        org.apache.stratos.messaging.domain.applications.Applications applications =
+                ApplicationHolder.getApplications();
+        if (applications != null) {
+            // publish to the topic
+            ApplicationsEventPublisher.sendCompleteTopologyEvent(applications);
+        }
+    }
+
+    /** Logs that the task has been started; no other initialization is performed. */
+    @Override
+    public void init() {
+        log.info("Applications Complete Event publisher task has been started...");
+    }
+
+    /** No-op: the supplied properties are ignored by this task. */
+    @Override
+    public void setProperties(Map<String, String> arg0) {}
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTask.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTask.java
deleted file mode 100644
index a8ce042..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTask.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.applications;
-
-import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager;
-import org.wso2.carbon.ntask.core.Task;
-
-import java.util.Map;
-
-public class ApplicationSynchronizerTask implements Task {
-
- @Override
- public void setProperties(Map<String, String> stringStringMap) {
-
- }
-
- @Override
- public void init() {
-
- }
-
- @Override
- public void execute() {
-
- Applications applications = ApplicationManager.getApplications();
- if (applications != null) {
- // publish complete Applications event
- ApplicationsEventPublisher.sendCompleteApplicationsEvent(applications);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java
new file mode 100644
index 0000000..7d9eb1c
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.Constants;
+import org.wso2.carbon.ntask.common.TaskException;
+import org.wso2.carbon.ntask.core.TaskInfo;
+import org.wso2.carbon.ntask.core.TaskManager;
+import org.wso2.carbon.ntask.core.service.TaskService;
+
+import java.util.HashMap;
+
+/**
+ * Schedules the {@link ApplicationSynchronizeTask} using the carbon task service.
+ */
+public class ApplicationSynchronizerTaskScheduler {
+
+    private static final Log log = LogFactory.getLog(ApplicationSynchronizerTaskScheduler.class);
+
+    /**
+     * Registers the application synchronization task type and task, using the cron
+     * expression, task name and task type defined in {@link Constants}.
+     * <p>
+     * If a failure occurs after the task manager has been obtained, the task is deleted
+     * again (best effort) before the failure is reported.
+     *
+     * @param taskService carbon task service used to register the task
+     * @throws RuntimeException if the task could not be scheduled; the original cause is preserved
+     */
+    public static void schedule(TaskService taskService) {
+        TaskManager taskManager = null;
+        try {
+            // Register task type
+            taskService.registerTaskType(Constants.APPLICATION_SYNC_TASK_TYPE);
+
+            // Register task
+            taskManager = taskService.getTaskManager(Constants.APPLICATION_SYNC_TASK_TYPE);
+            TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(Constants.APPLICATION_SYNC_CRON);
+            TaskInfo taskInfo = new TaskInfo(Constants.APPLICATION_SYNC_TASK_NAME,
+                    ApplicationSynchronizeTask.class.getName(),
+                    new HashMap<String, String>(), triggerInfo);
+            taskManager.registerTask(taskInfo);
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Application synchronization task scheduled: %s",
+                        Constants.APPLICATION_SYNC_TASK_NAME));
+            }
+        } catch (Exception e) {
+            if (taskManager != null) {
+                try {
+                    // Best-effort cleanup so a partially registered task is not left behind.
+                    taskManager.deleteTask(Constants.APPLICATION_SYNC_TASK_NAME);
+                } catch (TaskException te) {
+                    // A cleanup failure must not mask the original error; log it with its stack trace.
+                    log.error(String.format("Could not delete application synchronization task: %s",
+                            Constants.APPLICATION_SYNC_TASK_NAME), te);
+                }
+            }
+
+            String msg = String.format("Could not schedule application synchronization task: %s",
+                    Constants.APPLICATION_SYNC_TASK_NAME);
+            log.error(msg, e);
+            throw new RuntimeException(msg, e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
index bbeeca4..7a1203a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
@@ -23,6 +23,15 @@ public class ApplicationsEventPublisher {
publishEvent(new CompleteApplicationsEvent(completeApplications));
}
+    /**
+     * Wraps the given applications in a {@code CompleteApplicationsEvent} and publishes it.
+     *
+     * @param applications the applications model to carry in the event
+     */
+    public static void sendCompleteTopologyEvent(Applications applications) {
+        CompleteApplicationsEvent completeEvent = new CompleteApplicationsEvent(applications);
+        if (log.isDebugEnabled()) {
+            log.debug("Publishing complete Applications event");
+        }
+        publishEvent(completeEvent);
+    }
+
public static void sendGroupCreatedEvent(String appId, String groupId) {
try {
ApplicationManager.acquireReadLockForApplication(appId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 4823057..5ada0b7 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -21,6 +21,7 @@ package org.apache.stratos.autoscaler.internal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
+import org.apache.stratos.autoscaler.applications.ApplicationSynchronizerTaskScheduler;
import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
import org.apache.stratos.autoscaler.exception.AutoScalerException;
import org.apache.stratos.autoscaler.message.receiver.health.AutoscalerHealthStatEventReceiver;
@@ -32,6 +33,7 @@ import org.apache.stratos.autoscaler.registry.RegistryManager;
import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
import org.osgi.service.component.ComponentContext;
+import org.wso2.carbon.ntask.core.service.TaskService;
import org.wso2.carbon.registry.api.RegistryException;
import org.wso2.carbon.registry.core.service.RegistryService;
@@ -41,6 +43,9 @@ import java.util.List;
/**
* @scr.component name=org.apache.stratos.autoscaler.internal.AutoscalerServerComponent"
* immediate="true"
+ * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService"
+ * cardinality="1..1" policy="dynamic" bind="setTaskService"
+ * unbind="unsetTaskService"
* @scr.reference name="registry.service"
* interface=
* "org.wso2.carbon.registry.core.service.RegistryService"
@@ -52,13 +57,13 @@ public class AutoscalerServerComponent {
private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class);
AutoscalerTopologyEventReceiver asTopologyReceiver;
-// TopicSubscriber healthStatTopicSubscriber;
+ // TopicSubscriber healthStatTopicSubscriber;
AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
protected void activate(ComponentContext componentContext) throws Exception {
try {
// Start topology receiver
- asTopologyReceiver = new AutoscalerTopologyEventReceiver();
+ asTopologyReceiver = new AutoscalerTopologyEventReceiver();
Thread topologyTopicSubscriberThread = new Thread(asTopologyReceiver);
topologyTopicSubscriberThread.start();
if (log.isDebugEnabled()) {
@@ -88,7 +93,7 @@ public class AutoscalerServerComponent {
Partition partition = partitionIterator.next();
PartitionManager.getInstance().addPartitionToInformationModel(partition);
}
-
+
// Adding the network partitions stored in registry to the information model
List<NetworkPartitionLbHolder> nwPartitionHolders = RegistryManager.getInstance().retrieveNetworkPartitionLbHolders();
Iterator<NetworkPartitionLbHolder> nwPartitionIterator = nwPartitionHolders.iterator();
@@ -96,7 +101,7 @@ public class AutoscalerServerComponent {
NetworkPartitionLbHolder nwPartition = nwPartitionIterator.next();
PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition);
}
-
+
List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies();
Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator();
while (asPolicyIterator.hasNext()) {
@@ -112,6 +117,14 @@ public class AutoscalerServerComponent {
}
if (log.isInfoEnabled()) {
+ log.info("Scheduling tasks to publish applications");
+ }
+
+ ApplicationSynchronizerTaskScheduler
+ .schedule(ServiceReferenceHolder.getInstance()
+ .getTaskService());
+
+ if (log.isInfoEnabled()) {
log.info("Autoscaler Server Component activated");
}
} catch (Throwable e) {
@@ -120,10 +133,10 @@ public class AutoscalerServerComponent {
}
protected void deactivate(ComponentContext context) {
- asTopologyReceiver.terminate();
- autoscalerHealthStatEventReceiver.terminate();
+ asTopologyReceiver.terminate();
+ autoscalerHealthStatEventReceiver.terminate();
}
-
+
protected void setRegistryService(RegistryService registryService) {
if (log.isDebugEnabled()) {
log.debug("Setting the Registry Service");
@@ -143,4 +156,18 @@ public class AutoscalerServerComponent {
}
ServiceReferenceHolder.getInstance().setRegistry(null);
}
+
+    /**
+     * Bind method for the task service reference; stores the service in the
+     * {@link ServiceReferenceHolder}.
+     *
+     * @param taskService the task service being bound
+     */
+    protected void setTaskService(TaskService taskService) {
+        if (log.isDebugEnabled()) {
+            log.debug("Setting the Task Service");
+        }
+        ServiceReferenceHolder holder = ServiceReferenceHolder.getInstance();
+        holder.setTaskService(taskService);
+    }
+
+    /**
+     * Unbind method for the task service reference; clears the service held by the
+     * {@link ServiceReferenceHolder}.
+     *
+     * @param taskService the task service being unbound (not inspected)
+     */
+    protected void unsetTaskService(TaskService taskService) {
+        if (log.isDebugEnabled()) {
+            log.debug("Unsetting the Task Service");
+        }
+        ServiceReferenceHolder holder = ServiceReferenceHolder.getInstance();
+        holder.setTaskService(null);
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
index 00629dc..9040f74 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
@@ -21,13 +21,15 @@ package org.apache.stratos.autoscaler.util;
*/
+import org.wso2.carbon.ntask.core.service.TaskService;
import org.wso2.carbon.registry.core.Registry;
import org.wso2.carbon.registry.core.session.UserRegistry;
public class ServiceReferenceHolder {
private static ServiceReferenceHolder instance;
- private Registry registry;
+ private Registry registry;
+ private TaskService taskService;
private ServiceReferenceHolder() {
}
@@ -46,4 +48,12 @@ public class ServiceReferenceHolder {
public Registry getRegistry() {
return registry;
}
+
+    /**
+     * Returns the task service currently held.
+     *
+     * @return the held task service, or {@code null} if none has been set
+     */
+    public TaskService getTaskService() {
+        return this.taskService;
+    }
+
+    /**
+     * Replaces the held task service.
+     *
+     * @param taskService the task service to hold; {@code null} clears the reference
+     */
+    public void setTaskService(TaskService taskService) {
+        this.taskService = taskService;
+    }
}