You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/11/03 08:32:51 UTC

[36/50] [abbrv] git commit: adding complete application publisher task

adding complete application publisher task


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0d726c8f
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0d726c8f
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0d726c8f

Branch: refs/heads/docker-grouping-merge
Commit: 0d726c8fe2ebc43576ed60d78997df2a5f921d9f
Parents: eb523d7
Author: reka <rt...@gmail.com>
Authored: Fri Oct 31 17:00:03 2014 +0530
Committer: reka <rt...@gmail.com>
Committed: Fri Oct 31 17:00:03 2014 +0530

----------------------------------------------------------------------
 .../apache/stratos/autoscaler/Constants.java    |  4 +
 .../ApplicationSynchronizeTask.java             | 51 ++++++++++++
 .../ApplicationSynchronizerTask.java            | 50 -----------
 .../ApplicationSynchronizerTaskScheduler.java   | 87 ++++++++++++++++++++
 .../topic/ApplicationsEventPublisher.java       |  9 ++
 .../internal/AutoscalerServerComponent.java     | 41 +++++++--
 .../autoscaler/util/ServiceReferenceHolder.java | 12 ++-
 7 files changed, 196 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
index 3e9e5e2..ef7be70 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
@@ -34,6 +34,10 @@ public class Constants {
     public static final int SCHEDULE_DEFAULT_INITIAL_DELAY = 30;
     public static final int SCHEDULE_DEFAULT_PERIOD = 15;
 
+    public static final String APPLICATION_SYNC_CRON = "1 * * * * ? *";
+    public static final String APPLICATION_SYNC_TASK_NAME = "TOPOLOGY_SYNC_TASK";
+    public static final String APPLICATION_SYNC_TASK_TYPE = "TOPOLOGY_SYNC_TASK_TYPE";
+
     public static final String AUTOSCALER_CONFIG_FILE_NAME = "autoscaler.xml";
 
     public static final String CLOUD_CONTROLLER_SERVICE_SFX = "services/CloudControllerService";

http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
new file mode 100644
index 0000000..e18cd12
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher;
+import org.wso2.carbon.ntask.core.Task;
+
+import java.util.Map;
+
+public class ApplicationSynchronizeTask implements Task {
+    private static final Log log = LogFactory.getLog(ApplicationSynchronizeTask.class);
+
+    @Override
+    public void execute() {
+        if (log.isDebugEnabled()) {
+            log.debug("Executing topology synchronization task");
+        }
+        // publish to the topic
+        if (ApplicationHolder.getApplications() != null) {
+            ApplicationsEventPublisher.sendCompleteTopologyEvent(ApplicationHolder.getApplications());
+        }
+    }
+
+    @Override
+    public void init() {
+        log.info("Applications Complete Event publisher task has been started...");
+
+    }
+
+    @Override
+    public void setProperties(Map<String, String> arg0) {}
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTask.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTask.java
deleted file mode 100644
index a8ce042..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTask.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.applications;
-
-import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher;
-import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager;
-import org.wso2.carbon.ntask.core.Task;
-
-import java.util.Map;
-
-public class ApplicationSynchronizerTask implements Task {
-
-    @Override
-    public void setProperties(Map<String, String> stringStringMap) {
-
-    }
-
-    @Override
-    public void init() {
-
-    }
-
-    @Override
-    public void execute() {
-
-        Applications applications = ApplicationManager.getApplications();
-        if (applications != null) {
-            // publish complete Applications event
-            ApplicationsEventPublisher.sendCompleteApplicationsEvent(applications);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java
new file mode 100644
index 0000000..7d9eb1c
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.Constants;
+import org.wso2.carbon.ntask.common.TaskException;
+import org.wso2.carbon.ntask.core.TaskInfo;
+import org.wso2.carbon.ntask.core.TaskManager;
+import org.wso2.carbon.ntask.core.service.TaskService;
+
+import java.util.HashMap;
+
+/**
+ * Application synchronizer task scheduler for scheduling the application synchronizer task
+ * using the Carbon task service.
+ */
+public class ApplicationSynchronizerTaskScheduler {
+
+    private static final Log log = LogFactory.getLog(ApplicationSynchronizerTaskScheduler.class);
+
+    public static void schedule(TaskService taskService) {
+        TaskManager taskManager = null;
+        try {
+
+            //if (!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE)) {
+            // Register task type
+            taskService.registerTaskType(Constants.APPLICATION_SYNC_TASK_TYPE);
+
+                /*// Register task
+                taskManager = taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
+                String cronProp = dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.CRON_PROPERTY);
+				String cron = cronProp != null ?  cronProp :CloudControllerConstants.TOPOLOGY_SYNC_CRON ;
+                TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cron);
+                TaskInfo taskInfo = new TaskInfo(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME,
+                        TopologySynchronizerTask.class.getName(),
+                        new HashMap<String, String>(), triggerInfo);
+                taskManager.registerTask(taskInfo);*/
+
+            taskManager = taskService.getTaskManager(Constants.APPLICATION_SYNC_TASK_TYPE);
+            String cronProp = Constants.APPLICATION_SYNC_CRON;
+            TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cronProp);
+            TaskInfo taskInfo = new TaskInfo(Constants.APPLICATION_SYNC_TASK_NAME,
+                    ApplicationSynchronizeTask.class.getName(),
+                    new HashMap<String, String>(), triggerInfo);
+            taskManager.registerTask(taskInfo);
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Topology synchronization task scheduled: %s", Constants.APPLICATION_SYNC_TASK_NAME));
+            }
+            //}
+
+        } catch (Exception e) {
+            if (taskManager != null) {
+                try {
+                    taskManager.deleteTask(Constants.APPLICATION_SYNC_TASK_NAME);
+                } catch (TaskException te) {
+                    if (log.isErrorEnabled()) {
+                        log.error(te);
+                    }
+                }
+            }
+
+            String msg = String.format("Could not schedule topology synchronization task: %s",
+                    Constants.APPLICATION_SYNC_TASK_NAME);
+            log.error(msg, e);
+            throw new RuntimeException(msg, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
index bbeeca4..7a1203a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
@@ -23,6 +23,15 @@ public class ApplicationsEventPublisher {
         publishEvent(new CompleteApplicationsEvent(completeApplications));
     }
 
+    public static void sendCompleteTopologyEvent(Applications applications) {
+        CompleteApplicationsEvent applicationsEvent = new CompleteApplicationsEvent(applications);
+
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Publishing complete Applications event"));
+        }
+        publishEvent(applicationsEvent);
+    }
+
     public static void sendGroupCreatedEvent(String appId, String groupId) {
         try {
             ApplicationManager.acquireReadLockForApplication(appId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 4823057..5ada0b7 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -21,6 +21,7 @@ package org.apache.stratos.autoscaler.internal;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
+import org.apache.stratos.autoscaler.applications.ApplicationSynchronizerTaskScheduler;
 import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
 import org.apache.stratos.autoscaler.exception.AutoScalerException;
 import org.apache.stratos.autoscaler.message.receiver.health.AutoscalerHealthStatEventReceiver;
@@ -32,6 +33,7 @@ import org.apache.stratos.autoscaler.registry.RegistryManager;
 import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
 import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
 import org.osgi.service.component.ComponentContext;
+import org.wso2.carbon.ntask.core.service.TaskService;
 import org.wso2.carbon.registry.api.RegistryException;
 import org.wso2.carbon.registry.core.service.RegistryService;
 
@@ -41,6 +43,9 @@ import java.util.List;
 /**
  * @scr.component name=org.apache.stratos.autoscaler.internal.AutoscalerServerComponent"
  * immediate="true"
+ * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService"
+ * cardinality="1..1" policy="dynamic" bind="setTaskService"
+ * unbind="unsetTaskService"
  * @scr.reference name="registry.service"
  * interface=
  * "org.wso2.carbon.registry.core.service.RegistryService"
@@ -52,13 +57,13 @@ public class AutoscalerServerComponent {
 
     private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class);
     AutoscalerTopologyEventReceiver asTopologyReceiver;
-//    TopicSubscriber healthStatTopicSubscriber;
+    //    TopicSubscriber healthStatTopicSubscriber;
     AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
 
     protected void activate(ComponentContext componentContext) throws Exception {
         try {
             // Start topology receiver
-        	asTopologyReceiver = new AutoscalerTopologyEventReceiver();
+            asTopologyReceiver = new AutoscalerTopologyEventReceiver();
             Thread topologyTopicSubscriberThread = new Thread(asTopologyReceiver);
             topologyTopicSubscriberThread.start();
             if (log.isDebugEnabled()) {
@@ -88,7 +93,7 @@ public class AutoscalerServerComponent {
                 Partition partition = partitionIterator.next();
                 PartitionManager.getInstance().addPartitionToInformationModel(partition);
             }
-            
+
             // Adding the network partitions stored in registry to the information model
             List<NetworkPartitionLbHolder> nwPartitionHolders = RegistryManager.getInstance().retrieveNetworkPartitionLbHolders();
             Iterator<NetworkPartitionLbHolder> nwPartitionIterator = nwPartitionHolders.iterator();
@@ -96,7 +101,7 @@ public class AutoscalerServerComponent {
                 NetworkPartitionLbHolder nwPartition = nwPartitionIterator.next();
                 PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition);
             }
-            
+
             List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies();
             Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator();
             while (asPolicyIterator.hasNext()) {
@@ -112,6 +117,14 @@ public class AutoscalerServerComponent {
             }
 
             if (log.isInfoEnabled()) {
+                log.info("Scheduling tasks to publish applications");
+            }
+
+            ApplicationSynchronizerTaskScheduler
+                    .schedule(ServiceReferenceHolder.getInstance()
+                            .getTaskService());
+
+            if (log.isInfoEnabled()) {
                 log.info("Autoscaler Server Component activated");
             }
         } catch (Throwable e) {
@@ -120,10 +133,10 @@ public class AutoscalerServerComponent {
     }
 
     protected void deactivate(ComponentContext context) {
-    	asTopologyReceiver.terminate();
-    	autoscalerHealthStatEventReceiver.terminate();
+        asTopologyReceiver.terminate();
+        autoscalerHealthStatEventReceiver.terminate();
     }
-    
+
     protected void setRegistryService(RegistryService registryService) {
         if (log.isDebugEnabled()) {
             log.debug("Setting the Registry Service");
@@ -143,4 +156,18 @@ public class AutoscalerServerComponent {
         }
         ServiceReferenceHolder.getInstance().setRegistry(null);
     }
+
+    protected void setTaskService(TaskService taskService) {
+        if (log.isDebugEnabled()) {
+            log.debug("Setting the Task Service");
+        }
+        ServiceReferenceHolder.getInstance().setTaskService(taskService);
+    }
+
+    protected void unsetTaskService(TaskService taskService) {
+        if (log.isDebugEnabled()) {
+            log.debug("Unsetting the Task Service");
+        }
+        ServiceReferenceHolder.getInstance().setTaskService(null);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
index 00629dc..9040f74 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
@@ -21,13 +21,15 @@ package org.apache.stratos.autoscaler.util;
 */
 
 
+import org.wso2.carbon.ntask.core.service.TaskService;
 import org.wso2.carbon.registry.core.Registry;
 import org.wso2.carbon.registry.core.session.UserRegistry;
 
 public class ServiceReferenceHolder {
 	
 	private static ServiceReferenceHolder instance;
-	private Registry registry;	
+	private Registry registry;
+    private TaskService taskService;
 
 	private ServiceReferenceHolder() {
 	}
@@ -46,4 +48,12 @@ public class ServiceReferenceHolder {
     public Registry getRegistry() {
 		return registry;
 	}
+
+    public TaskService getTaskService() {
+        return taskService;
+    }
+
+    public void setTaskService(TaskService taskService) {
+        this.taskService = taskService;
+    }
 }