You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/12/03 20:53:55 UTC
stratos git commit: Implementing cloud controller coordinator actions for distributed environments
Repository: stratos
Updated Branches:
refs/heads/master 0c09f44b9 -> 5a191402a
Implementing cloud controller coordinator actions for distributed environments
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/5a191402
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/5a191402
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/5a191402
Branch: refs/heads/master
Commit: 5a191402a429ce57663444be74fdbf9052b3d2df
Parents: 0c09f44
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Dec 4 01:23:24 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 4 01:23:48 2014 +0530
----------------------------------------------------------------------
.../context/CloudControllerContext.java | 9 +-
.../CloudControllerServiceComponent.java | 106 ++++++++++++-------
.../internal/ServiceReferenceHolder.java | 10 ++
3 files changed, 83 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/5a191402/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
index 53e7be4..10bff88 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
@@ -145,6 +145,7 @@ public class CloudControllerContext implements Serializable {
private boolean clustered;
private transient AsyncDataPublisher dataPublisher;
+ private boolean coordinator;
private CloudControllerContext() {
// Check clustering status
@@ -580,9 +581,11 @@ public class CloudControllerContext implements Serializable {
}
public boolean isCoordinator() {
- AxisConfiguration axisConfiguration = ServiceReferenceHolder.getInstance().getAxisConfiguration();
- ClusteringAgent clusteringAgent = axisConfiguration.getClusteringAgent();
- return ((axisConfiguration != null) && (clusteringAgent != null) && (clusteringAgent.isCoordinator()));
+ return coordinator;
+ }
+
+ public void setCoordinator(boolean coordinator) {
+ this.coordinator = coordinator;
}
public void persist() throws RegistryException {
http://git-wip-us.apache.org/repos/asf/stratos/blob/5a191402/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 8a5167d..ed1b0f5 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -20,12 +20,11 @@ package org.apache.stratos.cloud.controller.internal;
*
*/
-import org.apache.commons.configuration.XMLConfiguration;
-
import com.hazelcast.core.HazelcastInstance;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.context.CloudControllerContext;
import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationTopicReceiver;
import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
import org.apache.stratos.cloud.controller.exception.CloudControllerException;
@@ -39,7 +38,6 @@ import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.util.Util;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
-import org.wso2.carbon.caching.impl.DistributedMapProvider;
import org.wso2.carbon.ntask.core.service.TaskService;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
import org.wso2.carbon.registry.core.service.RegistryService;
@@ -52,6 +50,8 @@ import java.util.concurrent.ExecutorService;
* Registering Cloud Controller Service.
*
* @scr.component name="org.apache.stratos.cloud.controller" immediate="true"
+ * @scr.reference name="hazelcast.instance.service" interface="com.hazelcast.core.HazelcastInstance"
+ * cardinality="0..1" policy="dynamic" bind="setHazelcastInstance" unbind="unsetHazelcastInstance"
* @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.clustering.DistributedObjectProvider"
* cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
* @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService"
@@ -67,58 +67,78 @@ public class CloudControllerServiceComponent {
private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
private ApplicationTopicReceiver applicationTopicReceiver;
- private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier";
+ private ExecutorService executorService;
+
+ private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier";
private static final String DEFAULT_IDENTIFIER = "Cloud-Controller";
private static final String THREAD_POOL_SIZE_KEY = "threadPool.cloudcontroller.threadPoolSize";
private static final String COMPONENTS_CONFIG = "stratos-config";
private static final int THREAD_POOL_SIZE = 10;
+ private static final String CLOUD_CONTROLLER_COORDINATOR_LOCK = "CLOUD_CONTROLLER_COORDINATOR_LOCK";
- protected void activate(ComponentContext context) {
+ protected void activate(ComponentContext context) {
try {
+ executorService = StratosThreadPool.getExecutorService(DEFAULT_IDENTIFIER, THREAD_POOL_SIZE);
+ // Register cloud controller service
+ BundleContext bundleContext = context.getBundleContext();
+ bundleContext.registerService(CloudControllerService.class.getName(),
+ new CloudControllerServiceImpl(), null);
+ if(CloudControllerContext.getInstance().isClustered()) {
+ Thread coordinatorElectorThread = new Thread() {
+ @Override
+ public void run() {
+ ServiceReferenceHolder.getInstance().getHazelcastInstance()
+ .getLock(CLOUD_CONTROLLER_COORDINATOR_LOCK).lock();
+
+ log.info("Elected this member [" + ServiceReferenceHolder.getInstance().getHazelcastInstance()
+ .getCluster().getLocalMember().getUuid() + "] " +
+ "as the cloud controller coordinator for the cluster");
+
+ CloudControllerContext.getInstance().setCoordinator(true);
+ executeCoordinatorTasks();
+ }
+ };
+ coordinatorElectorThread.setName("Cloud controller coordinator elector thread");
+ executorService.submit(coordinatorElectorThread);
+ } else {
+ executeCoordinatorTasks();
+ }
+ } catch (Throwable e) {
+ log.error("******* Cloud Controller Service bundle is failed to activate ****", e);
+ }
+ }
- ExecutorService executorService = StratosThreadPool.getExecutorService(DEFAULT_IDENTIFIER, THREAD_POOL_SIZE);
- applicationTopicReceiver = new ApplicationTopicReceiver();
- applicationTopicReceiver.setExecutorService(executorService);
- applicationTopicReceiver.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Application Receiver thread started");
- }
-
- clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
- clusterStatusTopicReceiver.setExecutorService(executorService);
- clusterStatusTopicReceiver.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Cluster status Receiver thread started");
- }
+ private void executeCoordinatorTasks() {
+ applicationTopicReceiver = new ApplicationTopicReceiver();
+ applicationTopicReceiver.setExecutorService(executorService);
+ applicationTopicReceiver.execute();
- instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
- instanceStatusTopicReceiver.setExecutorService(executorService);
- instanceStatusTopicReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Application Receiver thread started");
+ }
- if (log.isInfoEnabled()) {
- log.info("Instance status message receiver thread started");
- }
+ clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
+ clusterStatusTopicReceiver.setExecutorService(executorService);
+ clusterStatusTopicReceiver.execute();
- // Register cloud controller service
- BundleContext bundleContext = context.getBundleContext();
- bundleContext.registerService(CloudControllerService.class.getName(),
- new CloudControllerServiceImpl(), null);
+ if (log.isInfoEnabled()) {
+ log.info("Cluster status Receiver thread started");
+ }
- if (log.isInfoEnabled()) {
- log.info("Scheduling tasks");
- }
+ instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
+ instanceStatusTopicReceiver.setExecutorService(executorService);
+ instanceStatusTopicReceiver.execute();
- TopologySynchronizerTaskScheduler
- .schedule(ServiceReferenceHolder.getInstance()
- .getTaskService());
+ if (log.isInfoEnabled()) {
+ log.info("Instance status message receiver thread started");
+ }
- } catch (Throwable e) {
- log.error("******* Cloud Controller Service bundle is failed to activate ****", e);
+ if (log.isInfoEnabled()) {
+ log.info("Scheduling topology synchronizer task");
}
+ TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService());
}
protected void setTaskService(TaskService taskService) {
@@ -166,6 +186,14 @@ public class CloudControllerServiceComponent {
ServiceReferenceHolder.getInstance().setAxisConfiguration(null);
}
+ public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
+ ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
+ }
+
+ public void unsetHazelcastInstance(HazelcastInstance hazelcastInstance) {
+ ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
+ }
+
protected void setDistributedObjectProvider(DistributedObjectProvider distributedObjectProvider) {
ServiceReferenceHolder.getInstance().setDistributedObjectProvider(distributedObjectProvider);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/5a191402/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java
index 9ac0a19..bbee450 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java
@@ -18,6 +18,7 @@
*/
package org.apache.stratos.cloud.controller.internal;
+import com.hazelcast.core.HazelcastInstance;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.stratos.common.clustering.DistributedObjectProvider;
import org.wso2.carbon.ntask.core.service.TaskService;
@@ -34,6 +35,7 @@ public class ServiceReferenceHolder {
private Registry registry;
private AxisConfiguration axisConfiguration;
private DistributedObjectProvider distributedObjectProvider;
+ private HazelcastInstance hazelcastInstance;
private ServiceReferenceHolder() {
}
@@ -76,4 +78,12 @@ public class ServiceReferenceHolder {
public DistributedObjectProvider getDistributedObjectProvider() {
return distributedObjectProvider;
}
+
+ public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
+ this.hazelcastInstance = hazelcastInstance;
+ }
+
+ public HazelcastInstance getHazelcastInstance() {
+ return hazelcastInstance;
+ }
}