You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/12/03 19:44:50 UTC

[1/3] stratos git commit: Adding distributed locks to cloud controller service methods

Repository: stratos
Updated Branches:
  refs/heads/master 08de72982 -> 71fab2b44


http://git-wip-us.apache.org/repos/asf/stratos/blob/71fab2b4/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 548b743..f9c55a0 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -103,34 +103,24 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             // cartridge can never be null
             cartridge = CloudControllerUtil.toCartridge(cartridgeConfig);
         } catch (Exception e) {
-            String msg =
-                    "Invalid Cartridge Definition: Cartridge Type: " +
-                            cartridgeConfig.getType() +
-                            ". Cause: Cannot instantiate a Cartridge Instance with the given Config. " + e.getMessage();
+            String msg = "Invalid cartridge definition: Cartridge type: " + cartridgeConfig.getType() +
+                         " Cause: Cannot instantiate a cartridge instance with the given configuration: " + e.getMessage();
             LOG.error(msg, e);
             throw new InvalidCartridgeDefinitionException(msg, e);
         }
 
-        List<IaasProvider> iaases = cartridge.getIaases();
+        List<IaasProvider> iaasProviders = cartridge.getIaases();
 
         if (!StratosConstants.KUBERNETES_DEPLOYER_TYPE.equals(cartridge.getDeployerType())) {
-            if (iaases == null || iaases.isEmpty()) {
-                String msg = "Invalid Cartridge Definition: Cartridge Type: "
-                        + cartridgeConfig.getType()
-                        + ". Cause: Iaases of this Cartridge is null or empty.";
-                LOG.error(msg);
-                throw new InvalidCartridgeDefinitionException(msg);
-            }
-
-            if (iaases == null || iaases.isEmpty()) {
-                String msg = "Invalid Cartridge Definition: Cartridge Type: " +
+            if (iaasProviders == null || iaasProviders.isEmpty()) {
+                String msg = "Invalid cartridge definition: Cartridge type: " +
                         cartridgeConfig.getType() +
-                        ". Cause: Iaases of this Cartridge is null or empty.";
+                        " Cause: Iaases of this cartridge is null or empty";
                 LOG.error(msg);
                 throw new InvalidCartridgeDefinitionException(msg);
             }
 
-            for (IaasProvider iaasProvider : iaases) {
+            for (IaasProvider iaasProvider : iaasProviders) {
                 CloudControllerUtil.getIaas(iaasProvider);
             }
         }
@@ -148,7 +138,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             populateNewCartridge(cartridge, cartridgeToBeRemoved);
         }
 
-        cloudControllerContext.addCartridge(cartridge);
+        CloudControllerContext.getInstance().addCartridge(cartridge);
 
         // persist
         persist();
@@ -188,10 +178,10 @@ public class CloudControllerServiceImpl implements CloudControllerService {
     public void undeployCartridgeDefinition(String cartridgeType) throws InvalidCartridgeTypeException {
 
         Cartridge cartridge = null;
-        if ((cartridge = cloudControllerContext.getCartridge(cartridgeType)) != null) {
-            if (cloudControllerContext.getCartridges().remove(cartridge)) {
+        if ((cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType)) != null) {
+            if (CloudControllerContext.getInstance().getCartridges().remove(cartridge)) {
                 // invalidate partition validation cache
-                cloudControllerContext.removeFromCartridgeTypeToPartitionIds(cartridgeType);
+                CloudControllerContext.getInstance().removeFromCartridgeTypeToPartitionIds(cartridgeType);
 
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Partition cache invalidated for cartridge " + cartridgeType);
@@ -261,7 +251,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             }
         }
 
-        cloudControllerContext.addServiceGroup(servicegroup);
+        CloudControllerContext.getInstance().addServiceGroup(servicegroup);
 
         this.persist();
 
@@ -274,10 +264,10 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 
         ServiceGroup serviceGroup = null;
 
-        serviceGroup = cloudControllerContext.getServiceGroup(name);
+        serviceGroup = CloudControllerContext.getInstance().getServiceGroup(name);
 
         if (serviceGroup != null) {
-            if (cloudControllerContext.getServiceGroups().remove(serviceGroup)) {
+            if (CloudControllerContext.getInstance().getServiceGroups().remove(serviceGroup)) {
                 persist();
                 if (LOG.isInfoEnabled()) {
                     LOG.info("Successfully undeployed the Service Group definition: " + serviceGroup);
@@ -299,7 +289,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             LOG.debug("getServiceGroupDefinition:" + name);
         }
 
-        ServiceGroup serviceGroup = this.cloudControllerContext.getServiceGroup(name);
+        ServiceGroup serviceGroup = CloudControllerContext.getInstance().getServiceGroup(name);
 
         if (serviceGroup == null) {
             if (LOG.isDebugEnabled()) {
@@ -365,13 +355,13 @@ public class CloudControllerServiceImpl implements CloudControllerService {
                 memberContext);
 
         String partitionId = partition.getId();
-        ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId);
+        ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
 
         handleNullObject(ctxt, "Instance start-up failed. Invalid cluster id. " + memberContext);
 
         String cartridgeType = ctxt.getCartridgeType();
 
-        Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType);
+        Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
 
         if (cartridge == null) {
             String msg =
@@ -552,7 +542,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
      */
     private void persist() {
         try {
-            cloudControllerContext.persist();
+            CloudControllerContext.getInstance().persist();
         } catch (RegistryException e) {
             String msg = "Failed to persist the cloud controller context in registry.";
             LOG.fatal(msg);
@@ -570,7 +560,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 
         handleNullObject(memberId, "Termination failed. Null member id.");
 
-        MemberContext ctxt = cloudControllerContext.getMemberContextOfMemberId(memberId);
+        MemberContext ctxt = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId);
 
         if (ctxt == null) {
             String msg = "Termination failed. Invalid Member Id: " + memberId;
@@ -717,24 +707,23 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 
         @Override
         public void run() {
-
             String memberId = ctxt.getMemberId();
             String clusterId = ctxt.getClusterId();
             String partitionId = ctxt.getPartition().getId();
             String cartridgeType = ctxt.getCartridgeType();
             String nodeId = ctxt.getNodeId();
 
+            Lock lock = null;
             try {
-                // these will never be null, since we do not add null values for these.
-                Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType);
+                lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock();
 
+                Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
                 LOG.info("Starting to terminate an instance with member id : " + memberId +
                         " in partition id: " + partitionId + " of cluster id: " + clusterId +
                         " and of cartridge type: " + cartridgeType);
 
                 if (cartridge == null) {
-                    String msg =
-                            "Termination of Member Id: " + memberId + " failed. " +
+                    String msg = "Termination of Member Id: " + memberId + " failed. " +
                                     "Cannot find a matching Cartridge for type: " +
                                     cartridgeType;
                     LOG.error(msg);
@@ -743,10 +732,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 
                 // if no matching node id can be found.
                 if (nodeId == null) {
-
-                    String msg =
-                            "Termination failed. Cannot find a node id for Member Id: " +
-                                    memberId;
+                    String msg = "Termination failed. Cannot find a node id for Member Id: " + memberId;
 
                     // log information
                     logTermination(ctxt);
@@ -761,14 +747,15 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 
                 // log information
                 logTermination(ctxt);
-
             } catch (Exception e) {
-                String msg =
-                        "Instance termination failed. " + ctxt.toString();
+                String msg = "Instance termination failed. " + ctxt.toString();
                 LOG.error(msg, e);
                 throw new CloudControllerException(msg, e);
+            } finally {
+                if(lock != null) {
+                    CloudControllerContext.getInstance().releaseWriteLock(lock);
+                }
             }
-
         }
     }
 
@@ -787,117 +774,127 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 
         @Override
         public void run() {
+            Lock lock = null;
+            try {
+                lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock();
 
+                String clusterId = memberContext.getClusterId();
+                Partition partition = memberContext.getPartition();
+                ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
+                Iaas iaas = iaasProvider.getIaas();
+                String publicIp = null;
 
-            String clusterId = memberContext.getClusterId();
-            Partition partition = memberContext.getPartition();
-            ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId);
-            Iaas iaas = iaasProvider.getIaas();
-            String publicIp = null;
-
-            NodeMetadata node = null;
-            // generate the group id from domain name and sub domain name.
-            // Should have lower-case ASCII letters, numbers, or dashes.
-            // Should have a length between 3-15
-            String str = clusterId.length() > 10 ? clusterId.substring(0, 10) : clusterId.substring(0, clusterId.length());
-            String group = str.replaceAll("[^a-z0-9-]", "");
+                NodeMetadata node = null;
+                // generate the group id from domain name and sub domain name.
+                // Should have lower-case ASCII letters, numbers, or dashes.
+                // Should have a length between 3-15
+                String str = clusterId.length() > 10 ? clusterId.substring(0, 10) : clusterId.substring(0, clusterId.length());
+                String group = str.replaceAll("[^a-z0-9-]", "");
 
-            try {
-                ComputeService computeService = iaasProvider
-                        .getComputeService();
-                Template template = iaasProvider.getTemplate();
+                try {
+                    ComputeService computeService = iaasProvider.getComputeService();
+                    Template template = iaasProvider.getTemplate();
 
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Cloud Controller is delegating request to start an instance for "
-                            + memberContext + " to Jclouds layer.");
-                }
-                // create and start a node
-                Set<? extends NodeMetadata> nodes = computeService
-                        .createNodesInGroup(group, 1, template);
-                node = nodes.iterator().next();
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Cloud Controller received a response for the request to start "
-                            + memberContext + " from Jclouds layer.");
-                }
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Cloud Controller is delegating request to start an instance for "
+                                + memberContext + " to Jclouds layer.");
+                    }
+                    // create and start a node
+                    Set<? extends NodeMetadata> nodes = computeService
+                            .createNodesInGroup(group, 1, template);
+                    node = nodes.iterator().next();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Cloud Controller received a response for the request to start "
+                                + memberContext + " from Jclouds layer.");
+                    }
 
-                if (node == null) {
-                    String msg = "Null response received for instance start-up request to Jclouds.\n"
-                            + memberContext.toString();
-                    LOG.error(msg);
-                    throw new IllegalStateException(msg);
-                }
+                    if (node == null) {
+                        String msg = "Null response received for instance start-up request to Jclouds.\n"
+                                + memberContext.toString();
+                        LOG.error(msg);
+                        throw new IllegalStateException(msg);
+                    }
 
-                // node id
-                String nodeId = node.getId();
-                if (nodeId == null) {
-                    String msg = "Node id of the starting instance is null.\n"
-                            + memberContext.toString();
-                    LOG.fatal(msg);
-                    throw new IllegalStateException(msg);
-                }
+                    // node id
+                    String nodeId = node.getId();
+                    if (nodeId == null) {
+                        String msg = "Node id of the starting instance is null.\n"
+                                + memberContext.toString();
+                        LOG.fatal(msg);
+                        throw new IllegalStateException(msg);
+                    }
 
-                memberContext.setNodeId(nodeId);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Node id was set. " + memberContext.toString());
-                }
+                    memberContext.setNodeId(nodeId);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Node id was set. " + memberContext.toString());
+                    }
 
-                // attach volumes
-                if (ctxt.isVolumeRequired()) {
-                    // remove region prefix
-                    String instanceId = nodeId.indexOf('/') != -1 ? nodeId
-                            .substring(nodeId.indexOf('/') + 1, nodeId.length())
-                            : nodeId;
-                    memberContext.setInstanceId(instanceId);
-                    if (ctxt.getVolumes() != null) {
-                        for (Volume volume : ctxt.getVolumes()) {
-                            try {
-                                iaas.attachVolume(instanceId, volume.getId(),
-                                        volume.getDevice());
-                            } catch (Exception e) {
-                                // continue without throwing an exception, since
-                                // there is an instance already running
-                                LOG.error("Attaching Volume to Instance [ "
-                                        + instanceId + " ] failed!", e);
+                    // attach volumes
+                    if (ctxt.isVolumeRequired()) {
+                        // remove region prefix
+                        String instanceId = nodeId.indexOf('/') != -1 ? nodeId
+                                .substring(nodeId.indexOf('/') + 1, nodeId.length())
+                                : nodeId;
+                        memberContext.setInstanceId(instanceId);
+                        if (ctxt.getVolumes() != null) {
+                            for (Volume volume : ctxt.getVolumes()) {
+                                try {
+                                    iaas.attachVolume(instanceId, volume.getId(),
+                                            volume.getDevice());
+                                } catch (Exception e) {
+                                    // continue without throwing an exception, since
+                                    // there is an instance already running
+                                    LOG.error("Attaching Volume to Instance [ "
+                                            + instanceId + " ] failed!", e);
+                                }
                             }
                         }
                     }
-                }
-
-            } catch (Exception e) {
-                String msg = "Failed to start an instance. " + memberContext.toString() + " Cause: " + e.getMessage();
-                LOG.error(msg, e);
-                throw new IllegalStateException(msg, e);
-            }
 
-            try {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("IP allocation process started for " + memberContext);
+                } catch (Exception e) {
+                    String msg = "Failed to start an instance. " + memberContext.toString() + " Cause: " + e.getMessage();
+                    LOG.error(msg, e);
+                    throw new IllegalStateException(msg, e);
                 }
-                String autoAssignIpProp =
-                        iaasProvider.getProperty(CloudControllerConstants.AUTO_ASSIGN_IP_PROPERTY);
 
-                String pre_defined_ip =
-                        iaasProvider.getProperty(CloudControllerConstants.FLOATING_IP_PROPERTY);
+                try {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("IP allocation process started for " + memberContext);
+                    }
+                    String autoAssignIpProp =
+                            iaasProvider.getProperty(CloudControllerConstants.AUTO_ASSIGN_IP_PROPERTY);
 
-                // reset ip
-                String ip = "";
+                    String pre_defined_ip =
+                            iaasProvider.getProperty(CloudControllerConstants.FLOATING_IP_PROPERTY);
 
-                // default behavior is autoIpAssign=false
-                if (autoAssignIpProp == null ||
-                        (autoAssignIpProp != null && autoAssignIpProp.equals("false"))) {
+                    // reset ip
+                    String ip = "";
 
-                    // check if floating ip is well defined in cartridge definition
-                    if (pre_defined_ip != null) {
-                        if (isValidIpAddress(pre_defined_ip)) {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("CloudControllerServiceImpl:IpAllocator:pre_defined_ip: invoking associatePredefinedAddress" + pre_defined_ip);
-                            }
-                            ip = iaas.associatePredefinedAddress(node, pre_defined_ip);
+                    // default behavior is autoIpAssign=false
+                    if (autoAssignIpProp == null ||
+                            (autoAssignIpProp != null && autoAssignIpProp.equals("false"))) {
 
-                            if (ip == null || "".equals(ip) || !pre_defined_ip.equals(ip)) {
-                                // throw exception and stop instance creation
-                                String msg = "Error occurred while allocating predefined floating ip address: " + pre_defined_ip +
-                                        " / allocated ip:" + ip +
+                        // check if floating ip is well defined in cartridge definition
+                        if (pre_defined_ip != null) {
+                            if (isValidIpAddress(pre_defined_ip)) {
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("CloudControllerServiceImpl:IpAllocator:pre_defined_ip: invoking associatePredefinedAddress" + pre_defined_ip);
+                                }
+                                ip = iaas.associatePredefinedAddress(node, pre_defined_ip);
+
+                                if (ip == null || "".equals(ip) || !pre_defined_ip.equals(ip)) {
+                                    // throw exception and stop instance creation
+                                    String msg = "Error occurred while allocating predefined floating ip address: " + pre_defined_ip +
+                                            " / allocated ip:" + ip +
+                                            " - terminating node:" + memberContext.toString();
+                                    LOG.error(msg);
+                                    // terminate instance
+                                    terminate(iaasProvider,
+                                            node.getId(), memberContext);
+                                    throw new CloudControllerException(msg);
+                                }
+                            } else {
+                                String msg = "Invalid floating ip address configured: " + pre_defined_ip +
                                         " - terminating node:" + memberContext.toString();
                                 LOG.error(msg);
                                 // terminate instance
@@ -905,111 +902,105 @@ public class CloudControllerServiceImpl implements CloudControllerService {
                                         node.getId(), memberContext);
                                 throw new CloudControllerException(msg);
                             }
+
                         } else {
-                            String msg = "Invalid floating ip address configured: " + pre_defined_ip +
-                                    " - terminating node:" + memberContext.toString();
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("CloudControllerServiceImpl:IpAllocator:no (valid) predefined floating ip configured, "
+                                        + "selecting available one from pool");
+                            }
+                            // allocate an IP address - manual IP assigning mode
+                            ip = iaas.associateAddress(node);
+
+                            if (ip != null) {
+                                memberContext.setAllocatedIpAddress(ip);
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Allocated an ip address: "
+                                            + memberContext.toString());
+                                } else if (LOG.isInfoEnabled()) {
+                                    LOG.info("Allocated ip address [ " + memberContext.getAllocatedIpAddress() +
+                                            " ] to member with id: " + memberContext.getMemberId());
+                                }
+                            }
+                        }
+
+                        if (ip == null) {
+                            String msg = "No IP address found. IP allocation failed for " + memberContext;
                             LOG.error(msg);
-                            // terminate instance
-                            terminate(iaasProvider,
-                                    node.getId(), memberContext);
                             throw new CloudControllerException(msg);
                         }
 
-                    } else {
+                        // build the node with the new ip
+                        node = NodeMetadataBuilder.fromNodeMetadata(node)
+                                .publicAddresses(ImmutableSet.of(ip)).build();
+                    }
+
+
+                    // public ip
+                    if (node.getPublicAddresses() != null &&
+                            node.getPublicAddresses().iterator().hasNext()) {
+                        ip = node.getPublicAddresses().iterator().next();
+                        publicIp = ip;
+                        memberContext.setPublicIpAddress(ip);
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("CloudControllerServiceImpl:IpAllocator:no (valid) predefined floating ip configured, "
-                                    + "selecting available one from pool");
+                            LOG.debug("Retrieving Public IP Address : " + memberContext.toString());
+                        } else if (LOG.isInfoEnabled()) {
+                            LOG.info("Retrieving Public IP Address: " + memberContext.getPublicIpAddress() +
+                                    ", member id: " + memberContext.getMemberId());
                         }
-                        // allocate an IP address - manual IP assigning mode
-                        ip = iaas.associateAddress(node);
+                    }
 
-                        if (ip != null) {
-                            memberContext.setAllocatedIpAddress(ip);
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Allocated an ip address: "
-                                        + memberContext.toString());
-                            } else if (LOG.isInfoEnabled()) {
-                                LOG.info("Allocated ip address [ " + memberContext.getAllocatedIpAddress() +
-                                       " ] to member with id: " + memberContext.getMemberId());
-                            }
+                    // private IP
+                    if (node.getPrivateAddresses() != null &&
+                            node.getPrivateAddresses().iterator().hasNext()) {
+                        ip = node.getPrivateAddresses().iterator().next();
+                        memberContext.setPrivateIpAddress(ip);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Retrieving Private IP Address. " + memberContext.toString());
+                        } else if (LOG.isInfoEnabled()) {
+                            LOG.info("Retrieving Private IP Address: " + memberContext.getPrivateIpAddress() +
+                                    ", member id: " + memberContext.getMemberId());
                         }
                     }
 
-                    if (ip == null) {
-                        String msg = "No IP address found. IP allocation failed for " + memberContext;
-                        LOG.error(msg);
-                        throw new CloudControllerException(msg);
-                    }
+                    CloudControllerContext.getInstance().addMemberContext(memberContext);
 
-                    // build the node with the new ip
-                    node = NodeMetadataBuilder.fromNodeMetadata(node)
-                            .publicAddresses(ImmutableSet.of(ip)).build();
-                }
+                    // persist in registry
+                    persist();
+
+
+                    // trigger topology
+                    TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId,
+                            partition.getId(), ip, publicIp, memberContext);
 
+                    String memberID = memberContext.getMemberId();
 
-                // public ip
-                if (node.getPublicAddresses() != null &&
-                        node.getPublicAddresses().iterator().hasNext()) {
-                    ip = node.getPublicAddresses().iterator().next();
-                    publicIp = ip;
-                    memberContext.setPublicIpAddress(ip);
+                    // update the topology with the newly spawned member
+                    // publish data
+                    CartridgeInstanceDataPublisher.publish(memberID,
+                            memberContext.getPartition().getId(),
+                            memberContext.getNetworkPartitionId(),
+                            memberContext.getClusterId(),
+                            cartridgeType,
+                            MemberStatus.Created.toString(),
+                            node);
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("Retrieving Public IP Address : " + memberContext.toString());
-                    } else if (LOG.isInfoEnabled()) {
-                        LOG.info("Retrieving Public IP Address: " + memberContext.getPublicIpAddress() +
-                                ", member id: " + memberContext.getMemberId());
+                        LOG.debug("Node details: " + node.toString());
                     }
-                }
 
-                // private IP
-                if (node.getPrivateAddresses() != null &&
-                        node.getPrivateAddresses().iterator().hasNext()) {
-                    ip = node.getPrivateAddresses().iterator().next();
-                    memberContext.setPrivateIpAddress(ip);
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("Retrieving Private IP Address. " + memberContext.toString());
-                    } else if (LOG.isInfoEnabled()) {
-                        LOG.info("Retrieving Private IP Address: " + memberContext.getPrivateIpAddress() +
-                            ", member id: " +  memberContext.getMemberId());
+                        LOG.debug("IP allocation process ended for " + memberContext);
                     }
-                }
-
-                cloudControllerContext.addMemberContext(memberContext);
-
-                // persist in registry
-                persist();
-
-
-                // trigger topology
-                TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId,
-                        partition.getId(), ip, publicIp, memberContext);
 
-                String memberID = memberContext.getMemberId();
-
-                // update the topology with the newly spawned member
-                // publish data
-                CartridgeInstanceDataPublisher.publish(memberID,
-                        memberContext.getPartition().getId(),
-                        memberContext.getNetworkPartitionId(),
-                        memberContext.getClusterId(),
-                        cartridgeType,
-                        MemberStatus.Created.toString(),
-                        node);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Node details: " + node.toString());
+                } catch (Exception e) {
+                    String msg = "Error occurred while allocating an ip address. " + memberContext.toString();
+                    LOG.error(msg, e);
+                    throw new CloudControllerException(msg, e);
                 }
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("IP allocation process ended for " + memberContext);
+            } finally {
+                if(lock != null) {
+                    CloudControllerContext.getInstance().releaseWriteLock(lock);
                 }
-
-            } catch (Exception e) {
-                String msg = "Error occurred while allocating an ip address. " + memberContext.toString();
-                LOG.error(msg, e);
-                throw new CloudControllerException(msg, e);
             }
-
-
         }
     }
 
@@ -1026,7 +1017,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 
         handleNullObject(clusterId, "Instance termination failed. Cluster id is null.");
 
-        List<MemberContext> ctxts = cloudControllerContext.getMemberContextsOfClusterId(clusterId);
+        List<MemberContext> ctxts = CloudControllerContext.getInstance().getMemberContextsOfClusterId(clusterId);
 
         if (ctxts == null) {
             String msg = "Instance termination failed. No members found for cluster id: " + clusterId;
@@ -1088,7 +1079,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 
     private void detachVolume(IaasProvider iaasProvider, MemberContext ctxt) {
         String clusterId = ctxt.getClusterId();
-        ClusterContext clusterCtxt = cloudControllerContext.getClusterContext(clusterId);
+        ClusterContext clusterCtxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
         if (clusterCtxt.getVolumes() != null) {
             for (Volume volume : clusterCtxt.getVolumes()) {
                 try {
@@ -1130,11 +1121,10 @@ public class CloudControllerServiceImpl implements CloudControllerService {
                 null);
 
         // update data holders
-        cloudControllerContext.removeMemberContext(memberContext.getMemberId(), memberContext.getClusterId());
+        CloudControllerContext.getInstance().removeMemberContext(memberContext.getMemberId(), memberContext.getClusterId());
 
         // persist
         persist();
-
     }
 
     @Override
@@ -1154,7 +1144,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
         handleNullObject(hostName, "Service registration failed. Hostname is null.");
 
         Cartridge cartridge = null;
-        if ((cartridge = cloudControllerContext.getCartridge(cartridgeType)) == null) {
+        if ((cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType)) == null) {
 
             String msg = "Registration of cluster: " + clusterId +
                     " failed. - Unregistered Cartridge type: " + cartridgeType;
@@ -1171,7 +1161,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
         payload, hostName, props, isLb, registrant.getPersistence());
 
 
-        cloudControllerContext.addClusterContext(ctxt);*/
+        CloudControllerContext.getInstance().addClusterContext(ctxt);*/
         TopologyBuilder.handleClusterCreated(registrant, isLb);
 
         persist();
@@ -1213,8 +1203,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
     @Override
     public String[] getRegisteredCartridges() {
         // get the list of cartridges registered
-        List<Cartridge> cartridges = cloudControllerContext
-                .getCartridges();
+        Collection<Cartridge> cartridges = CloudControllerContext.getInstance().getCartridges();
 
         if (cartridges == null) {
             LOG.info("No registered Cartridge found.");
@@ -1241,7 +1230,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
     @Override
     public CartridgeInfo getCartridgeInfo(String cartridgeType)
             throws UnregisteredCartridgeException {
-        Cartridge cartridge = cloudControllerContext
+        Cartridge cartridge = CloudControllerContext.getInstance()
                 .getCartridge(cartridgeType);
 
         if (cartridge != null) {
@@ -1260,13 +1249,13 @@ public class CloudControllerServiceImpl implements CloudControllerService {
     public void unregisterService(String clusterId) throws UnregisteredClusterException {
         final String clusterId_ = clusterId;
 
-        ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId_);
+        ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId_);
 
         handleNullObject(ctxt, "Service unregistration failed. Invalid cluster id: " + clusterId);
 
         String cartridgeType = ctxt.getCartridgeType();
 
-        Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType);
+        Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
 
         if (cartridge == null) {
             String msg =
@@ -1281,12 +1270,12 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 
         } else {
 
-//	        TopologyBuilder.handleClusterMaintenanceMode(cloudControllerContext.getClusterContext(clusterId_));
+//	        TopologyBuilder.handleClusterMaintenanceMode(CloudControllerContext.getInstance().getClusterContext(clusterId_));
 
             Runnable terminateInTimeout = new Runnable() {
                 @Override
                 public void run() {
-                    ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId_);
+                    ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId_);
                     if (ctxt == null) {
                         String msg = "Service unregistration failed. Cluster not found: " + clusterId_;
                         LOG.error(msg);
@@ -1325,52 +1314,69 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             };
             Runnable unregister = new Runnable() {
                 public void run() {
-                    ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId_);
-                    if (ctxt == null) {
-                        String msg = "Service unregistration failed. Cluster not found: " + clusterId_;
-                        LOG.error(msg);
-                        return;
-                    }
-                    Collection<Member> members = TopologyManager.getTopology().
-                            getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers();
-                    // TODO why end time is needed?
-                    // long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * members.size();
+                    Lock lock = null;
+                    try {
+                        lock = CloudControllerContext.getInstance().acquireClusterContextWriteLock();
+                        ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId_);
+                        if (ctxt == null) {
+                            String msg = "Service unregistration failed. Cluster not found: " + clusterId_;
+                            LOG.error(msg);
+                            return;
+                        }
+                        Collection<Member> members = TopologyManager.getTopology().
+                                getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers();
+                        // TODO: why is an end time needed?
+                        // long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * members.size();
+
+                        while (members.size() > 0) {
+                            // wait until all the members are removed from the topology or timed out
+                            CloudControllerUtil.sleep(1000);
+                        }
 
-                    while (members.size() > 0) {
-                        //waiting until all the members got removed from the Topology/ timed out
-                        CloudControllerUtil.sleep(1000);
+                        LOG.info("Unregistration of service cluster: " + clusterId_);
+                        deleteVolumes(ctxt);
+                        onClusterRemoval(clusterId_);
+                    } finally {
+                        if(lock != null) {
+                            CloudControllerContext.getInstance().releaseWriteLock(lock);
+                        }
                     }
-
-                    LOG.info("Unregistration of service cluster: " + clusterId_);
-                    deleteVolumes(ctxt);
-                    onClusterRemoval(clusterId_);
                 }
 
                 private void deleteVolumes(ClusterContext ctxt) {
                     if (ctxt.isVolumeRequired()) {
-                        Cartridge cartridge = cloudControllerContext.getCartridge(ctxt.getCartridgeType());
-                        if (cartridge != null && cartridge.getIaases() != null && ctxt.getVolumes() != null) {
-                            for (Volume volume : ctxt.getVolumes()) {
-                                if (volume.getId() != null) {
-                                    String iaasType = volume.getIaasType();
-                                    //Iaas iaas = cloudControllerContext.getIaasProvider(iaasType).getIaas();
-                                    Iaas iaas = cartridge.getIaasProvider(iaasType).getIaas();
-                                    if (iaas != null) {
-                                        try {
-                                            // delete the volumes if remove on unsubscription is true.
-                                            if (volume.isRemoveOntermination()) {
-                                                iaas.deleteVolume(volume.getId());
-                                                volume.setId(null);
-                                            }
-                                        } catch (Exception ignore) {
-                                            if (LOG.isErrorEnabled()) {
-                                                LOG.error("Error while deleting volume [id] " + volume.getId(), ignore);
+                        Lock lock = null;
+                        try {
+                            lock = CloudControllerContext.getInstance().acquireCartridgesWriteLock();
+
+                            Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(ctxt.getCartridgeType());
+                            if (cartridge != null && cartridge.getIaases() != null && ctxt.getVolumes() != null) {
+                                for (Volume volume : ctxt.getVolumes()) {
+                                    if (volume.getId() != null) {
+                                        String iaasType = volume.getIaasType();
+                                        //Iaas iaas = CloudControllerContext.getInstance().getIaasProvider(iaasType).getIaas();
+                                        Iaas iaas = cartridge.getIaasProvider(iaasType).getIaas();
+                                        if (iaas != null) {
+                                            try {
+                                                // delete the volume only if it is marked for removal on termination.
+                                                if (volume.isRemoveOntermination()) {
+                                                    iaas.deleteVolume(volume.getId());
+                                                    volume.setId(null);
+                                                }
+                                            } catch (Exception ignore) {
+                                                if (LOG.isErrorEnabled()) {
+                                                    LOG.error("Error while deleting volume [id] " + volume.getId(), ignore);
+                                                }
                                             }
                                         }
                                     }
                                 }
+                                CloudControllerContext.getInstance().updateCartridge(cartridge);
+                            }
+                        } finally {
+                            if(lock != null) {
+                                CloudControllerContext.getInstance().releaseWriteLock(lock);
                             }
-
                         }
                     }
                 }
@@ -1383,104 +1389,120 @@ public class CloudControllerServiceImpl implements CloudControllerService {
     @Override
     public void unregisterDockerService(String clusterId)
             throws UnregisteredClusterException {
-
-        // terminate all kubernetes units
+        Lock lock = null;
         try {
-            terminateAllContainers(clusterId);
-        } catch (InvalidClusterException e) {
-            String msg = "Docker instance termination fails for cluster: " + clusterId;
-            LOG.error(msg, e);
-            throw new UnregisteredClusterException(msg, e);
+            lock = CloudControllerContext.getInstance().acquireClusterContextWriteLock();
+            // terminate all kubernetes units
+            try {
+                terminateAllContainers(clusterId);
+            } catch (InvalidClusterException e) {
+                String msg = "Docker instance termination fails for cluster: " + clusterId;
+                LOG.error(msg, e);
+                throw new UnregisteredClusterException(msg, e);
+            }
+            // send cluster removal notifications and update the state
+            onClusterRemoval(clusterId);
+        } finally {
+            if(lock != null) {
+                CloudControllerContext.getInstance().releaseWriteLock(lock);
+            }
         }
-        // send cluster removal notifications and update the state
-        onClusterRemoval(clusterId);
     }
 
-
+    /**
+     * FIXME: A validate method shouldn't persist any data, yet this one updates the cartridge and calls persist().
+     */
     @Override
     public boolean validateDeploymentPolicy(String cartridgeType, Partition[] partitions)
             throws InvalidPartitionException, InvalidCartridgeTypeException {
 
-        List<String> validatedPartitions = CloudControllerContext.getInstance().getPartitionIds(cartridgeType);
-        if (validatedPartitions != null) {
-            // cache hit for this cartridge
-            // get list of partitions
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Partition validation cache hit for cartridge type: " + cartridgeType);
-            }
-        }
-
-        Map<String, IaasProvider> partitionToIaasProviders =
-                new ConcurrentHashMap<String, IaasProvider>();
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Deployment policy validation started for cartridge type: " + cartridgeType);
-        }
-
-        Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType);
+        Lock lock = null;
+        try {
+            lock = CloudControllerContext.getInstance().acquireCartridgesWriteLock();
 
-        if (cartridge == null) {
-            String msg = "Invalid Cartridge Type: " + cartridgeType;
-            LOG.error(msg);
-            throw new InvalidCartridgeTypeException(msg);
-        }
+            List<String> validatedPartitions = CloudControllerContext.getInstance().getPartitionIds(cartridgeType);
+            if (validatedPartitions != null) {
+                // cache hit for this cartridge
+                // get list of partitions
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Partition validation cache hit for cartridge type: " + cartridgeType);
+                }
+            }
 
-        Map<String, Future<IaasProvider>> jobList = new HashMap<String, Future<IaasProvider>>();
+            Map<String, IaasProvider> partitionToIaasProviders =
+                    new ConcurrentHashMap<String, IaasProvider>();
 
-        for (Partition partition : partitions) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Deployment policy validation started for cartridge type: " + cartridgeType);
+            }
 
-            if (validatedPartitions.contains(partition.getId())) {
-                // partition cache hit
-                continue;
+            Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
+            if (cartridge == null) {
+                String msg = "Invalid Cartridge Type: " + cartridgeType;
+                LOG.error(msg);
+                throw new InvalidCartridgeTypeException(msg);
             }
 
-            Callable<IaasProvider> worker = new PartitionValidatorCallable(
-                    partition, cartridge);
-            Future<IaasProvider> job = CloudControllerContext.getInstance()
-                    .getExecutorService().submit(worker);
-            jobList.put(partition.getId(), job);
-        }
+            Map<String, Future<IaasProvider>> jobList = new HashMap<String, Future<IaasProvider>>();
+            for (Partition partition : partitions) {
+                if (validatedPartitions.contains(partition.getId())) {
+                    // partition cache hit
+                    continue;
+                }
 
-        // Retrieve the results of the concurrently performed sanity checks.
-        for (Entry<String, Future<IaasProvider>> entry : jobList.entrySet()) {
-            if (entry == null) {
-                continue;
+                Callable<IaasProvider> worker = new PartitionValidatorCallable(
+                        partition, cartridge);
+                Future<IaasProvider> job = CloudControllerContext.getInstance()
+                        .getExecutorService().submit(worker);
+                jobList.put(partition.getId(), job);
             }
-            String partitionId = entry.getKey();
-            Future<IaasProvider> job = entry.getValue();
-            try {
-                // add to a temporary Map
-                partitionToIaasProviders.put(partitionId, job.get());
 
-                // add to cache
-                this.cloudControllerContext.addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId);
+            // Retrieve the results of the concurrently performed sanity checks.
+            for (Entry<String, Future<IaasProvider>> entry : jobList.entrySet()) {
+                if (entry == null) {
+                    continue;
+                }
+                String partitionId = entry.getKey();
+                Future<IaasProvider> job = entry.getValue();
+                try {
+                    // add to a temporary Map
+                    partitionToIaasProviders.put(partitionId, job.get());
 
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Partition " + partitionId + " added to the cache against cartridge type: " + cartridgeType);
+                    // add to cache
+                    CloudControllerContext.getInstance().addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Partition " + partitionId + " added to the cache against cartridge type: " + cartridgeType);
+                    }
+                } catch (Exception e) {
+                    LOG.error(e.getMessage(), e);
+                    throw new InvalidPartitionException(e.getMessage(), e);
                 }
-            } catch (Exception e) {
-                LOG.error(e.getMessage(), e);
-                throw new InvalidPartitionException(e.getMessage(), e);
             }
-        }
 
-        // if and only if the deployment policy valid
-        cartridge.addIaasProviders(partitionToIaasProviders);
+            // if and only if the deployment policy is valid
+            cartridge.addIaasProviders(partitionToIaasProviders);
+            CloudControllerContext.getInstance().updateCartridge(cartridge);
 
-        // persist data
-        persist();
+            // persist data
+            persist();
 
-        LOG.info("All partitions " + CloudControllerUtil.getPartitionIds(partitions) +
-                " were validated successfully, against the Cartridge: " + cartridgeType);
+            LOG.info("All partitions " + CloudControllerUtil.getPartitionIds(partitions) +
+                    " were validated successfully, against the Cartridge: " + cartridgeType);
 
-        return true;
+            return true;
+        } finally {
+            if(lock != null) {
+                CloudControllerContext.getInstance().releaseWriteLock(lock);
+            }
+        }
     }
 
     private void onClusterRemoval(final String clusterId) {
-        ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId);
+        ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
         TopologyBuilder.handleClusterRemoved(ctxt);
-        cloudControllerContext.removeClusterContext(clusterId);
-        cloudControllerContext.removeMemberContextsOfCluster(clusterId);
+        CloudControllerContext.getInstance().removeClusterContext(clusterId);
+        CloudControllerContext.getInstance().removeMemberContextsOfCluster(clusterId);
         persist();
     }
 
@@ -1524,160 +1546,166 @@ public class CloudControllerServiceImpl implements CloudControllerService {
     }
 
     public ClusterContext getClusterContext(String clusterId) {
-
-        return cloudControllerContext.getClusterContext(clusterId);
+        return CloudControllerContext.getInstance().getClusterContext(clusterId);
     }
 
     @Override
     public MemberContext[] startContainers(ContainerClusterContext containerClusterContext)
             throws UnregisteredCartridgeException {
+        Lock lock = null;
+        try {
+            lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock();
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("CloudControllerServiceImpl:startContainers");
-        }
-
-        handleNullObject(containerClusterContext, "Container start-up failed. ContainerClusterContext is null.");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("CloudControllerServiceImpl:startContainers");
+            }
 
-        String clusterId = containerClusterContext.getClusterId();
-        handleNullObject(clusterId, "Container start-up failed. Cluster id is null.");
+            handleNullObject(containerClusterContext, "Container start-up failed. ContainerClusterContext is null.");
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Received a container spawn request : " + containerClusterContext.toString());
-        }
+            String clusterId = containerClusterContext.getClusterId();
+            handleNullObject(clusterId, "Container start-up failed. Cluster id is null.");
 
-        ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId);
-        handleNullObject(ctxt, "Container start-up failed. Invalid cluster id. " + containerClusterContext.toString());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Received a container spawn request : " + containerClusterContext.toString());
+            }
 
-        String cartridgeType = ctxt.getCartridgeType();
+            ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
+            handleNullObject(ctxt, "Container start-up failed. Invalid cluster id. " + containerClusterContext.toString());
 
-        Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType);
+            String cartridgeType = ctxt.getCartridgeType();
 
-        if (cartridge == null) {
-            String msg =
-                    "Instance start-up failed. No matching Cartridge found [type] " + cartridgeType + ". " +
-                            containerClusterContext.toString();
-            LOG.error(msg);
-            throw new UnregisteredCartridgeException(msg);
-        }
+            Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
 
-        try {
-            String minReplicas = validateProperty(StratosConstants.KUBERNETES_MIN_REPLICAS, ctxt);
-            String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
-            String kubernetesMasterIp = validateProperty(StratosConstants.KUBERNETES_MASTER_IP, containerClusterContext);
-            String kubernetesPortRange = validateProperty(StratosConstants.KUBERNETES_PORT_RANGE, containerClusterContext);
+            if (cartridge == null) {
+                String msg = "Instance start-up failed. No matching Cartridge found [type] " + cartridgeType + ". " +
+                                containerClusterContext.toString();
+                LOG.error(msg);
+                throw new UnregisteredCartridgeException(msg);
+            }
 
-            KubernetesClusterContext kubClusterContext = getKubernetesClusterContext(kubernetesClusterId, kubernetesMasterIp, kubernetesPortRange);
+            try {
+                String minReplicas = validateProperty(StratosConstants.KUBERNETES_MIN_REPLICAS, ctxt);
+                String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
+                String kubernetesMasterIp = validateProperty(StratosConstants.KUBERNETES_MASTER_IP, containerClusterContext);
+                String kubernetesPortRange = validateProperty(StratosConstants.KUBERNETES_PORT_RANGE, containerClusterContext);
 
-            KubernetesApiClient kubApi = kubClusterContext.getKubApi();
+                KubernetesClusterContext kubClusterContext = getKubernetesClusterContext(kubernetesClusterId,
+                        kubernetesMasterIp, kubernetesPortRange);
+                KubernetesApiClient kubApi = kubClusterContext.getKubApi();
 
-            // first let's create a replication controller.
-            ContainerClusterContextToReplicationController controllerFunction = new ContainerClusterContextToReplicationController();
-            ReplicationController controller = controllerFunction.apply(containerClusterContext);
+                // first let's create a replication controller.
+                ContainerClusterContextToReplicationController controllerFunction = new ContainerClusterContextToReplicationController();
+                ReplicationController controller = controllerFunction.apply(containerClusterContext);
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Cloud Controller is delegating request to start a replication controller " + controller +
-                        " for " + containerClusterContext + " to Kubernetes layer.");
-            }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cloud Controller is delegating request to start a replication controller " + controller +
+                            " for " + containerClusterContext + " to Kubernetes layer.");
+                }
 
-            kubApi.createReplicationController(controller);
+                kubApi.createReplicationController(controller);
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Cloud Controller successfully started the controller "
-                        + controller + " via Kubernetes layer.");
-            }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cloud Controller successfully started the controller "
+                            + controller + " via Kubernetes layer.");
+                }
 
-            // secondly let's create a kubernetes service proxy to load balance these containers
-            ContainerClusterContextToKubernetesService serviceFunction = new ContainerClusterContextToKubernetesService();
-            Service service = serviceFunction.apply(containerClusterContext);
+                // secondly let's create a kubernetes service proxy to load balance these containers
+                ContainerClusterContextToKubernetesService serviceFunction = new ContainerClusterContextToKubernetesService();
+                Service service = serviceFunction.apply(containerClusterContext);
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Cloud Controller is delegating request to start a service " + service +
-                        " for " + containerClusterContext + " to Kubernetes layer.");
-            }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cloud Controller is delegating request to start a service " + service +
+                            " for " + containerClusterContext + " to Kubernetes layer.");
+                }
 
-            kubApi.createService(service);
+                kubApi.createService(service);
 
-            // set host port and update
-            Property allocatedServiceHostPortProp = new Property();
-            allocatedServiceHostPortProp.setName(StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
-            allocatedServiceHostPortProp.setValue(String.valueOf(service.getPort()));
-            ctxt.getProperties().addProperty(allocatedServiceHostPortProp);
-            cloudControllerContext.addClusterContext(ctxt);
+                // set host port and update
+                Property allocatedServiceHostPortProp = new Property();
+                allocatedServiceHostPortProp.setName(StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
+                allocatedServiceHostPortProp.setValue(String.valueOf(service.getPort()));
+                ctxt.getProperties().addProperty(allocatedServiceHostPortProp);
+                CloudControllerContext.getInstance().addClusterContext(ctxt);
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Cloud Controller successfully started the service "
-                        + controller + " via Kubernetes layer.");
-            }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cloud Controller successfully started the service "
+                            + controller + " via Kubernetes layer.");
+                }
 
-            // create a label query
-            Label l = new Label();
-            l.setName(clusterId);
-            // execute the label query
-            Pod[] newlyCreatedPods = new Pod[0];
-            int expectedCount = Integer.parseInt(minReplicas);
+                // create a label query
+                Label l = new Label();
+                l.setName(clusterId);
+                // execute the label query
+                Pod[] newlyCreatedPods = new Pod[0];
+                int expectedCount = Integer.parseInt(minReplicas);
 
-            for (int i = 0; i < expectedCount; i++) {
-                newlyCreatedPods = kubApi.getSelectedPods(new Label[]{l});
+                for (int i = 0; i < expectedCount; i++) {
+                    newlyCreatedPods = kubApi.getSelectedPods(new Label[]{l});
 
-                if (LOG.isDebugEnabled()) {
+                    if (LOG.isDebugEnabled()) {
 
-                    LOG.debug("Pods Count: " + newlyCreatedPods.length + " for cluster: " + clusterId);
+                        LOG.debug("Pods Count: " + newlyCreatedPods.length + " for cluster: " + clusterId);
+                    }
+                    if (newlyCreatedPods.length == expectedCount) {
+                        break;
+                    }
+                    Thread.sleep(10000);
                 }
-                if (newlyCreatedPods.length == expectedCount) {
-                    break;
+
+                if (newlyCreatedPods.length == 0) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(String.format("Pods are not created for cluster : %s, hence deleting the service", clusterId));
+                    }
+                    terminateAllContainers(clusterId);
+                    return new MemberContext[0];
                 }
-                Thread.sleep(10000);
-            }
 
-            if (newlyCreatedPods.length == 0) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(String.format("Pods are not created for cluster : %s, hence deleting the service", clusterId));
-                }
-                terminateAllContainers(clusterId);
-                return new MemberContext[0];
-            }
 
-            if (LOG.isDebugEnabled()) {
+                    LOG.debug(String.format("Pods created : %s for cluster : %s", newlyCreatedPods.length, clusterId));
+                }
 
-                LOG.debug(String.format("Pods created : %s for cluster : %s", newlyCreatedPods.length, clusterId));
-            }
+                List<MemberContext> memberContexts = new ArrayList<MemberContext>();
 
-            List<MemberContext> memberContexts = new ArrayList<MemberContext>();
+                PodToMemberContext podToMemberContextFunc = new PodToMemberContext();
+                // generate Member Contexts
+                for (Pod pod : newlyCreatedPods) {
+                    MemberContext context = podToMemberContextFunc.apply(pod);
+                    context.setCartridgeType(cartridgeType);
+                    context.setClusterId(clusterId);
 
-            PodToMemberContext podToMemberContextFunc = new PodToMemberContext();
-            // generate Member Contexts
-            for (Pod pod : newlyCreatedPods) {
-                MemberContext context = podToMemberContextFunc.apply(pod);
-                context.setCartridgeType(cartridgeType);
-                context.setClusterId(clusterId);
+                    context.setProperties(CloudControllerUtil.addProperty(context
+                                    .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
+                            String.valueOf(service.getPort())));
 
-                context.setProperties(CloudControllerUtil.addProperty(context
-                                .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
-                        String.valueOf(service.getPort())));
+                    CloudControllerContext.getInstance().addMemberContext(context);
 
-                cloudControllerContext.addMemberContext(context);
+                    // wait till Pod status turns to running and send member spawned.
+                    ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Cloud Controller is starting the instance start up thread.");
+                    }
+                    CloudControllerContext.getInstance().addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
 
-                // wait till Pod status turns to running and send member spawned.
-                ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance();
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Cloud Controller is starting the instance start up thread.");
+                    memberContexts.add(context);
                 }
-                cloudControllerContext.addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
-
-                memberContexts.add(context);
-            }
 
-            // persist in registry
-            persist();
+                // persist in registry
+                persist();
 
-            LOG.info("Kubernetes entities are successfully starting up: " + memberContexts);
+                LOG.info("Kubernetes entities are successfully starting up: " + memberContexts);
 
-            return memberContexts.toArray(new MemberContext[0]);
+                return memberContexts.toArray(new MemberContext[0]);
 
-        } catch (Exception e) {
-            String msg = "Failed to start an instance. " + containerClusterContext.toString() + " Cause: " + e.getMessage();
-            LOG.error(msg, e);
-            throw new IllegalStateException(msg, e);
+            } catch (Exception e) {
+                String msg = "Failed to start an instance. " + containerClusterContext.toString() + " Cause: " + e.getMessage();
+                LOG.error(msg, e);
+                throw new IllegalStateException(msg, e);
+            }
+        } finally {
+            if(lock != null) {
+                CloudControllerContext.getInstance().releaseWriteLock(lock);
+            }
         }
     }
 
@@ -1700,18 +1728,18 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             String kubernetesClusterId, String kubernetesMasterIp,
             String kubernetesPortRange) {
 
-        KubernetesClusterContext origCtxt = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId);
+        KubernetesClusterContext origCtxt = CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId);
         KubernetesClusterContext newCtxt = new KubernetesClusterContext(kubernetesClusterId, kubernetesPortRange, kubernetesMasterIp);
 
         if (origCtxt == null) {
-            cloudControllerContext.addKubernetesClusterContext(newCtxt);
+            CloudControllerContext.getInstance().addKubernetesClusterContext(newCtxt);
             return newCtxt;
         }
 
         if (!origCtxt.equals(newCtxt)) {
             // if for some reason master IP etc. have changed
             newCtxt.setAvailableHostPorts(origCtxt.getAvailableHostPorts());
-            cloudControllerContext.addKubernetesClusterContext(newCtxt);
+            CloudControllerContext.getInstance().addKubernetesClusterContext(newCtxt);
             return newCtxt;
         } else {
             return origCtxt;
@@ -1721,228 +1749,242 @@ public class CloudControllerServiceImpl implements CloudControllerService {
     @Override
     public MemberContext[] terminateAllContainers(String clusterId)
             throws InvalidClusterException {
+        Lock lock = null;
+        try {
+            lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock();
 
-        ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId);
-        handleNullObject(ctxt, "Kubernetes units temrination failed. Invalid cluster id. " + clusterId);
-
-        String kubernetesClusterId = CloudControllerUtil.getProperty(ctxt.getProperties(),
-                StratosConstants.KUBERNETES_CLUSTER_ID);
-        handleNullObject(kubernetesClusterId, "Kubernetes units termination failed. Cannot find '" +
-                StratosConstants.KUBERNETES_CLUSTER_ID + "'. " + ctxt);
+            ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
+            handleNullObject(ctxt, "Kubernetes units temrination failed. Invalid cluster id. " + clusterId);
 
-        KubernetesClusterContext kubClusterContext = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId);
-        handleNullObject(kubClusterContext, "Kubernetes units termination failed. Cannot find a matching Kubernetes Cluster for cluster id: "
-                + kubernetesClusterId);
+            String kubernetesClusterId = CloudControllerUtil.getProperty(ctxt.getProperties(),
+                    StratosConstants.KUBERNETES_CLUSTER_ID);
+            handleNullObject(kubernetesClusterId, "Kubernetes units termination failed. Cannot find '" +
+                    StratosConstants.KUBERNETES_CLUSTER_ID + "'. " + ctxt);
 
-        KubernetesApiClient kubApi = kubClusterContext.getKubApi();
-        // delete the service
-        try {
-            kubApi.deleteService(CloudControllerUtil.getCompatibleId(clusterId));
-        } catch (KubernetesClientException e) {
-            // we're not going to throw this error, but proceed with other deletions
-            LOG.error("Failed to delete Kubernetes service with id: " + clusterId, e);
-        }
+            KubernetesClusterContext kubClusterContext = CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId);
+            handleNullObject(kubClusterContext, "Kubernetes units termination failed. Cannot find a matching Kubernetes Cluster for cluster id: "
+                    + kubernetesClusterId);
 
-        // set replicas=0 for the replication controller
-        try {
-            kubApi.updateReplicationController(clusterId, 0);
-        } catch (KubernetesClientException e) {
-            // we're not going to throw this error, but proceed with other deletions
-            LOG.error("Failed to update Kubernetes Controller with id: " + clusterId, e);
-        }
+            KubernetesApiClient kubApi = kubClusterContext.getKubApi();
+            // delete the service
+            try {
+                kubApi.deleteService(CloudControllerUtil.getCompatibleId(clusterId));
+            } catch (KubernetesClientException e) {
+                // we're not going to throw this error, but proceed with other deletions
+                LOG.error("Failed to delete Kubernetes service with id: " + clusterId, e);
+            }
 
-        // delete pods forcefully
-        try {
-            // create a label query
-            Label l = new Label();
-            l.setName(clusterId);
-            // execute the label query
-            Pod[] pods = kubApi.getSelectedPods(new Label[]{l});
+            // set replicas=0 for the replication controller
+            try {
+                kubApi.updateReplicationController(clusterId, 0);
+            } catch (KubernetesClientException e) {
+                // we're not going to throw this error, but proceed with other deletions
+                LOG.error("Failed to update Kubernetes Controller with id: " + clusterId, e);
+            }
 
-            for (Pod pod : pods) {
-                try {
-                    // delete pods forcefully
-                    kubApi.deletePod(pod.getId());
-                } catch (KubernetesClientException ignore) {
-                    // we can't do nothing here
-                    LOG.warn(String.format("Failed to delete Pod [%s] forcefully!", pod.getId()));
+            // delete pods forcefully
+            try {
+                // create a label query
+                Label l = new Label();
+                l.setName(clusterId);
+                // execute the label query
+                Pod[] pods = kubApi.getSelectedPods(new Label[]{l});
+
+                for (Pod pod : pods) {
+                    try {
+                        // delete pods forcefully
+                        kubApi.deletePod(pod.getId());
+                    } catch (KubernetesClientException ignore) {
+                        // nothing more we can do here; log a warning and continue with the next pod
+                        LOG.warn(String.format("Failed to delete Pod [%s] forcefully!", pod.getId()));
+                    }
                 }
+            } catch (KubernetesClientException e) {
+                // we're not going to throw this error, but proceed with other deletions
+                LOG.error("Failed to delete pods forcefully for cluster: " + clusterId, e);
             }
-        } catch (KubernetesClientException e) {
-            // we're not going to throw this error, but proceed with other deletions
-            LOG.error("Failed to delete pods forcefully for cluster: " + clusterId, e);
-        }
-
-        // delete the replication controller.
-        try {
-            kubApi.deleteReplicationController(clusterId);
-        } catch (KubernetesClientException e) {
-            String msg = "Failed to delete Kubernetes Controller with id: " + clusterId;
-            LOG.error(msg, e);
-            throw new InvalidClusterException(msg, e);
-        }
 
-        String allocatedPort = CloudControllerUtil.getProperty(ctxt.getProperties(),
-                StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
+            // delete the replication controller.
+            try {
+                kubApi.deleteReplicationController(clusterId);
+            } catch (KubernetesClientException e) {
+                String msg = "Failed to delete Kubernetes Controller with id: " + clusterId;
+                LOG.error(msg, e);
+                throw new InvalidClusterException(msg, e);
+            }
 
-        if (allocatedPort != null) {
-            kubClusterContext.deallocateHostPort(Integer
-                    .parseInt(allocatedPort));
-        } else {
-            LOG.warn("Host port dealloacation failed due to a missing property: "
-                    + StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
-        }
+            String allocatedPort = CloudControllerUtil.getProperty(ctxt.getProperties(),
+                    StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
 
-        List<MemberContext> membersToBeRemoved = cloudControllerContext.getMemberContextsOfClusterId(clusterId);
+            if (allocatedPort != null) {
+                kubClusterContext.deallocateHostPort(Integer
+                        .parseInt(allocatedPort));
+            } else {
+                LOG.warn("Host port dealloacation failed due to a missing property: "
+                        + StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
+            }
 
-        for (MemberContext memberContext : membersToBeRemoved) {
-            logTermination(memberContext);
-        }
+            List<MemberContext> membersToBeRemoved = CloudControllerContext.getInstance().getMemberContextsOfClusterId(clusterId);
 
-        // persist
-        persist();
+            for (MemberContext memberContext : membersToBeRemoved) {
+                logTermination(memberContext);
+            }
 
-        return membersToBeRemoved.toArray(new MemberContext[0]);
+            // persist
+            persist();
+            return membersToBeRemoved.toArray(new MemberContext[0]);
+        } finally {
+            if(lock != null) {
+                CloudControllerContext.getInstance().releaseWriteLock(lock);
+            }
+        }
     }
 
     @Override
     public MemberContext[] updateContainers(String clusterId, int replicas)
             throws UnregisteredCartridgeException {
+        Lock lock = null;
+        try {
+            lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock();
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("CloudControllerServiceImpl:updateContainers for cluster : " + clusterId);
-        }
-
-        ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId);
-        handleNullObject(ctxt, "Container update failed. Invalid cluster id. " + clusterId);
-
-        String cartridgeType = ctxt.getCartridgeType();
-
-        Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("CloudControllerServiceImpl:updateContainers for cluster : " + clusterId);
+            }
 
-        if (cartridge == null) {
-            String msg =
-                    "Container update failed. No matching Cartridge found [type] " + cartridgeType
-                            + ". [cluster id] " + clusterId;
-            LOG.error(msg);
-            throw new UnregisteredCartridgeException(msg);
-        }
+            ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
+            handleNullObject(ctxt, "Container update failed. Invalid cluster id. " + clusterId);
 
-        try {
-            String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
+            String cartridgeType = ctxt.getCartridgeType();
 
-            KubernetesClusterContext kubClusterContext = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId);
+            Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
 
-            if (kubClusterContext == null) {
+            if (cartridge == null) {
                 String msg =
-                        "Instance start-up failed. No matching Kubernetes Context Found for [id] " + kubernetesClusterId
+                        "Container update failed. No matching Cartridge found [type] " + cartridgeType
                                 + ". [cluster id] " + clusterId;
                 LOG.error(msg);
                 throw new UnregisteredCartridgeException(msg);
             }
 
-            KubernetesApiClient kubApi = kubClusterContext.getKubApi();
-            // create a label query
-            Label l = new Label();
-            l.setName(clusterId);
+            try {
+                String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
 
-            // get the current pods - useful when scale down
-            Pod[] previousStatePods = kubApi.getSelectedPods(new Label[]{l});
+                KubernetesClusterContext kubClusterContext = CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId);
 
-            // update the replication controller - cluster id = replication controller id
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Cloud Controller is delegating request to update a replication controller " + clusterId +
-                        " to Kubernetes layer.");
-            }
+                if (kubClusterContext == null) {
+                    String msg =
+                            "Instance start-up failed. No matching Kubernetes Context Found for [id] " + kubernetesClusterId
+                                    + ". [cluster id] " + clusterId;
+                    LOG.error(msg);
+                    throw new UnregisteredCartridgeException(msg);
+                }
 
-            kubApi.updateReplicationController(clusterId, replicas);
+                KubernetesApiClient kubApi = kubClusterContext.getKubApi();
+                // create a label query
+                Label l = new Label();
+                l.setName(clusterId);
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Cloud Controller successfully updated the controller "
-                        + clusterId + " via Kubernetes layer.");
-            }
+                // get the current pods - useful when scale down
+                Pod[] previousStatePods = kubApi.getSelectedPods(new Label[]{l});
 
-            // execute the label query
-            Pod[] allPods = new Pod[0];
+                // update the replication controller - cluster id = replication controller id
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cloud Controller is delegating request to update a replication controller " + clusterId +
+                            " to Kubernetes layer.");
+                }
 
-            // wait replicas*5s time in the worst case ; best case = 0s
-            for (int i = 0; i < (replicas * previousStatePods.length + 1); i++) {
-                allPods = kubApi.getSelectedPods(new Label[]{l});
+                kubApi.updateReplicationController(clusterId, replicas);
 
                 if (LOG.isDebugEnabled()) {
-
-                    LOG.debug("Pods Count: " + allPods.length + " for cluster: " + clusterId);
+                    LOG.debug("Cloud Controller successfully updated the controller "
+                            + clusterId + " via Kubernetes layer.");
                 }
-                if (allPods.length == replicas) {
-                    break;
-                }
-                Thread.sleep(10000);
-            }
 
-            if (LOG.isDebugEnabled()) {
+                // execute the label query
+                Pod[] allPods = new Pod[0];
 
-                LOG.debug(String.format("Pods created : %s for cluster : %s", allPods.length, clusterId));
-            }
+                // poll until the pod count equals replicas; worst case = (replicas * previousStatePods.length + 1) polls with a 10s sleep after each ; best case = 0s
+                for (int i = 0; i < (replicas * previousStatePods.length + 1); i++) {
+                    allPods = kubApi.getSelectedPods(new Label[]{l});
 
-            List<MemberContext> memberContexts = new ArrayList<MemberContext>();
+                    if (LOG.isDebugEnabled()) {
 
-            PodToMemberContext podToMemberContextFunc = new PodToMemberContext();
-            // generate Member Contexts
-            for (Pod pod : allPods) {
-                MemberContext context;
-                // if member context does not exist -> a new member (scale up)
-                if ((context = cloudControllerContext.getMemberContextOfMemberId(pod.getId())) == null) {
+                        LOG.debug("Pods Count: " + allPods.length + " for cluster: " + clusterId);
+                    }
+                    if (allPods.length == replicas) {
+                        break;
+                    }
+                    Thread.sleep(10000);
+                }
 
-                    context = podToMemberContextFunc.apply(pod);
-                    context.setCartridgeType(cartridgeType);
-                    context.setClusterId(clusterId);
+                if (LOG.isDebugEnabled()) {
 
-                    context.setProperties(CloudControllerUtil.addProperty(context
-                                    .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
-                            CloudControllerUtil.getProperty(ctxt.getProperties(),
-                                    StratosConstants.ALLOCATED_SERVICE_HOST_PORT)));
+                    LOG.debug(String.format("Pods created : %s for cluster : %s", allPods.length, clusterId));
+                }
 
-                    // wait till Pod status turns to running and send member spawned.
-                    ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance();
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Cloud Controller is starting the instance start up thread.");
-                    }
-                    cloudControllerContext.addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
+                List<MemberContext> memberContexts = new ArrayList<MemberContext>();
 
-                    memberContexts.add(context);
+                PodToMemberContext podToMemberContextFunc = new PodToMemberContext();
+                // generate Member Contexts
+                for (Pod pod : allPods) {
+                    MemberContext context;
+                    // if member context does not exist -> a new member (scale up)
+                    if ((context = CloudControllerContext.getInstance().getMemberContextOfMemberId(pod.getId())) == null) {
 
-                }
-                // publish data
-                // TODO
-//                CartridgeInstanceDataPublisher.publish(context.getMemberId(), null, null, context.getClusterId(), cartridgeType, MemberStatus.Created.toString(), node);
+                        context = podToMemberContextFunc.apply(pod);
+                        context.setCartridgeType(cartridgeType);
+                        context.setClusterId(clusterId);
 
-            }
+                        context.setProperties(CloudControllerUtil.addProperty(context
+                                        .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
+                                CloudControllerUtil.getProperty(ctxt.getProperties(),
+                                        StratosConstants.ALLOCATED_SERVICE_HOST_PORT)));
+
+                        // wait till Pod status turns to running and send member spawned.
+                        ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance();
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Cloud Controller is starting the instance start up thread.");
+                        }
+                        CloudControllerContext.getInstance().addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
 
-            if (memberContexts.isEmpty()) {
-                // terminated members
-                @SuppressWarnings("unchecked")
-                List<Pod> difference = ListUtils.subtract(Arrays.asList(previousStatePods), Arrays.asList(allPods));
-                for (Pod pod : difference) {
-                    if (pod != null) {
-                        MemberContext context = cloudControllerContext.getMemberContextOfMemberId(pod.getId());
-                        logTermination(context);
                         memberContexts.add(context);
+
                     }
+                    // publish data
+                    // TODO
+//                CartridgeInstanceDataPublisher.publish(context.getMemberId(), null, null, context.getClusterId(), cartridgeType, MemberStatus.Created.toString(), node);
+
                 }
-            }
 
+                if (memberContexts.isEmpty()) {
+                    // terminated members
+                    @SuppressWarnings("unchecked")
+                    List<Pod> difference = ListUtils.subtract(Arrays.asList(previousStatePods), Arrays.asList(allPods));
+                    for (Pod pod : difference) {
+                        if (pod != null) {
+                            MemberContext context = CloudControllerContext.getInstance().getMe

<TRUNCATED>

[2/3] stratos git commit: Adding distributed locks to cloud controller service methods

Posted by im...@apache.org.
Adding distributed locks to cloud controller service methods


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/71fab2b4
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/71fab2b4
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/71fab2b4

Branch: refs/heads/master
Commit: 71fab2b440d40b82f80407ce6481b36259a26e1b
Parents: e8914f3
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Dec 4 00:14:06 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 4 00:14:40 2014 +0530

----------------------------------------------------------------------
 .../context/CloudControllerContext.java         |  164 +-
 .../impl/CloudControllerServiceImpl.java        | 1642 +++++++++---------
 2 files changed, 941 insertions(+), 865 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/71fab2b4/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
index 2d4e195..53e7be4 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
@@ -37,10 +37,7 @@ import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
 import org.wso2.carbon.registry.core.exceptions.RegistryException;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -55,23 +52,23 @@ public class CloudControllerContext implements Serializable {
     private static final long serialVersionUID = -2662307358852779897L;
     private static final Log log = LogFactory.getLog(CloudControllerContext.class);
 
-    private static final String CC_CLUSTER_ID_TO_MEMBER_CTX = "CC_CLUSTER_ID_TO_MEMBER_CTX";
+    private static final String CC_CLUSTER_ID_TO_MEMBER_CTX_MAP = "CC_CLUSTER_ID_TO_MEMBER_CTX_MAP";
     private static final String CC_CLUSTER_ID_TO_CLUSTER_CTX = "CC_CLUSTER_ID_TO_CLUSTER_CTX";
-    private static final String CC_MEMBER_ID_TO_MEMBER_CTX = "CC_MEMBER_ID_TO_MEMBER_CTX";
-    private static final String CC_MEMBER_ID_TO_SCH_TASK = "CC_MEMBER_ID_TO_SCH_TASK";
+    private static final String CC_MEMBER_ID_TO_MEMBER_CTX_MAP = "CC_MEMBER_ID_TO_MEMBER_CTX_MAP";
+    private static final String CC_MEMBER_ID_TO_SCH_TASK_MAP = "CC_MEMBER_ID_TO_SCH_TASK_MAP";
     private static final String CC_KUB_GROUP_ID_TO_GROUP_MAP = "CC_KUB_GROUP_ID_TO_GROUP_MAP";
-    private static final String CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX";
-    private static final String CC_CARTRIDGE_TYPE_TO_PARTITION_IDS = "CC_CARTRIDGE_TYPE_TO_PARTITION_IDS";
-    private static final String CC_CARTRIDGES = "CC_CARTRIDGES";
-    private static final String CC_SERVICE_GROUPS = "CC_SERVICE_GROUPS";
-
-    private static final String CC_CLUSTER_CTX_LOCK = "CC_CLUSTER_ID_TO_MEMBER_CTX_LOCK";
-    private static final String CC_MEMBER_CTX_LOCK = "CC_MEMBER_ID_TO_MEMBER_CTX_LOCK";
-    private static final String CC_SCH_TASK_LOCK = "CC_MEMBER_ID_TO_SCH_TASK_LOCK";
-    private static final String CC_KUB_GROUP_LOCK = "CC_KUB_GROUP_ID_TO_GROUP_MAP_LOCK";
-    private static final String CC_KUB_CLUSTER_CTX_LOCK = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX_LOCK";
-    private static final String CC_CARTRIDGES_LOCK = "CC_CARTRIDGES_LOCK";
-    private static final String CC_SERVICE_GROUPS_LOCK = "CC_SERVICE_GROUPS_LOCK";
+    private static final String CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX_MAP = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX_MAP";
+    private static final String CC_CARTRIDGE_TYPE_TO_PARTITION_IDS_MAP = "CC_CARTRIDGE_TYPE_TO_PARTITION_IDS_MAP";
+    private static final String CC_CARTRIDGE_TYPE_TO_CARTRIDGES_MAP = "CC_CARTRIDGE_TYPE_TO_CARTRIDGES_MAP";
+    private static final String CC_SERVICE_GROUP_NAME_TO_SERVICE_GROUP_MAP = "CC_SERVICE_GROUP_NAME_TO_SERVICE_GROUP_MAP";
+
+    private static final String CC_CLUSTER_CTX_WRITE_LOCK = "CC_CLUSTER_CTX_WRITE_LOCK";
+    private static final String CC_MEMBER_CTX_WRITE_LOCK = "CC_MEMBER_CTX_WRITE_LOCK";
+    private static final String CC_SCH_TASK_WRITE_LOCK = "CC_SCH_TASK_WRITE_LOCK";
+    private static final String CC_KUB_GROUP_WRITE_LOCK = "CC_KUB_GROUP_WRITE_LOCK";
+    private static final String CC_KUB_CLUSTER_CTX_WRITE_LOCK = "CC_KUB_CLUSTER_CTX_WRITE_LOCK";
+    private static final String CC_CARTRIDGES_WRITE_LOCK = "CC_CARTRIDGES_WRITE_LOCK";
+    private static final String CC_SERVICE_GROUPS_WRITE_LOCK = "CC_SERVICE_GROUPS_WRITE_LOCK";
 
     private static volatile CloudControllerContext instance;
 
@@ -129,14 +126,18 @@ public class CloudControllerContext implements Serializable {
     private transient ExecutorService executorService = Executors.newFixedThreadPool(20);
 
     /**
-     * List of registered {@link org.apache.stratos.cloud.controller.domain.Cartridge}s
+     * Map of registered {@link org.apache.stratos.cloud.controller.domain.Cartridge}s
+     * Key - cartridge type
+     * Value - cartridge
      */
-    private List<Cartridge> cartridges;
+    private Map<String, Cartridge> cartridgeTypeToCartridgeMap;
 
     /**
-     * List of deployed service groups
+     * Map of deployed service groups
+     * Key - service group name
+     * Value - service group
      */
-    private List<ServiceGroup> serviceGroups;
+    private Map<String, ServiceGroup> serviceGroupNameToServiceGroupMap;
 
     private String streamId;
     private boolean isPublisherRunning;
@@ -157,14 +158,14 @@ public class CloudControllerContext implements Serializable {
 
         // Initialize objects
         kubernetesGroupsMap = distributedObjectProvider.getMap(CC_KUB_GROUP_ID_TO_GROUP_MAP);
-        clusterIdToMemberContextListMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX);
-        memberIdToMemberContextMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_MEMBER_CTX);
-        memberIdToScheduledTaskMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_SCH_TASK);
-        kubClusterIdToKubClusterContextMap = distributedObjectProvider.getMap(CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX);
+        clusterIdToMemberContextListMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX_MAP);
+        memberIdToMemberContextMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_MEMBER_CTX_MAP);
+        memberIdToScheduledTaskMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_SCH_TASK_MAP);
+        kubClusterIdToKubClusterContextMap = distributedObjectProvider.getMap(CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX_MAP);
         clusterIdToContextMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_CLUSTER_CTX);
-        cartridgeTypeToPartitionIdsMap = distributedObjectProvider.getMap(CC_CARTRIDGE_TYPE_TO_PARTITION_IDS);
-        cartridges = distributedObjectProvider.getList(CC_CARTRIDGES);
-        serviceGroups = distributedObjectProvider.getList(CC_SERVICE_GROUPS);
+        cartridgeTypeToPartitionIdsMap = distributedObjectProvider.getMap(CC_CARTRIDGE_TYPE_TO_PARTITION_IDS_MAP);
+        cartridgeTypeToCartridgeMap = distributedObjectProvider.getMap(CC_CARTRIDGE_TYPE_TO_CARTRIDGES_MAP);
+        serviceGroupNameToServiceGroupMap = distributedObjectProvider.getMap(CC_SERVICE_GROUP_NAME_TO_SERVICE_GROUP_MAP);
 
         // Update context from the registry
         updateContextFromRegistry();
@@ -181,115 +182,97 @@ public class CloudControllerContext implements Serializable {
         return instance;
     }
 
-    public List<Cartridge> getCartridges() {
-        return cartridges;
+    public java.util.Collection<Cartridge> getCartridges() {
+        return cartridgeTypeToCartridgeMap.values();
     }
 
-    public void setCartridges(List<Cartridge> cartridges) {
-        this.cartridges = cartridges;
+    public void addCartridges(List<Cartridge> cartridges) {
+        for(Cartridge cartridge : cartridges) {
+            addCartridge(cartridge);
+        }
     }
 
-    public void setServiceGroups(List<ServiceGroup> serviceGroups) {
-        this.serviceGroups = serviceGroups;
+    public void addServiceGroups(List<ServiceGroup> serviceGroups) {
+        for(ServiceGroup serviceGroup : serviceGroups) {
+            addServiceGroup(serviceGroup);
+        }
     }
 
-    public List<ServiceGroup> getServiceGroups() {
-        return this.serviceGroups;
+    public Collection<ServiceGroup> getServiceGroups() {
+        return serviceGroupNameToServiceGroupMap.values();
     }
 
     public Cartridge getCartridge(String cartridgeType) {
-        for (Cartridge cartridge : cartridges) {
-            if (cartridge.getType().equals(cartridgeType)) {
-                return cartridge;
-            }
-        }
-        return null;
+        return cartridgeTypeToCartridgeMap.get(cartridgeType);
     }
 
     private Lock acquireWriteLock(String object) {
         return distributedObjectProvider.acquireLock(object);
     }
 
-    private void releaseWriteLock(Lock lock) {
+    public void releaseWriteLock(Lock lock) {
         distributedObjectProvider.releaseLock(lock);
     }
 
-    public Lock acquireKubernetesWriteLock() {
-        return acquireWriteLock(CC_CLUSTER_CTX_LOCK);
+    public Lock acquireClusterContextWriteLock() {
+        return acquireWriteLock(CC_CLUSTER_CTX_WRITE_LOCK);
     }
 
     public Lock acquireMemberContextWriteLock() {
-        return acquireWriteLock(CC_MEMBER_CTX_LOCK);
+        return acquireWriteLock(CC_MEMBER_CTX_WRITE_LOCK);
     }
 
     public Lock acquireScheduleTaskWriteLock() {
-        return acquireWriteLock(CC_SCH_TASK_LOCK);
+        return acquireWriteLock(CC_SCH_TASK_WRITE_LOCK);
     }
 
     public Lock acquireKubernetesGroupWriteLock() {
-        return acquireWriteLock(CC_KUB_GROUP_LOCK);
+        return acquireWriteLock(CC_KUB_GROUP_WRITE_LOCK);
     }
 
     public Lock acquireKubernetesClusterContextWriteLock() {
-        return acquireWriteLock(CC_KUB_CLUSTER_CTX_LOCK);
+        return acquireWriteLock(CC_KUB_CLUSTER_CTX_WRITE_LOCK);
     }
 
     public Lock acquireCartridgesWriteLock() {
-        return acquireWriteLock(CC_CARTRIDGES_LOCK);
+        return acquireWriteLock(CC_CARTRIDGES_WRITE_LOCK);
     }
 
     public Lock acquireServiceGroupsWriteLock() {
-        return acquireWriteLock(CC_SERVICE_GROUPS_LOCK);
-    }
-
-    public void releaseKubernetesWriteLock(Lock lock) {
-        releaseWriteLock(lock);
-    }
-
-    public void releaseMemberContextWriteLock(Lock lock) {
-        releaseWriteLock(lock);
+        return acquireWriteLock(CC_SERVICE_GROUPS_WRITE_LOCK);
     }
 
-    public void releaseScheduleTaskWriteLock(Lock lock) {
-        releaseWriteLock(lock);
+    public void addCartridge(Cartridge cartridge) {
+        cartridgeTypeToCartridgeMap.put(cartridge.getType(), cartridge);
     }
 
-    public void releaseKubernetesGroupWriteLock(Lock lock) {
-        releaseWriteLock(lock);
-    }
-
-    public void releaseKubernetesClusterContextWriteLock(Lock lock) {
-        releaseWriteLock(lock);
+    public void removeCartridge(Cartridge cartridge) {
+        if(cartridgeTypeToCartridgeMap.containsKey(cartridge.getType())) {
+            cartridgeTypeToCartridgeMap.remove(cartridge.getType());
+        }
     }
 
-    public void releaseCartridgesWriteLock(Lock lock) {
-        releaseWriteLock(lock);
+    public void updateCartridge(Cartridge cartridge) {
+        cartridgeTypeToCartridgeMap.put(cartridge.getType(), cartridge);
     }
 
-    public void releaseServiceGroupsWriteLock(Lock lock) {
-        releaseWriteLock(lock);
+    public ServiceGroup getServiceGroup(String name) {
+        return serviceGroupNameToServiceGroupMap.get(name);
     }
 
-    public void addCartridge(Cartridge newCartridges) {
-        cartridges.add(newCartridges);
+    public void addServiceGroup(ServiceGroup serviceGroup) {
+        serviceGroupNameToServiceGroupMap.put(serviceGroup.getName(), serviceGroup);
     }
 
-    public ServiceGroup getServiceGroup(String name) {
-        for (ServiceGroup serviceGroup : serviceGroups) {
-            if (serviceGroup.getName().equals(name)) {
-                return serviceGroup;
-            }
+    public void removeServiceGroups(List<ServiceGroup> serviceGroups) {
+        for(ServiceGroup serviceGroup : serviceGroups) {
+            removeServiceGroup(serviceGroup);
         }
-        return null;
-    }
-
-    public void addServiceGroup(ServiceGroup newServiceGroup) {
-        serviceGroups.add(newServiceGroup);
     }
 
-    public void removeServiceGroup(List<ServiceGroup> serviceGroup) {
-        if (this.serviceGroups != null) {
-            this.serviceGroups.removeAll(serviceGroup);
+    private void removeServiceGroup(ServiceGroup serviceGroup) {
+        if(serviceGroupNameToServiceGroupMap.containsKey(serviceGroup.getName())) {
+            serviceGroupNameToServiceGroupMap.remove(serviceGroup.getName());
         }
     }
 
@@ -623,9 +606,8 @@ public class CloudControllerContext implements Serializable {
                         copyMap(serializedObj.kubClusterIdToKubClusterContextMap, kubClusterIdToKubClusterContextMap);
                         copyMap(serializedObj.clusterIdToContextMap, clusterIdToContextMap);
                         copyMap(serializedObj.cartridgeTypeToPartitionIdsMap, cartridgeTypeToPartitionIdsMap);
-
-                        copyList(serializedObj.getCartridges(), cartridges);
-                        copyList(serializedObj.getServiceGroups(), serviceGroups);
+                        copyMap(serializedObj.cartridgeTypeToCartridgeMap, cartridgeTypeToCartridgeMap);
+                        copyMap(serializedObj.serviceGroupNameToServiceGroupMap, serviceGroupNameToServiceGroupMap);
 
                         if (log.isDebugEnabled()) {
                             log.debug("Cloud controller context is read from the registry");


[3/3] stratos git commit: Moving distributed locks handled by distributed object provider and to functional methods

Posted by im...@apache.org.
Moving distributed locks handled by distributed object provider and to functional methods


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/e8914f3d
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/e8914f3d
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/e8914f3d

Branch: refs/heads/master
Commit: e8914f3d048866ab8913801aa702c6f234b41ce5
Parents: 08de729
Author: Imesh Gunaratne <im...@apache.org>
Authored: Wed Dec 3 18:19:36 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 4 00:14:40 2014 +0530

----------------------------------------------------------------------
 .../context/CloudControllerContext.java         | 148 +++++++---
 .../impl/CloudControllerServiceImpl.java        |  19 +-
 .../clustering/DistributedObjectProvider.java   |   9 +-
 .../impl/HazelcastDistributedListProvider.java  | 268 +++++++++++++++++++
 .../HazelcastDistributedObjectProvider.java     | 219 ++++++++-------
 .../clustering/impl/ListEntryListener.java      |  37 +++
 .../test/DistributedObjectProviderTest.java     |  78 ++----
 .../load/balancer/context/AlgorithmContext.java |  22 +-
 .../context/map/AlgorithmContextMap.java        |  16 +-
 9 files changed, 598 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
index af8741c..2d4e195 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
@@ -36,12 +36,8 @@ import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
 import org.wso2.carbon.registry.core.exceptions.RegistryException;
 
-import com.google.common.net.InetAddresses;
-
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -49,6 +45,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.locks.Lock;
 
 /**
  * This object holds all runtime data and provides faster access. This is a Singleton class.
@@ -58,15 +55,23 @@ public class CloudControllerContext implements Serializable {
     private static final long serialVersionUID = -2662307358852779897L;
     private static final Log log = LogFactory.getLog(CloudControllerContext.class);
 
-    public static final String KUB_GROUP_ID_TO_GROUP_MAP = "KUB_GROUP_ID_TO_GROUP_MAP";
-    public static final String CC_CLUSTER_ID_TO_MEMBER_CTX = "CC_CLUSTER_ID_TO_MEMBER_CTX";
-    public static final String CC_MEMBER_ID_TO_MEMBER_CTX = "CC_MEMBER_ID_TO_MEMBER_CTX";
-    public static final String CC_MEMBER_ID_TO_SCH_TASK = "CC_MEMBER_ID_TO_SCH_TASK";
-    public static final String CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX";
-    public static final String CC_CLUSTER_ID_TO_CLUSTER_CTX = "CC_CLUSTER_ID_TO_CLUSTER_CTX";
-    public static final String CC_CARTRIDGE_TYPE_TO_PARTITION_IDS = "CC_CARTRIDGE_TYPE_TO_PARTITION_IDS";
-    public static final String CC_CARTRIDGES = "CC_CARTRIDGES";
-    public static final String CC_SERVICE_GROUPS = "CC_SERVICE_GROUPS";
+    private static final String CC_CLUSTER_ID_TO_MEMBER_CTX = "CC_CLUSTER_ID_TO_MEMBER_CTX";
+    private static final String CC_CLUSTER_ID_TO_CLUSTER_CTX = "CC_CLUSTER_ID_TO_CLUSTER_CTX";
+    private static final String CC_MEMBER_ID_TO_MEMBER_CTX = "CC_MEMBER_ID_TO_MEMBER_CTX";
+    private static final String CC_MEMBER_ID_TO_SCH_TASK = "CC_MEMBER_ID_TO_SCH_TASK";
+    private static final String CC_KUB_GROUP_ID_TO_GROUP_MAP = "CC_KUB_GROUP_ID_TO_GROUP_MAP";
+    private static final String CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX";
+    private static final String CC_CARTRIDGE_TYPE_TO_PARTITION_IDS = "CC_CARTRIDGE_TYPE_TO_PARTITION_IDS";
+    private static final String CC_CARTRIDGES = "CC_CARTRIDGES";
+    private static final String CC_SERVICE_GROUPS = "CC_SERVICE_GROUPS";
+
+    private static final String CC_CLUSTER_CTX_LOCK = "CC_CLUSTER_ID_TO_MEMBER_CTX_LOCK";
+    private static final String CC_MEMBER_CTX_LOCK = "CC_MEMBER_ID_TO_MEMBER_CTX_LOCK";
+    private static final String CC_SCH_TASK_LOCK = "CC_MEMBER_ID_TO_SCH_TASK_LOCK";
+    private static final String CC_KUB_GROUP_LOCK = "CC_KUB_GROUP_ID_TO_GROUP_MAP_LOCK";
+    private static final String CC_KUB_CLUSTER_CTX_LOCK = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX_LOCK";
+    private static final String CC_CARTRIDGES_LOCK = "CC_CARTRIDGES_LOCK";
+    private static final String CC_SERVICE_GROUPS_LOCK = "CC_SERVICE_GROUPS_LOCK";
 
     private static volatile CloudControllerContext instance;
 
@@ -151,7 +156,7 @@ public class CloudControllerContext implements Serializable {
         distributedObjectProvider = ServiceReferenceHolder.getInstance().getDistributedObjectProvider();
 
         // Initialize objects
-        kubernetesGroupsMap = distributedObjectProvider.getMap(KUB_GROUP_ID_TO_GROUP_MAP);
+        kubernetesGroupsMap = distributedObjectProvider.getMap(CC_KUB_GROUP_ID_TO_GROUP_MAP);
         clusterIdToMemberContextListMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX);
         memberIdToMemberContextMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_MEMBER_CTX);
         memberIdToScheduledTaskMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_SCH_TASK);
@@ -201,8 +206,72 @@ public class CloudControllerContext implements Serializable {
         return null;
     }
 
+    private Lock acquireWriteLock(String object) {
+        return distributedObjectProvider.acquireLock(object);
+    }
+
+    private void releaseWriteLock(Lock lock) {
+        distributedObjectProvider.releaseLock(lock);
+    }
+
+    public Lock acquireKubernetesWriteLock() {
+        return acquireWriteLock(CC_CLUSTER_CTX_LOCK);
+    }
+
+    public Lock acquireMemberContextWriteLock() {
+        return acquireWriteLock(CC_MEMBER_CTX_LOCK);
+    }
+
+    public Lock acquireScheduleTaskWriteLock() {
+        return acquireWriteLock(CC_SCH_TASK_LOCK);
+    }
+
+    public Lock acquireKubernetesGroupWriteLock() {
+        return acquireWriteLock(CC_KUB_GROUP_LOCK);
+    }
+
+    public Lock acquireKubernetesClusterContextWriteLock() {
+        return acquireWriteLock(CC_KUB_CLUSTER_CTX_LOCK);
+    }
+
+    public Lock acquireCartridgesWriteLock() {
+        return acquireWriteLock(CC_CARTRIDGES_LOCK);
+    }
+
+    public Lock acquireServiceGroupsWriteLock() {
+        return acquireWriteLock(CC_SERVICE_GROUPS_LOCK);
+    }
+
+    public void releaseKubernetesWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
+    public void releaseMemberContextWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
+    public void releaseScheduleTaskWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
+    public void releaseKubernetesGroupWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
+    public void releaseKubernetesClusterContextWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
+    public void releaseCartridgesWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
+    public void releaseServiceGroupsWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
     public void addCartridge(Cartridge newCartridges) {
-        distributedObjectProvider.addToList(cartridges, newCartridges);
+        cartridges.add(newCartridges);
     }
 
     public ServiceGroup getServiceGroup(String name) {
@@ -215,7 +284,7 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addServiceGroup(ServiceGroup newServiceGroup) {
-        distributedObjectProvider.addToList(serviceGroups, newServiceGroup);
+        serviceGroups.add(newServiceGroup);
     }
 
     public void removeServiceGroup(List<ServiceGroup> serviceGroup) {
@@ -257,38 +326,37 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addMemberContext(MemberContext memberContext) {
-        distributedObjectProvider.putToMap(memberIdToMemberContextMap, memberContext.getMemberId(), memberContext);
+        memberIdToMemberContextMap.put(memberContext.getMemberId(), memberContext);
 
         List<MemberContext> memberContextList;
         if ((memberContextList = clusterIdToMemberContextListMap.get(memberContext.getClusterId())) == null) {
             memberContextList = new ArrayList<MemberContext>();
         }
         if (memberContextList.contains(memberContext)) {
-            distributedObjectProvider.removeFromList(memberContextList,memberContext);
+            memberContextList.remove(memberContext);
         }
-        distributedObjectProvider.addToList(memberContextList, memberContext);
-        distributedObjectProvider.putToMap(clusterIdToMemberContextListMap, memberContext.getClusterId(),
-                memberContextList);
+        memberContextList.add(memberContext);
+        clusterIdToMemberContextListMap.put(memberContext.getClusterId(), memberContextList);
         if (log.isDebugEnabled()) {
             log.debug("Added member context to the cloud controller context: " + memberContext);
         }
     }
 
     public void addScheduledFutureJob(String memberId, ScheduledFuture<?> job) {
-        distributedObjectProvider.putToMap(memberIdToScheduledTaskMap, memberId, job);
+        memberIdToScheduledTaskMap.put(memberId, job);
     }
 
     public List<MemberContext> removeMemberContextsOfCluster(String clusterId) {
         List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId);
-        distributedObjectProvider.removeFromMap(clusterIdToMemberContextListMap, clusterId);
+        clusterIdToMemberContextListMap.remove(clusterId);
         if (memberContextList == null) {
             return new ArrayList<MemberContext>();
         }
         for (MemberContext memberContext : memberContextList) {
             String memberId = memberContext.getMemberId();
-            distributedObjectProvider.removeFromMap(memberIdToMemberContextMap, memberId);
+            memberIdToMemberContextMap.remove(memberId);
             ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId);
-            distributedObjectProvider.removeFromMap(memberIdToScheduledTaskMap, memberId);
+            memberIdToScheduledTaskMap.remove(memberId);
             stopTask(task);
 
             if (log.isDebugEnabled()) {
@@ -301,7 +369,7 @@ public class CloudControllerContext implements Serializable {
 
     public MemberContext removeMemberContext(String memberId, String clusterId) {
         MemberContext removedMemberContext = memberIdToMemberContextMap.get(memberId);
-        distributedObjectProvider.removeFromMap(memberIdToMemberContextMap, memberId);
+        memberIdToMemberContextMap.remove(memberId);
 
         List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId);
         if (memberContextList != null) {
@@ -315,10 +383,10 @@ public class CloudControllerContext implements Serializable {
                     iterator.remove();
                 }
             }
-            distributedObjectProvider.putToMap(clusterIdToMemberContextListMap, clusterId, newCtxts);
+            clusterIdToMemberContextListMap.put(clusterId, newCtxts);
         }
         ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId);
-        distributedObjectProvider.removeFromMap(memberIdToScheduledTaskMap, memberId);
+        memberIdToScheduledTaskMap.remove(memberId);
         stopTask(task);
         return removedMemberContext;
     }
@@ -339,7 +407,7 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addClusterContext(ClusterContext ctxt) {
-        distributedObjectProvider.putToMap(clusterIdToContextMap, ctxt.getClusterId(), ctxt);
+        clusterIdToContextMap.put(ctxt.getClusterId(), ctxt);
     }
 
     public ClusterContext getClusterContext(String clusterId) {
@@ -348,7 +416,7 @@ public class CloudControllerContext implements Serializable {
 
     public ClusterContext removeClusterContext(String clusterId) {
         ClusterContext removed = clusterIdToContextMap.get(clusterId);
-        distributedObjectProvider.removeFromMap(clusterIdToContextMap, clusterId);
+        clusterIdToContextMap.remove(clusterId);
         return removed;
     }
 
@@ -366,11 +434,11 @@ public class CloudControllerContext implements Serializable {
             list = new ArrayList<String>();
         }
         list.add(partitionId);
-        distributedObjectProvider.putToMap(cartridgeTypeToPartitionIdsMap, cartridgeType, list);
+        cartridgeTypeToPartitionIdsMap.put(cartridgeType, list);
     }
 
     public void removeFromCartridgeTypeToPartitionIds(String cartridgeType) {
-        distributedObjectProvider.removeFromMap(cartridgeTypeToPartitionIdsMap, cartridgeType);
+        cartridgeTypeToPartitionIdsMap.remove(cartridgeType);
     }
 
     public KubernetesClusterContext getKubernetesClusterContext(String kubClusterId) {
@@ -378,7 +446,7 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addKubernetesClusterContext(KubernetesClusterContext kubernetesClusterContext) {
-        distributedObjectProvider.putToMap(kubClusterIdToKubClusterContextMap,
+        kubClusterIdToKubClusterContextMap.put(
                 kubernetesClusterContext.getKubernetesClusterId(),
                 kubernetesClusterContext);
     }
@@ -388,7 +456,7 @@ public class CloudControllerContext implements Serializable {
      */
     public synchronized void removeKubernetesGroup(String kubernetesGroupId) {
         // Remove entry from information model
-        distributedObjectProvider.removeFromMap(kubernetesGroupsMap, kubernetesGroupId);
+        kubernetesGroupsMap.remove(kubernetesGroupId);
     }
 
     /**
@@ -435,8 +503,12 @@ public class CloudControllerContext implements Serializable {
         }
     }
 
-    public void addKubernetesGroupToInformationModel(KubernetesGroup kubernetesGroup) {
-        distributedObjectProvider.putToMap(kubernetesGroupsMap, kubernetesGroup.getGroupId(), kubernetesGroup);
+    public void addKubernetesGroup(KubernetesGroup kubernetesGroup) {
+        kubernetesGroupsMap.put(kubernetesGroup.getGroupId(), kubernetesGroup);
+    }
+
+    public void updateKubernetesGroup(KubernetesGroup kubernetesGroup) {
+        kubernetesGroupsMap.put(kubernetesGroup.getGroupId(), kubernetesGroup);
     }
     
     public boolean kubernetesGroupExists(KubernetesGroup kubernetesGroup) {
@@ -574,13 +646,13 @@ public class CloudControllerContext implements Serializable {
 
     private void copyMap(Map sourceMap, Map destinationMap) {
         for(Object key : sourceMap.keySet()) {
-            distributedObjectProvider.putToMap(destinationMap, key, sourceMap.get(key));
+            destinationMap.put(key, sourceMap.get(key));
         }
     }
 
     private void copyList(List sourceList, List destinationList) {
         for(Object item : sourceList) {
-            distributedObjectProvider.addToList(destinationList, item);
+            destinationList.add(item);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 4843565..548b743 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -73,6 +73,7 @@ import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
 
 /**
  * Cloud Controller Service is responsible for starting up new server instances,
@@ -2087,20 +2088,24 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             LOG.info("Deploying new Kubernetes group: " + kubernetesGroup);
         }
         CloudControllerUtil.validateKubernetesGroup(kubernetesGroup);
+        Lock lock = null;
         try {
+            lock = cloudControllerContext.acquireKubernetesGroupWriteLock();
             // Add to information model
-            cloudControllerContext.addKubernetesGroupToInformationModel(kubernetesGroup);
-
+            cloudControllerContext.addKubernetesGroup(kubernetesGroup);
             persist();
             
             if (LOG.isInfoEnabled()) {
                 LOG.info(String.format("Kubernetes group deployed successfully: [id] %s, [description] %s",
                         kubernetesGroup.getGroupId(), kubernetesGroup.getDescription()));
             }
-            
             return true;
         } catch (Exception e) {
             throw new InvalidKubernetesGroupException(e.getMessage(), e);
+        } finally {
+            if(lock != null) {
+                cloudControllerContext.releaseKubernetesGroupWriteLock(lock);
+            }
         }
     }
     
@@ -2117,7 +2122,9 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             LOG.info("Deploying new Kubernetes Host: " + kubernetesHost + " for Kubernetes group id: " + kubernetesGroupId);
         }
         CloudControllerUtil.validateKubernetesHost(kubernetesHost);
+        Lock lock = null;
         try {
+            lock = CloudControllerContext.getInstance().acquireKubernetesGroupWriteLock();
             KubernetesGroup kubernetesGroupStored = getKubernetesGroup(kubernetesGroupId);
             ArrayList<KubernetesHost> kubernetesHostArrayList;
 
@@ -2134,7 +2141,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 
             // Update information model
             kubernetesGroupStored.setKubernetesHosts(kubernetesHostArrayList.toArray(new KubernetesHost[kubernetesHostArrayList.size()]));
-
+            CloudControllerContext.getInstance().updateKubernetesGroup(kubernetesGroupStored);
             persist();
             
             if (LOG.isInfoEnabled()) {
@@ -2144,6 +2151,10 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             return true;
         } catch (Exception e) {
             throw new InvalidKubernetesHostException(e.getMessage(), e);
+        } finally {
+            if(lock != null) {
+                cloudControllerContext.releaseKubernetesGroupWriteLock(lock);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
index 2fd471b..7e7d130 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
@@ -22,6 +22,7 @@ package org.apache.stratos.common.clustering;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
 
 /**
  * Distributed object provider service interface.
@@ -31,11 +32,7 @@ public interface DistributedObjectProvider extends Serializable {
 
     List getList(String name);
 
-    void putToMap(Map map, Object key, Object value);
+    Lock acquireLock(Object object);
 
-    void removeFromMap(Map map, Object key);
-
-    void addToList(List list, Object value);
-
-    void removeFromList(List list, Object value);
+    void releaseLock(Lock lock);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedListProvider.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedListProvider.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedListProvider.java
new file mode 100644
index 0000000..537cea9
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedListProvider.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.clustering.impl;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IList;
+import com.hazelcast.core.ItemEvent;
+import com.hazelcast.core.ItemListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Hazelcast distributed list provider.
+ * <p>
+ * Hands out {@link List} views backed by Hazelcast {@link IList} instances and tracks them by
+ * name so that a list can later be removed together with its item listener.
+ */
+public class HazelcastDistributedListProvider {
+    private static final Log log = LogFactory.getLog(HazelcastDistributedListProvider.class);
+
+    private final HazelcastInstance hazelcastInstance;
+    /** Lists handed out so far, keyed by distributed list name. */
+    private final Map<String, DistList> listMap = new ConcurrentHashMap<String, DistList>();
+
+    public HazelcastDistributedListProvider(HazelcastInstance hazelcastInstance) {
+        this.hazelcastInstance = hazelcastInstance;
+    }
+
+    /**
+     * Returns the distributed list registered under the given name, creating and registering it
+     * on first use.
+     *
+     * @param name              name of the distributed list
+     * @param listEntryListener listener notified of item additions and removals; only used when
+     *                          the list is created by this call
+     * @return the list registered under {@code name}
+     */
+    public List getList(String name, ListEntryListener listEntryListener) {
+        DistList list = listMap.get(name);
+        if(list == null) {
+            synchronized (listMap) {
+                // Re-read under the lock so only one DistList (and listener) is created per name.
+                list = listMap.get(name);
+                if(list == null) {
+                    list = new DistList(name, listEntryListener);
+                    listMap.put(name, list);
+                }
+            }
+        }
+        return list;
+    }
+
+    /**
+     * Unregisters the list with the given name, detaches its item listener and destroys the
+     * underlying Hazelcast list. Does nothing if no such list was handed out by this provider.
+     *
+     * @param name name of the distributed list
+     */
+    public void removeList(String name) {
+        // Remove by name (the map key) and operate on the wrapped IList; DistList is not an IList.
+        DistList list = listMap.remove(name);
+        if(list != null) {
+            list.destroy();
+        }
+    }
+
+    /**
+     * List view over a Hazelcast {@link IList}. Calls are delegated only while the Hazelcast
+     * instance is running; otherwise reads report an empty list and updates are ignored.
+     */
+    private class DistList implements List {
+        private final IList list;
+        private final String listenerId;
+
+        public DistList(String name, final ListEntryListener listEntryListener) {
+            this.list = hazelcastInstance.getList(name);
+            // includeValue must be true, otherwise ItemEvent.getItem() is null for the listener.
+            listenerId = list.addItemListener(new ItemListener() {
+                @Override
+                public void itemAdded(ItemEvent itemEvent) {
+                    listEntryListener.itemAdded(itemEvent.getItem());
+                }
+
+                @Override
+                public void itemRemoved(ItemEvent itemEvent) {
+                    listEntryListener.itemRemoved(itemEvent.getItem());
+                }
+            }, true);
+        }
+
+        /** Detaches the item listener and destroys the underlying Hazelcast list. */
+        private void destroy() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                list.removeItemListener(listenerId);
+                list.destroy();
+            }
+        }
+
+        @Override
+        public int size() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.size();
+            }
+            return 0;
+        }
+
+        @Override
+        public boolean isEmpty() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.isEmpty();
+            }
+            return true;
+        }
+
+        @Override
+        public boolean contains(Object object) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.contains(object);
+            }
+            return false;
+        }
+
+        @Override
+        public Iterator iterator() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.iterator();
+            }
+            // Never null: callers iterate with for-each, which would fail on a null iterator.
+            return Collections.emptyList().iterator();
+        }
+
+        @Override
+        public Object[] toArray() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.toArray();
+            }
+            return new Object[0];
+        }
+
+        @Override
+        public boolean add(Object object) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.add(object);
+            }
+            return false;
+        }
+
+        @Override
+        public boolean remove(Object object) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.remove(object);
+            }
+            return false;
+        }
+
+        @Override
+        public boolean addAll(Collection collection) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.addAll(collection);
+            }
+            return false;
+        }
+
+        @Override
+        public boolean addAll(int i, Collection collection) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.addAll(i, collection);
+            }
+            return false;
+        }
+
+        @Override
+        public void clear() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                list.clear();
+            }
+        }
+
+        @Override
+        public Object get(int i) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.get(i);
+            }
+            return null;
+        }
+
+        @Override
+        public Object set(int i, Object o) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.set(i, o);
+            }
+            return null;
+        }
+
+        @Override
+        public void add(int i, Object o) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                list.add(i, o);
+            }
+        }
+
+        @Override
+        public Object remove(int i) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.remove(i);
+            }
+            return null;
+        }
+
+        @Override
+        public int indexOf(Object o) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.indexOf(o);
+            }
+            return -1;
+        }
+
+        @Override
+        public int lastIndexOf(Object o) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.lastIndexOf(o);
+            }
+            return -1;
+        }
+
+        @Override
+        public ListIterator listIterator() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.listIterator();
+            }
+            return Collections.emptyList().listIterator();
+        }
+
+        @Override
+        public ListIterator listIterator(int i) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.listIterator(i);
+            }
+            return Collections.emptyList().listIterator();
+        }
+
+        @Override
+        public List subList(int i, int i2) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.subList(i, i2);
+            }
+            return Collections.emptyList();
+        }
+
+        @Override
+        public boolean retainAll(Collection collection) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.retainAll(collection);
+            }
+            return false;
+        }
+
+        @Override
+        public boolean removeAll(Collection collection) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.removeAll(collection);
+            }
+            return false;
+        }
+
+        @Override
+        public boolean containsAll(Collection collection) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.containsAll(collection);
+            }
+            return false;
+        }
+
+        @Override
+        public Object[] toArray(Object[] objects) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.toArray(objects);
+            }
+            return Collections.emptyList().toArray(objects);
+        }
+
+        public String getListenerId() {
+            return listenerId;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java
index 55d765e..e5dab09 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java
@@ -20,19 +20,22 @@
 package org.apache.stratos.common.clustering.impl;
 
 import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IList;
 import com.hazelcast.core.ILock;
-import com.hazelcast.core.IMap;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.clustering.DistributedObjectProvider;
 import org.apache.stratos.common.internal.ServiceReferenceHolder;
+import org.wso2.carbon.caching.impl.MapEntryListener;
+import org.wso2.carbon.core.clustering.hazelcast.HazelcastDistributedMapProvider;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Provides objects to be managed in distributed and non-distributed environments.
@@ -40,46 +43,15 @@ import java.util.concurrent.ConcurrentHashMap;
 public class HazelcastDistributedObjectProvider implements DistributedObjectProvider {
     private static final Log log = LogFactory.getLog(HazelcastDistributedObjectProvider.class);
 
-    public HazelcastDistributedObjectProvider() {
-    }
-
-    private boolean isClustered() {
-        AxisConfiguration axisConfiguration = ServiceReferenceHolder.getInstance().getAxisConfiguration();
-        return ((axisConfiguration != null) && (axisConfiguration.getClusteringAgent() != null)
-                && (getHazelcastInstance() != null));
-    }
-
-    private HazelcastInstance getHazelcastInstance() {
-        return ServiceReferenceHolder.getInstance().getHazelcastInstance();
-    }
+    private HazelcastDistributedMapProvider mapProvider;
+    private HazelcastDistributedListProvider listProvider;
+    private Map<Object, Lock> locksMap;
 
-    private com.hazelcast.core.ILock acquireDistributedLock(Object object) {
-        if((!isClustered()) || (object == null)) {
-            return null;
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Acquiring distributed lock for %s...", object.getClass().getSimpleName()));
-        }
-        ILock lock = getHazelcastInstance().getLock(object);
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Distributed lock acquired for %s", object.getClass().getSimpleName()));
-        }
-        return lock;
-    }
-
-    private void releaseDistributedLock(ILock lock) {
-        if(lock == null) {
-            return;
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Releasing distributed lock for %s...", lock.getKey()));
-        }
-        lock.forceUnlock();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Distributed lock released for %s", lock.getKey()));
-        }
+    public HazelcastDistributedObjectProvider() { // wires the map/list providers and the local lock table
+        HazelcastInstance hazelcastInstance = ServiceReferenceHolder.getInstance().getHazelcastInstance(); // NOTE(review): can be null when not clustered — confirm both providers tolerate a null instance
+        mapProvider = new HazelcastDistributedMapProvider(hazelcastInstance);
+        listProvider = new HazelcastDistributedListProvider(hazelcastInstance);
+        locksMap = new HashMap<Object, Lock>(); // locks used on the non-clustered path of acquireLock, keyed by the locked object
     }
 
     /**
@@ -91,7 +63,28 @@ public class HazelcastDistributedObjectProvider implements DistributedObjectProv
     @Override
     public Map getMap(String key) {
         if(isClustered()) {
-            return getHazelcastInstance().getMap(key);
+            return mapProvider.getMap(key, new MapEntryListener() {
+                @Override
+                public <X> void entryAdded(X key) {
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Entry added to distributed map: [key] %s", key));
+                    }
+                }
+
+                @Override
+                public <X> void entryRemoved(X key) {
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Entry removed from distributed map: [key] %s", key));
+                    }
+                }
+
+                @Override
+                public <X> void entryUpdated(X key) {
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Entry updated in distributed map: [key] %s", key));
+                    }
+                }
+            });
         } else {
             return new ConcurrentHashMap<Object, Object>();
         }
@@ -106,94 +99,96 @@ public class HazelcastDistributedObjectProvider implements DistributedObjectProv
     @Override
     public List getList(String name) {
         if(isClustered()) {
-            return getHazelcastInstance().getList(name);
+            return listProvider.getList(name, new ListEntryListener() {
+                @Override
+                public void itemAdded(Object item) {
+                    if(log.isDebugEnabled()) {
+                        log.debug("Item added to distributed list: " + item);
+                    }
+                }
+
+                @Override
+                public void itemRemoved(Object item) {
+                    if(log.isDebugEnabled()) {
+                        log.debug("Item removed from distributed list: " + item);
+                    }
+                }
+            });
         } else {
             return new ArrayList();
         }
     }
 
-    /**
-     * Put a key value pair to a map, if clustered use a distributed lock.
-     * @param map
-     * @param key
-     * @param value
-     */
     @Override
-    public void putToMap(Map map, Object key, Object value) {
+    public Lock acquireLock(Object object) {
+        if(isClustered()) {
+            return acquireDistributedLock(object);
+        }
+        // Local mode: get-or-create under one monitor. locksMap is a plain HashMap and
+        // the old inner null check re-tested a stale local, letting threads race.
+        Lock lock;
+        synchronized (locksMap) {
+            lock = locksMap.get(object);
+            if(lock == null) {
+                lock = new ReentrantLock();
+                locksMap.put(object, lock);
+            }
+        }
+        lock.lock(); // block outside the monitor so waiters do not stall other keys
+        return lock;
+    }
+
+    @Override
+    public void releaseLock(Lock lock) { // releases a lock returned by acquireLock; a null lock is tolerated
          if(isClustered()) {
-             ILock lock = null;
-             try {
-                 IMap imap = (IMap) map;
-                 lock = acquireDistributedLock(imap.getName());
-                 imap.set(key, value);
-             } finally {
-                 releaseDistributedLock(lock);
-             }
+             releaseDistributedLock((ILock)lock);
          } else {
-            map.put(key, value);
+             if(lock != null) { lock.unlock(); } // null-safe like the clustered path, so a finally block cannot NPE
          }
     }
 
-    /**
-     * Remove an object from a map, if clustered use a distributed lock.
-     * @param map
-     * @param key
-     */
-    @Override
-    public void removeFromMap(Map map, Object key) {
-        if(isClustered()) {
-            ILock lock = null;
-            try {
-                IMap imap = (IMap) map;
-                lock = acquireDistributedLock(imap.getName());
-                imap.delete(key);
-            } finally {
-                releaseDistributedLock(lock);
-            }
-        } else {
-            map.remove(key);
-        }
+    private boolean isClustered() {
+        AxisConfiguration axisConfiguration = ServiceReferenceHolder.getInstance().getAxisConfiguration();
+        return ((axisConfiguration != null) && (axisConfiguration.getClusteringAgent() != null)
+                && (getHazelcastInstance() != null));
     }
 
-    /**
-     * Add an object to a list, if clustered use a distributed lock.
-     * @param list
-     * @param value
-     */
-    @Override
-    public void addToList(List list, Object value) {
-        if(isClustered()) {
-            ILock lock = null;
-            try {
-                IList ilist = (IList) list;
-                lock = acquireDistributedLock(ilist.getName());
-                ilist.add(value);
-            } finally {
-                releaseDistributedLock(lock);
+    private HazelcastInstance getHazelcastInstance() {
+        return ServiceReferenceHolder.getInstance().getHazelcastInstance();
+    }
+
+    protected com.hazelcast.core.ILock acquireDistributedLock(Object object) { // returns null (after a warning) when object is null
+        if(object == null) {
+            if(log.isWarnEnabled()) {
+                log.warn("Could not acquire distributed lock, object is null");
             }
-        } else {
-            list.add(value);
+            return null;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Acquiring distributed lock for %s...", object.getClass().getSimpleName()));
+        }
+        ILock lock = getHazelcastInstance().getLock(object); // NOTE(review): only obtains the ILock; no lock() call is visible here — confirm the lock is actually taken before "acquired" is logged
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Distributed lock acquired for %s", object.getClass().getSimpleName()));
         }
+        return lock;
     }
 
-    /**
-     * Remove an object from a list, if clustered use a distributed lock.
-     * @param list
-     * @param value
-     */
-    @Override
-    public void removeFromList(List list, Object value) {
-        if(isClustered()) {
-            ILock lock = null;
-            try {
-                IList ilist = (IList) list;
-                lock = acquireDistributedLock(ilist.getName());
-                ilist.remove(value);
-            } finally {
-                releaseDistributedLock(lock);
+    protected void releaseDistributedLock(ILock lock) { // a null lock is logged as a warning and otherwise ignored
+        if(lock == null) {
+            if(log.isWarnEnabled()) {
+                log.warn("Could not release distributed lock, lock is null");
             }
-        } else {
-            list.remove(value);
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Releasing distributed lock for %s...", lock.getKey()));
+        }
+        lock.forceUnlock(); // NOTE(review): forceUnlock() presumably releases regardless of the owning thread — confirm unlock() is not what is intended
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Distributed lock released for %s", lock.getKey()));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/ListEntryListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/ListEntryListener.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/ListEntryListener.java
new file mode 100644
index 0000000..96f72dc
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/ListEntryListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.clustering.impl;
+
+/**
+ * Listener notified when items are added to or removed from a distributed list.
+ */
+public interface ListEntryListener {
+    /**
+     * Invoked when an item is added to the distributed list.
+     * @param item the item that was added
+     */
+    void itemAdded(Object item);
+
+    /**
+     * Invoked when an item is removed from the distributed list.
+     * @param item the item that was removed
+     */
+    void itemRemoved(Object item);
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java
index afb0c83..60ebc99 100644
--- a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java
+++ b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java
@@ -28,10 +28,9 @@ import org.junit.Test;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
 
-import static junit.framework.TestCase.assertFalse;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -51,6 +50,7 @@ public class DistributedObjectProviderTest {
         ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
         HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
         testPutToMap(provider);
+        testPutToMap(provider);
     }
 
     @Test
@@ -58,77 +58,47 @@ public class DistributedObjectProviderTest {
         ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
         HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
         testPutToMap(provider);
+        testPutToMap(provider);
     }
 
     private void testPutToMap(HazelcastDistributedObjectProvider provider) {
         Map<String, String> map = provider.getMap("MAP1");
-        provider.putToMap(map, "key1", "value1");
-        assertEquals(map.get("key1"), "value1");
-    }
-
-    @Test
-    public void testRemoveFromMapLocal() {
-        ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
-        HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
-        testRemoveFromMap(provider);
-    }
-
-    @Test
-    public void testRemoveFromMapDistributed() {
-        ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
-        HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
-        testRemoveFromMap(provider);
-    }
-
-    private void testRemoveFromMap(HazelcastDistributedObjectProvider provider) {
-        Map<String, String> map = provider.getMap("MAP1");
-        provider.putToMap(map, "key1", "value1");
-        assertEquals(map.get("key1"), "value1");
-        provider.removeFromMap(map, "key1");
-        assertNull(map.get("key1"));
+        Lock lock = null;
+        try {
+            lock = provider.acquireLock("MAP1_WRITE_LOCK");
+            map.put("key1", "value1");
+            assertEquals(map.get("key1"), "value1");
+        } finally {
+            provider.releaseLock(lock);
+        }
     }
 
     @Test
-    public void testAddToListLocal() {
+    public void testGetListLocal() {
         ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
         HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
         testAddToList(provider);
+        testAddToList(provider);
     }
 
     @Test
-    public void testAddToListDistributed() {
+    public void testGetListDistributed() {
         ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
         HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
         testAddToList(provider);
+        testAddToList(provider);
     }
 
     private void testAddToList(HazelcastDistributedObjectProvider provider) {
         List list = provider.getList("LIST1");
-        String value1 = "value1";
-        provider.addToList(list, value1);
-        assertTrue(list.contains(value1));
-    }
-
-    @Test
-    public void testRemoveFromListLocal() {
-        ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
-        HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
-        testRemovalFromList(provider);
-    }
-
-    @Test
-    public void testRemoveFromListDistributed() {
-        ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
-        HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
-        testRemovalFromList(provider);
-    }
-
-    private void testRemovalFromList(HazelcastDistributedObjectProvider provider) {
-        List list = provider.getList("LIST1");
-        String value1 = "value1";
-        provider.addToList(list, value1);
-        assertTrue(list.contains(value1));
-        provider.removeFromList(list, value1);
-        assertFalse(list.contains(value1));
+        Lock lock = null;
+        try {
+            lock = provider.acquireLock("LIST1_WRITE_LOCK");
+            String value1 = "value1";
+            list.add(value1);
+            assertTrue(list.contains(value1));
+        } finally {
+            provider.releaseLock(lock);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java
index 3ff824d..240ebba 100755
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java
@@ -21,6 +21,8 @@ package org.apache.stratos.load.balancer.context;
 
 import org.apache.stratos.load.balancer.context.map.AlgorithmContextMap;
 
+import java.util.concurrent.locks.Lock;
+
 /**
  * Algorithm context is used for identifying the cluster and its current member for executing load balancing algorithms.
  * Key: service name, cluster id
@@ -32,7 +34,15 @@ public class AlgorithmContext {
     public AlgorithmContext(String serviceName, String clusterId) {
         this.serviceName = serviceName;
         this.clusterId = clusterId;
-        AlgorithmContextMap.getInstance().putCurrentMemberIndex(serviceName, clusterId, 0);
+        Lock lock = null;
+        try {
+            lock = AlgorithmContextMap.getInstance().acquireCurrentMemberIndexLock();
+            AlgorithmContextMap.getInstance().putCurrentMemberIndex(serviceName, clusterId, 0);
+        } finally {
+            if(lock != null) {
+                AlgorithmContextMap.getInstance().releaseCurrentMemberIndexLock(lock);
+            }
+        }
     }
 
     public String getServiceName() {
@@ -48,6 +58,14 @@ public class AlgorithmContext {
     }
 
     public void setCurrentMemberIndex(int currentMemberIndex) {
-        AlgorithmContextMap.getInstance().putCurrentMemberIndex(getServiceName(), getClusterId(), currentMemberIndex);
+        Lock lock = null;
+        try {
+            lock = AlgorithmContextMap.getInstance().acquireCurrentMemberIndexLock();
+            AlgorithmContextMap.getInstance().putCurrentMemberIndex(getServiceName(), getClusterId(), currentMemberIndex);
+        } finally {
+            if(lock != null) {
+                AlgorithmContextMap.getInstance().releaseCurrentMemberIndexLock(lock);
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java
index 2c03ccb..35acdb0 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java
@@ -25,6 +25,7 @@ import org.apache.stratos.common.clustering.DistributedObjectProvider;
 import org.apache.stratos.load.balancer.internal.ServiceReferenceHolder;
 
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
 
 /**
  * Algorithm context map is a singleton class for managing load balancing algorithm context
@@ -34,6 +35,7 @@ public class AlgorithmContextMap {
     @SuppressWarnings("unused")
     private static final Log log = LogFactory.getLog(AlgorithmContextMap.class);
     private static final String LOAD_BALANCER_ALGORITHM_CONTEXT_MAP = "LOAD_BALANCER_ALGORITHM_CONTEXT_MAP";
+    private static final String CURRENT_MEMBER_INDEX_MAP_LOCK = "CURRENT_MEMBER_INDEX_MAP_LOCK";
     private static AlgorithmContextMap instance;
 
     private final Map<String, Integer> clusterMemberIndexMap;
@@ -61,14 +63,24 @@ public class AlgorithmContextMap {
         return String.format("%s-%s", serviceName, clusterId);
     }
 
+    public Lock acquireCurrentMemberIndexLock() { // pair with releaseCurrentMemberIndexLock
+        return distributedObjectProvider.acquireLock(CURRENT_MEMBER_INDEX_MAP_LOCK); // one fixed lock name, shared by every caller
+    }
+
+    public void releaseCurrentMemberIndexLock(Lock lock) { // hands the lock back to the distributed object provider
+        if(lock != null) { // a null lock is silently ignored
+            distributedObjectProvider.releaseLock(lock);
+        }
+    }
+
     public void putCurrentMemberIndex(String serviceName, String clusterId, int currentMemberIndex) {
         String key = constructKey(serviceName, clusterId);
-        distributedObjectProvider.putToMap(clusterMemberIndexMap, key, currentMemberIndex);
+        clusterMemberIndexMap.put(key, currentMemberIndex);
     }
 
     public void removeCluster(String serviceName, String clusterId) {
         String key = constructKey(serviceName, clusterId);
-        distributedObjectProvider.removeFromMap(clusterMemberIndexMap, key);
+        clusterMemberIndexMap.remove(key);
     }
 
     public int getCurrentMemberIndex(String serviceName, String clusterId) {