You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/12/03 20:53:55 UTC

stratos git commit: Implementing cloud controller coordinator actions for distributed environments

Repository: stratos
Updated Branches:
  refs/heads/master 0c09f44b9 -> 5a191402a


Implementing cloud controller coordinator actions for distributed environments


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/5a191402
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/5a191402
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/5a191402

Branch: refs/heads/master
Commit: 5a191402a429ce57663444be74fdbf9052b3d2df
Parents: 0c09f44
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Dec 4 01:23:24 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 4 01:23:48 2014 +0530

----------------------------------------------------------------------
 .../context/CloudControllerContext.java         |   9 +-
 .../CloudControllerServiceComponent.java        | 106 ++++++++++++-------
 .../internal/ServiceReferenceHolder.java        |  10 ++
 3 files changed, 83 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/5a191402/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
index 53e7be4..10bff88 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
@@ -145,6 +145,7 @@ public class CloudControllerContext implements Serializable {
     private boolean clustered;
 
     private transient AsyncDataPublisher dataPublisher;
+    private boolean coordinator;
 
     private CloudControllerContext() {
         // Check clustering status
@@ -580,9 +581,11 @@ public class CloudControllerContext implements Serializable {
     }
 
     public boolean isCoordinator() {
-        AxisConfiguration axisConfiguration = ServiceReferenceHolder.getInstance().getAxisConfiguration();
-        ClusteringAgent clusteringAgent = axisConfiguration.getClusteringAgent();
-        return ((axisConfiguration != null) && (clusteringAgent != null) && (clusteringAgent.isCoordinator()));
+        return coordinator;
+    }
+
+    public void setCoordinator(boolean coordinator) {
+        this.coordinator = coordinator;
     }
 
     public void persist() throws RegistryException {

http://git-wip-us.apache.org/repos/asf/stratos/blob/5a191402/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 8a5167d..ed1b0f5 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -20,12 +20,11 @@ package org.apache.stratos.cloud.controller.internal;
  *
 */
 
-import org.apache.commons.configuration.XMLConfiguration;
-
 import com.hazelcast.core.HazelcastInstance;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.context.CloudControllerContext;
 import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationTopicReceiver;
 import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
 import org.apache.stratos.cloud.controller.exception.CloudControllerException;
@@ -39,7 +38,6 @@ import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.util.Util;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.component.ComponentContext;
-import org.wso2.carbon.caching.impl.DistributedMapProvider;
 import org.wso2.carbon.ntask.core.service.TaskService;
 import org.wso2.carbon.registry.core.exceptions.RegistryException;
 import org.wso2.carbon.registry.core.service.RegistryService;
@@ -52,6 +50,8 @@ import java.util.concurrent.ExecutorService;
  * Registering Cloud Controller Service.
  *
  * @scr.component name="org.apache.stratos.cloud.controller" immediate="true"
+ * @scr.reference name="hazelcast.instance.service" interface="com.hazelcast.core.HazelcastInstance"
+ *                cardinality="0..1" policy="dynamic" bind="setHazelcastInstance" unbind="unsetHazelcastInstance"
  * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.clustering.DistributedObjectProvider"
  *                cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
  * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService"
@@ -67,58 +67,78 @@ public class CloudControllerServiceComponent {
 	private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
 	private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
 	private ApplicationTopicReceiver applicationTopicReceiver;
-	private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier";
+    private ExecutorService executorService;
+
+    private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier";
 	private static final String DEFAULT_IDENTIFIER = "Cloud-Controller";
 	private static final String THREAD_POOL_SIZE_KEY = "threadPool.cloudcontroller.threadPoolSize";
 	private static final String COMPONENTS_CONFIG = "stratos-config";
 	private static final int THREAD_POOL_SIZE = 10;
+    private static final String CLOUD_CONTROLLER_COORDINATOR_LOCK = "CLOUD_CONTROLLER_COORDINATOR_LOCK";
 
-	protected void activate(ComponentContext context) {
+    protected void activate(ComponentContext context) {
 		try {
+			executorService = StratosThreadPool.getExecutorService(DEFAULT_IDENTIFIER, THREAD_POOL_SIZE);
 
+			// Register cloud controller service
+			BundleContext bundleContext = context.getBundleContext();
+			bundleContext.registerService(CloudControllerService.class.getName(),
+			                              new CloudControllerServiceImpl(), null);
 
+            if(CloudControllerContext.getInstance().isClustered()) {
+                Thread coordinatorElectorThread = new Thread() {
+                    @Override
+                    public void run() {
+                        ServiceReferenceHolder.getInstance().getHazelcastInstance()
+                                .getLock(CLOUD_CONTROLLER_COORDINATOR_LOCK).lock();
+
+                        log.info("Elected this member [" + ServiceReferenceHolder.getInstance().getHazelcastInstance()
+                                .getCluster().getLocalMember().getUuid() + "] " +
+                                "as the cloud controller coordinator for the cluster");
+
+                        CloudControllerContext.getInstance().setCoordinator(true);
+                        executeCoordinatorTasks();
+                    }
+                };
+                coordinatorElectorThread.setName("Cloud controller coordinator elector thread");
+                executorService.submit(coordinatorElectorThread);
+            } else {
+                executeCoordinatorTasks();
+            }
+		} catch (Throwable e) {
+			log.error("******* Cloud Controller Service bundle is failed to activate ****", e);
+        }
+    }
 
-			ExecutorService executorService = StratosThreadPool.getExecutorService(DEFAULT_IDENTIFIER, THREAD_POOL_SIZE);
-			applicationTopicReceiver = new ApplicationTopicReceiver();
-			applicationTopicReceiver.setExecutorService(executorService);
-			applicationTopicReceiver.execute();
-
-			if (log.isInfoEnabled()) {
-				log.info("Application Receiver thread started");
-			}
-
-			clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
-			clusterStatusTopicReceiver.setExecutorService(executorService);
-			clusterStatusTopicReceiver.execute();
-
-			if (log.isInfoEnabled()) {
-				log.info("Cluster status Receiver thread started");
-			}
+    private void executeCoordinatorTasks() {
+        applicationTopicReceiver = new ApplicationTopicReceiver();
+        applicationTopicReceiver.setExecutorService(executorService);
+        applicationTopicReceiver.execute();
 
-			instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
-			instanceStatusTopicReceiver.setExecutorService(executorService);
-			instanceStatusTopicReceiver.execute();
+        if (log.isInfoEnabled()) {
+            log.info("Application Receiver thread started");
+        }
 
-			if (log.isInfoEnabled()) {
-				log.info("Instance status message receiver thread started");
-			}
+        clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
+        clusterStatusTopicReceiver.setExecutorService(executorService);
+        clusterStatusTopicReceiver.execute();
 
-			// Register cloud controller service
-			BundleContext bundleContext = context.getBundleContext();
-			bundleContext.registerService(CloudControllerService.class.getName(),
-			                              new CloudControllerServiceImpl(), null);
+        if (log.isInfoEnabled()) {
+            log.info("Cluster status Receiver thread started");
+        }
 
-			if (log.isInfoEnabled()) {
-				log.info("Scheduling tasks");
-			}
+        instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
+        instanceStatusTopicReceiver.setExecutorService(executorService);
+        instanceStatusTopicReceiver.execute();
 
-			TopologySynchronizerTaskScheduler
-					.schedule(ServiceReferenceHolder.getInstance()
-					                                .getTaskService());
+        if (log.isInfoEnabled()) {
+            log.info("Instance status message receiver thread started");
+        }
 
-		} catch (Throwable e) {
-			log.error("******* Cloud Controller Service bundle is failed to activate ****", e);
+        if (log.isInfoEnabled()) {
+            log.info("Scheduling topology synchronizer task");
         }
+        TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService());
     }
 
     protected void setTaskService(TaskService taskService) {
@@ -166,6 +186,14 @@ public class CloudControllerServiceComponent {
         ServiceReferenceHolder.getInstance().setAxisConfiguration(null);
     }
 
+    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
+    }
+
+    public void unsetHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
+    }
+
     protected void setDistributedObjectProvider(DistributedObjectProvider distributedObjectProvider) {
         ServiceReferenceHolder.getInstance().setDistributedObjectProvider(distributedObjectProvider);
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/5a191402/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java
index 9ac0a19..bbee450 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.stratos.cloud.controller.internal;
 
+import com.hazelcast.core.HazelcastInstance;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.stratos.common.clustering.DistributedObjectProvider;
 import org.wso2.carbon.ntask.core.service.TaskService;
@@ -34,6 +35,7 @@ public class ServiceReferenceHolder {
     private Registry registry;
     private AxisConfiguration axisConfiguration;
     private DistributedObjectProvider distributedObjectProvider;
+    private HazelcastInstance hazelcastInstance;
 
     private ServiceReferenceHolder() {
     }
@@ -76,4 +78,12 @@ public class ServiceReferenceHolder {
     public DistributedObjectProvider getDistributedObjectProvider() {
         return distributedObjectProvider;
     }
+
+    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
+        this.hazelcastInstance = hazelcastInstance;
+    }
+
+    public HazelcastInstance getHazelcastInstance() {
+        return hazelcastInstance;
+    }
 }