You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/12/03 19:44:50 UTC
[1/3] stratos git commit: Adding distributed locks to cloud
controller service methods
Repository: stratos
Updated Branches:
refs/heads/master 08de72982 -> 71fab2b44
http://git-wip-us.apache.org/repos/asf/stratos/blob/71fab2b4/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 548b743..f9c55a0 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -103,34 +103,24 @@ public class CloudControllerServiceImpl implements CloudControllerService {
// cartridge can never be null
cartridge = CloudControllerUtil.toCartridge(cartridgeConfig);
} catch (Exception e) {
- String msg =
- "Invalid Cartridge Definition: Cartridge Type: " +
- cartridgeConfig.getType() +
- ". Cause: Cannot instantiate a Cartridge Instance with the given Config. " + e.getMessage();
+ String msg = "Invalid cartridge definition: Cartridge type: " + cartridgeConfig.getType() +
+ " Cause: Cannot instantiate a cartridge instance with the given configuration: " + e.getMessage();
LOG.error(msg, e);
throw new InvalidCartridgeDefinitionException(msg, e);
}
- List<IaasProvider> iaases = cartridge.getIaases();
+ List<IaasProvider> iaasProviders = cartridge.getIaases();
if (!StratosConstants.KUBERNETES_DEPLOYER_TYPE.equals(cartridge.getDeployerType())) {
- if (iaases == null || iaases.isEmpty()) {
- String msg = "Invalid Cartridge Definition: Cartridge Type: "
- + cartridgeConfig.getType()
- + ". Cause: Iaases of this Cartridge is null or empty.";
- LOG.error(msg);
- throw new InvalidCartridgeDefinitionException(msg);
- }
-
- if (iaases == null || iaases.isEmpty()) {
- String msg = "Invalid Cartridge Definition: Cartridge Type: " +
+ if (iaasProviders == null || iaasProviders.isEmpty()) {
+ String msg = "Invalid cartridge definition: Cartridge type: " +
cartridgeConfig.getType() +
- ". Cause: Iaases of this Cartridge is null or empty.";
+ " Cause: Iaases of this cartridge is null or empty";
LOG.error(msg);
throw new InvalidCartridgeDefinitionException(msg);
}
- for (IaasProvider iaasProvider : iaases) {
+ for (IaasProvider iaasProvider : iaasProviders) {
CloudControllerUtil.getIaas(iaasProvider);
}
}
@@ -148,7 +138,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
populateNewCartridge(cartridge, cartridgeToBeRemoved);
}
- cloudControllerContext.addCartridge(cartridge);
+ CloudControllerContext.getInstance().addCartridge(cartridge);
// persist
persist();
@@ -188,10 +178,10 @@ public class CloudControllerServiceImpl implements CloudControllerService {
public void undeployCartridgeDefinition(String cartridgeType) throws InvalidCartridgeTypeException {
Cartridge cartridge = null;
- if ((cartridge = cloudControllerContext.getCartridge(cartridgeType)) != null) {
- if (cloudControllerContext.getCartridges().remove(cartridge)) {
+ if ((cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType)) != null) {
+ if (CloudControllerContext.getInstance().getCartridges().remove(cartridge)) {
// invalidate partition validation cache
- cloudControllerContext.removeFromCartridgeTypeToPartitionIds(cartridgeType);
+ CloudControllerContext.getInstance().removeFromCartridgeTypeToPartitionIds(cartridgeType);
if (LOG.isDebugEnabled()) {
LOG.debug("Partition cache invalidated for cartridge " + cartridgeType);
@@ -261,7 +251,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
}
}
- cloudControllerContext.addServiceGroup(servicegroup);
+ CloudControllerContext.getInstance().addServiceGroup(servicegroup);
this.persist();
@@ -274,10 +264,10 @@ public class CloudControllerServiceImpl implements CloudControllerService {
ServiceGroup serviceGroup = null;
- serviceGroup = cloudControllerContext.getServiceGroup(name);
+ serviceGroup = CloudControllerContext.getInstance().getServiceGroup(name);
if (serviceGroup != null) {
- if (cloudControllerContext.getServiceGroups().remove(serviceGroup)) {
+ if (CloudControllerContext.getInstance().getServiceGroups().remove(serviceGroup)) {
persist();
if (LOG.isInfoEnabled()) {
LOG.info("Successfully undeployed the Service Group definition: " + serviceGroup);
@@ -299,7 +289,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
LOG.debug("getServiceGroupDefinition:" + name);
}
- ServiceGroup serviceGroup = this.cloudControllerContext.getServiceGroup(name);
+ ServiceGroup serviceGroup = CloudControllerContext.getInstance().getServiceGroup(name);
if (serviceGroup == null) {
if (LOG.isDebugEnabled()) {
@@ -365,13 +355,13 @@ public class CloudControllerServiceImpl implements CloudControllerService {
memberContext);
String partitionId = partition.getId();
- ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId);
+ ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
handleNullObject(ctxt, "Instance start-up failed. Invalid cluster id. " + memberContext);
String cartridgeType = ctxt.getCartridgeType();
- Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType);
+ Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
if (cartridge == null) {
String msg =
@@ -552,7 +542,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
*/
private void persist() {
try {
- cloudControllerContext.persist();
+ CloudControllerContext.getInstance().persist();
} catch (RegistryException e) {
String msg = "Failed to persist the cloud controller context in registry.";
LOG.fatal(msg);
@@ -570,7 +560,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
handleNullObject(memberId, "Termination failed. Null member id.");
- MemberContext ctxt = cloudControllerContext.getMemberContextOfMemberId(memberId);
+ MemberContext ctxt = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId);
if (ctxt == null) {
String msg = "Termination failed. Invalid Member Id: " + memberId;
@@ -717,24 +707,23 @@ public class CloudControllerServiceImpl implements CloudControllerService {
@Override
public void run() {
-
String memberId = ctxt.getMemberId();
String clusterId = ctxt.getClusterId();
String partitionId = ctxt.getPartition().getId();
String cartridgeType = ctxt.getCartridgeType();
String nodeId = ctxt.getNodeId();
+ Lock lock = null;
try {
- // these will never be null, since we do not add null values for these.
- Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType);
+ CloudControllerContext.getInstance().acquireMemberContextWriteLock();
+ Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
LOG.info("Starting to terminate an instance with member id : " + memberId +
" in partition id: " + partitionId + " of cluster id: " + clusterId +
" and of cartridge type: " + cartridgeType);
if (cartridge == null) {
- String msg =
- "Termination of Member Id: " + memberId + " failed. " +
+ String msg = "Termination of Member Id: " + memberId + " failed. " +
"Cannot find a matching Cartridge for type: " +
cartridgeType;
LOG.error(msg);
@@ -743,10 +732,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
// if no matching node id can be found.
if (nodeId == null) {
-
- String msg =
- "Termination failed. Cannot find a node id for Member Id: " +
- memberId;
+ String msg = "Termination failed. Cannot find a node id for Member Id: " + memberId;
// log information
logTermination(ctxt);
@@ -761,14 +747,15 @@ public class CloudControllerServiceImpl implements CloudControllerService {
// log information
logTermination(ctxt);
-
} catch (Exception e) {
- String msg =
- "Instance termination failed. " + ctxt.toString();
+ String msg = "Instance termination failed. " + ctxt.toString();
LOG.error(msg, e);
throw new CloudControllerException(msg, e);
+ } finally {
+ if(lock != null) {
+ CloudControllerContext.getInstance().releaseWriteLock(lock);
+ }
}
-
}
}
@@ -787,117 +774,127 @@ public class CloudControllerServiceImpl implements CloudControllerService {
@Override
public void run() {
+ Lock lock = null;
+ try {
+ lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock();
+ String clusterId = memberContext.getClusterId();
+ Partition partition = memberContext.getPartition();
+ ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
+ Iaas iaas = iaasProvider.getIaas();
+ String publicIp = null;
- String clusterId = memberContext.getClusterId();
- Partition partition = memberContext.getPartition();
- ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId);
- Iaas iaas = iaasProvider.getIaas();
- String publicIp = null;
-
- NodeMetadata node = null;
- // generate the group id from domain name and sub domain name.
- // Should have lower-case ASCII letters, numbers, or dashes.
- // Should have a length between 3-15
- String str = clusterId.length() > 10 ? clusterId.substring(0, 10) : clusterId.substring(0, clusterId.length());
- String group = str.replaceAll("[^a-z0-9-]", "");
+ NodeMetadata node = null;
+ // generate the group id from domain name and sub domain name.
+ // Should have lower-case ASCII letters, numbers, or dashes.
+ // Should have a length between 3-15
+ String str = clusterId.length() > 10 ? clusterId.substring(0, 10) : clusterId.substring(0, clusterId.length());
+ String group = str.replaceAll("[^a-z0-9-]", "");
- try {
- ComputeService computeService = iaasProvider
- .getComputeService();
- Template template = iaasProvider.getTemplate();
+ try {
+ ComputeService computeService = iaasProvider.getComputeService();
+ Template template = iaasProvider.getTemplate();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cloud Controller is delegating request to start an instance for "
- + memberContext + " to Jclouds layer.");
- }
- // create and start a node
- Set<? extends NodeMetadata> nodes = computeService
- .createNodesInGroup(group, 1, template);
- node = nodes.iterator().next();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cloud Controller received a response for the request to start "
- + memberContext + " from Jclouds layer.");
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cloud Controller is delegating request to start an instance for "
+ + memberContext + " to Jclouds layer.");
+ }
+ // create and start a node
+ Set<? extends NodeMetadata> nodes = computeService
+ .createNodesInGroup(group, 1, template);
+ node = nodes.iterator().next();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cloud Controller received a response for the request to start "
+ + memberContext + " from Jclouds layer.");
+ }
- if (node == null) {
- String msg = "Null response received for instance start-up request to Jclouds.\n"
- + memberContext.toString();
- LOG.error(msg);
- throw new IllegalStateException(msg);
- }
+ if (node == null) {
+ String msg = "Null response received for instance start-up request to Jclouds.\n"
+ + memberContext.toString();
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
- // node id
- String nodeId = node.getId();
- if (nodeId == null) {
- String msg = "Node id of the starting instance is null.\n"
- + memberContext.toString();
- LOG.fatal(msg);
- throw new IllegalStateException(msg);
- }
+ // node id
+ String nodeId = node.getId();
+ if (nodeId == null) {
+ String msg = "Node id of the starting instance is null.\n"
+ + memberContext.toString();
+ LOG.fatal(msg);
+ throw new IllegalStateException(msg);
+ }
- memberContext.setNodeId(nodeId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Node id was set. " + memberContext.toString());
- }
+ memberContext.setNodeId(nodeId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Node id was set. " + memberContext.toString());
+ }
- // attach volumes
- if (ctxt.isVolumeRequired()) {
- // remove region prefix
- String instanceId = nodeId.indexOf('/') != -1 ? nodeId
- .substring(nodeId.indexOf('/') + 1, nodeId.length())
- : nodeId;
- memberContext.setInstanceId(instanceId);
- if (ctxt.getVolumes() != null) {
- for (Volume volume : ctxt.getVolumes()) {
- try {
- iaas.attachVolume(instanceId, volume.getId(),
- volume.getDevice());
- } catch (Exception e) {
- // continue without throwing an exception, since
- // there is an instance already running
- LOG.error("Attaching Volume to Instance [ "
- + instanceId + " ] failed!", e);
+ // attach volumes
+ if (ctxt.isVolumeRequired()) {
+ // remove region prefix
+ String instanceId = nodeId.indexOf('/') != -1 ? nodeId
+ .substring(nodeId.indexOf('/') + 1, nodeId.length())
+ : nodeId;
+ memberContext.setInstanceId(instanceId);
+ if (ctxt.getVolumes() != null) {
+ for (Volume volume : ctxt.getVolumes()) {
+ try {
+ iaas.attachVolume(instanceId, volume.getId(),
+ volume.getDevice());
+ } catch (Exception e) {
+ // continue without throwing an exception, since
+ // there is an instance already running
+ LOG.error("Attaching Volume to Instance [ "
+ + instanceId + " ] failed!", e);
+ }
}
}
}
- }
-
- } catch (Exception e) {
- String msg = "Failed to start an instance. " + memberContext.toString() + " Cause: " + e.getMessage();
- LOG.error(msg, e);
- throw new IllegalStateException(msg, e);
- }
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("IP allocation process started for " + memberContext);
+ } catch (Exception e) {
+ String msg = "Failed to start an instance. " + memberContext.toString() + " Cause: " + e.getMessage();
+ LOG.error(msg, e);
+ throw new IllegalStateException(msg, e);
}
- String autoAssignIpProp =
- iaasProvider.getProperty(CloudControllerConstants.AUTO_ASSIGN_IP_PROPERTY);
- String pre_defined_ip =
- iaasProvider.getProperty(CloudControllerConstants.FLOATING_IP_PROPERTY);
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("IP allocation process started for " + memberContext);
+ }
+ String autoAssignIpProp =
+ iaasProvider.getProperty(CloudControllerConstants.AUTO_ASSIGN_IP_PROPERTY);
- // reset ip
- String ip = "";
+ String pre_defined_ip =
+ iaasProvider.getProperty(CloudControllerConstants.FLOATING_IP_PROPERTY);
- // default behavior is autoIpAssign=false
- if (autoAssignIpProp == null ||
- (autoAssignIpProp != null && autoAssignIpProp.equals("false"))) {
+ // reset ip
+ String ip = "";
- // check if floating ip is well defined in cartridge definition
- if (pre_defined_ip != null) {
- if (isValidIpAddress(pre_defined_ip)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("CloudControllerServiceImpl:IpAllocator:pre_defined_ip: invoking associatePredefinedAddress" + pre_defined_ip);
- }
- ip = iaas.associatePredefinedAddress(node, pre_defined_ip);
+ // default behavior is autoIpAssign=false
+ if (autoAssignIpProp == null ||
+ (autoAssignIpProp != null && autoAssignIpProp.equals("false"))) {
- if (ip == null || "".equals(ip) || !pre_defined_ip.equals(ip)) {
- // throw exception and stop instance creation
- String msg = "Error occurred while allocating predefined floating ip address: " + pre_defined_ip +
- " / allocated ip:" + ip +
+ // check if floating ip is well defined in cartridge definition
+ if (pre_defined_ip != null) {
+ if (isValidIpAddress(pre_defined_ip)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CloudControllerServiceImpl:IpAllocator:pre_defined_ip: invoking associatePredefinedAddress" + pre_defined_ip);
+ }
+ ip = iaas.associatePredefinedAddress(node, pre_defined_ip);
+
+ if (ip == null || "".equals(ip) || !pre_defined_ip.equals(ip)) {
+ // throw exception and stop instance creation
+ String msg = "Error occurred while allocating predefined floating ip address: " + pre_defined_ip +
+ " / allocated ip:" + ip +
+ " - terminating node:" + memberContext.toString();
+ LOG.error(msg);
+ // terminate instance
+ terminate(iaasProvider,
+ node.getId(), memberContext);
+ throw new CloudControllerException(msg);
+ }
+ } else {
+ String msg = "Invalid floating ip address configured: " + pre_defined_ip +
" - terminating node:" + memberContext.toString();
LOG.error(msg);
// terminate instance
@@ -905,111 +902,105 @@ public class CloudControllerServiceImpl implements CloudControllerService {
node.getId(), memberContext);
throw new CloudControllerException(msg);
}
+
} else {
- String msg = "Invalid floating ip address configured: " + pre_defined_ip +
- " - terminating node:" + memberContext.toString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CloudControllerServiceImpl:IpAllocator:no (valid) predefined floating ip configured, "
+ + "selecting available one from pool");
+ }
+ // allocate an IP address - manual IP assigning mode
+ ip = iaas.associateAddress(node);
+
+ if (ip != null) {
+ memberContext.setAllocatedIpAddress(ip);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Allocated an ip address: "
+ + memberContext.toString());
+ } else if (LOG.isInfoEnabled()) {
+ LOG.info("Allocated ip address [ " + memberContext.getAllocatedIpAddress() +
+ " ] to member with id: " + memberContext.getMemberId());
+ }
+ }
+ }
+
+ if (ip == null) {
+ String msg = "No IP address found. IP allocation failed for " + memberContext;
LOG.error(msg);
- // terminate instance
- terminate(iaasProvider,
- node.getId(), memberContext);
throw new CloudControllerException(msg);
}
- } else {
+ // build the node with the new ip
+ node = NodeMetadataBuilder.fromNodeMetadata(node)
+ .publicAddresses(ImmutableSet.of(ip)).build();
+ }
+
+
+ // public ip
+ if (node.getPublicAddresses() != null &&
+ node.getPublicAddresses().iterator().hasNext()) {
+ ip = node.getPublicAddresses().iterator().next();
+ publicIp = ip;
+ memberContext.setPublicIpAddress(ip);
if (LOG.isDebugEnabled()) {
- LOG.debug("CloudControllerServiceImpl:IpAllocator:no (valid) predefined floating ip configured, "
- + "selecting available one from pool");
+ LOG.debug("Retrieving Public IP Address : " + memberContext.toString());
+ } else if (LOG.isInfoEnabled()) {
+ LOG.info("Retrieving Public IP Address: " + memberContext.getPublicIpAddress() +
+ ", member id: " + memberContext.getMemberId());
}
- // allocate an IP address - manual IP assigning mode
- ip = iaas.associateAddress(node);
+ }
- if (ip != null) {
- memberContext.setAllocatedIpAddress(ip);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Allocated an ip address: "
- + memberContext.toString());
- } else if (LOG.isInfoEnabled()) {
- LOG.info("Allocated ip address [ " + memberContext.getAllocatedIpAddress() +
- " ] to member with id: " + memberContext.getMemberId());
- }
+ // private IP
+ if (node.getPrivateAddresses() != null &&
+ node.getPrivateAddresses().iterator().hasNext()) {
+ ip = node.getPrivateAddresses().iterator().next();
+ memberContext.setPrivateIpAddress(ip);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retrieving Private IP Address. " + memberContext.toString());
+ } else if (LOG.isInfoEnabled()) {
+ LOG.info("Retrieving Private IP Address: " + memberContext.getPrivateIpAddress() +
+ ", member id: " + memberContext.getMemberId());
}
}
- if (ip == null) {
- String msg = "No IP address found. IP allocation failed for " + memberContext;
- LOG.error(msg);
- throw new CloudControllerException(msg);
- }
+ CloudControllerContext.getInstance().addMemberContext(memberContext);
- // build the node with the new ip
- node = NodeMetadataBuilder.fromNodeMetadata(node)
- .publicAddresses(ImmutableSet.of(ip)).build();
- }
+ // persist in registry
+ persist();
+
+
+ // trigger topology
+ TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId,
+ partition.getId(), ip, publicIp, memberContext);
+ String memberID = memberContext.getMemberId();
- // public ip
- if (node.getPublicAddresses() != null &&
- node.getPublicAddresses().iterator().hasNext()) {
- ip = node.getPublicAddresses().iterator().next();
- publicIp = ip;
- memberContext.setPublicIpAddress(ip);
+ // update the topology with the newly spawned member
+ // publish data
+ CartridgeInstanceDataPublisher.publish(memberID,
+ memberContext.getPartition().getId(),
+ memberContext.getNetworkPartitionId(),
+ memberContext.getClusterId(),
+ cartridgeType,
+ MemberStatus.Created.toString(),
+ node);
if (LOG.isDebugEnabled()) {
- LOG.debug("Retrieving Public IP Address : " + memberContext.toString());
- } else if (LOG.isInfoEnabled()) {
- LOG.info("Retrieving Public IP Address: " + memberContext.getPublicIpAddress() +
- ", member id: " + memberContext.getMemberId());
+ LOG.debug("Node details: " + node.toString());
}
- }
- // private IP
- if (node.getPrivateAddresses() != null &&
- node.getPrivateAddresses().iterator().hasNext()) {
- ip = node.getPrivateAddresses().iterator().next();
- memberContext.setPrivateIpAddress(ip);
if (LOG.isDebugEnabled()) {
- LOG.debug("Retrieving Private IP Address. " + memberContext.toString());
- } else if (LOG.isInfoEnabled()) {
- LOG.info("Retrieving Private IP Address: " + memberContext.getPrivateIpAddress() +
- ", member id: " + memberContext.getMemberId());
+ LOG.debug("IP allocation process ended for " + memberContext);
}
- }
-
- cloudControllerContext.addMemberContext(memberContext);
-
- // persist in registry
- persist();
-
-
- // trigger topology
- TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId,
- partition.getId(), ip, publicIp, memberContext);
- String memberID = memberContext.getMemberId();
-
- // update the topology with the newly spawned member
- // publish data
- CartridgeInstanceDataPublisher.publish(memberID,
- memberContext.getPartition().getId(),
- memberContext.getNetworkPartitionId(),
- memberContext.getClusterId(),
- cartridgeType,
- MemberStatus.Created.toString(),
- node);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Node details: " + node.toString());
+ } catch (Exception e) {
+ String msg = "Error occurred while allocating an ip address. " + memberContext.toString();
+ LOG.error(msg, e);
+ throw new CloudControllerException(msg, e);
}
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("IP allocation process ended for " + memberContext);
+ } finally {
+ if(lock != null) {
+ CloudControllerContext.getInstance().releaseWriteLock(lock);
}
-
- } catch (Exception e) {
- String msg = "Error occurred while allocating an ip address. " + memberContext.toString();
- LOG.error(msg, e);
- throw new CloudControllerException(msg, e);
}
-
-
}
}
@@ -1026,7 +1017,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
handleNullObject(clusterId, "Instance termination failed. Cluster id is null.");
- List<MemberContext> ctxts = cloudControllerContext.getMemberContextsOfClusterId(clusterId);
+ List<MemberContext> ctxts = CloudControllerContext.getInstance().getMemberContextsOfClusterId(clusterId);
if (ctxts == null) {
String msg = "Instance termination failed. No members found for cluster id: " + clusterId;
@@ -1088,7 +1079,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
private void detachVolume(IaasProvider iaasProvider, MemberContext ctxt) {
String clusterId = ctxt.getClusterId();
- ClusterContext clusterCtxt = cloudControllerContext.getClusterContext(clusterId);
+ ClusterContext clusterCtxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
if (clusterCtxt.getVolumes() != null) {
for (Volume volume : clusterCtxt.getVolumes()) {
try {
@@ -1130,11 +1121,10 @@ public class CloudControllerServiceImpl implements CloudControllerService {
null);
// update data holders
- cloudControllerContext.removeMemberContext(memberContext.getMemberId(), memberContext.getClusterId());
+ CloudControllerContext.getInstance().removeMemberContext(memberContext.getMemberId(), memberContext.getClusterId());
// persist
persist();
-
}
@Override
@@ -1154,7 +1144,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
handleNullObject(hostName, "Service registration failed. Hostname is null.");
Cartridge cartridge = null;
- if ((cartridge = cloudControllerContext.getCartridge(cartridgeType)) == null) {
+ if ((cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType)) == null) {
String msg = "Registration of cluster: " + clusterId +
" failed. - Unregistered Cartridge type: " + cartridgeType;
@@ -1171,7 +1161,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
payload, hostName, props, isLb, registrant.getPersistence());
- cloudControllerContext.addClusterContext(ctxt);*/
+ CloudControllerContext.getInstance().addClusterContext(ctxt);*/
TopologyBuilder.handleClusterCreated(registrant, isLb);
persist();
@@ -1213,8 +1203,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
@Override
public String[] getRegisteredCartridges() {
// get the list of cartridges registered
- List<Cartridge> cartridges = cloudControllerContext
- .getCartridges();
+ Collection<Cartridge> cartridges = CloudControllerContext.getInstance().getCartridges();
if (cartridges == null) {
LOG.info("No registered Cartridge found.");
@@ -1241,7 +1230,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
@Override
public CartridgeInfo getCartridgeInfo(String cartridgeType)
throws UnregisteredCartridgeException {
- Cartridge cartridge = cloudControllerContext
+ Cartridge cartridge = CloudControllerContext.getInstance()
.getCartridge(cartridgeType);
if (cartridge != null) {
@@ -1260,13 +1249,13 @@ public class CloudControllerServiceImpl implements CloudControllerService {
public void unregisterService(String clusterId) throws UnregisteredClusterException {
final String clusterId_ = clusterId;
- ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId_);
+ ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId_);
handleNullObject(ctxt, "Service unregistration failed. Invalid cluster id: " + clusterId);
String cartridgeType = ctxt.getCartridgeType();
- Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType);
+ Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
if (cartridge == null) {
String msg =
@@ -1281,12 +1270,12 @@ public class CloudControllerServiceImpl implements CloudControllerService {
} else {
-// TopologyBuilder.handleClusterMaintenanceMode(cloudControllerContext.getClusterContext(clusterId_));
+// TopologyBuilder.handleClusterMaintenanceMode(CloudControllerContext.getInstance().getClusterContext(clusterId_));
Runnable terminateInTimeout = new Runnable() {
@Override
public void run() {
- ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId_);
+ ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId_);
if (ctxt == null) {
String msg = "Service unregistration failed. Cluster not found: " + clusterId_;
LOG.error(msg);
@@ -1325,52 +1314,69 @@ public class CloudControllerServiceImpl implements CloudControllerService {
};
Runnable unregister = new Runnable() {
public void run() {
- ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId_);
- if (ctxt == null) {
- String msg = "Service unregistration failed. Cluster not found: " + clusterId_;
- LOG.error(msg);
- return;
- }
- Collection<Member> members = TopologyManager.getTopology().
- getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers();
- // TODO why end time is needed?
- // long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * members.size();
+ Lock lock = null;
+ try {
+ lock = CloudControllerContext.getInstance().acquireClusterContextWriteLock();
+ ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId_);
+ if (ctxt == null) {
+ String msg = "Service unregistration failed. Cluster not found: " + clusterId_;
+ LOG.error(msg);
+ return;
+ }
+ Collection<Member> members = TopologyManager.getTopology().
+ getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers();
+ // TODO why end time is needed?
+ // long endTime = System.currentTimeMillis() + ctxt.getTimeoutInMillis() * members.size();
+
+ while (members.size() > 0) {
+ //waiting until all the members got removed from the Topology/ timed out
+ CloudControllerUtil.sleep(1000);
+ }
- while (members.size() > 0) {
- //waiting until all the members got removed from the Topology/ timed out
- CloudControllerUtil.sleep(1000);
+ LOG.info("Unregistration of service cluster: " + clusterId_);
+ deleteVolumes(ctxt);
+ onClusterRemoval(clusterId_);
+ } finally {
+ if(lock != null) {
+ CloudControllerContext.getInstance().releaseWriteLock(lock);
+ }
}
-
- LOG.info("Unregistration of service cluster: " + clusterId_);
- deleteVolumes(ctxt);
- onClusterRemoval(clusterId_);
}
private void deleteVolumes(ClusterContext ctxt) {
if (ctxt.isVolumeRequired()) {
- Cartridge cartridge = cloudControllerContext.getCartridge(ctxt.getCartridgeType());
- if (cartridge != null && cartridge.getIaases() != null && ctxt.getVolumes() != null) {
- for (Volume volume : ctxt.getVolumes()) {
- if (volume.getId() != null) {
- String iaasType = volume.getIaasType();
- //Iaas iaas = cloudControllerContext.getIaasProvider(iaasType).getIaas();
- Iaas iaas = cartridge.getIaasProvider(iaasType).getIaas();
- if (iaas != null) {
- try {
- // delete the volumes if remove on unsubscription is true.
- if (volume.isRemoveOntermination()) {
- iaas.deleteVolume(volume.getId());
- volume.setId(null);
- }
- } catch (Exception ignore) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Error while deleting volume [id] " + volume.getId(), ignore);
+ Lock lock = null;
+ try {
+ lock = CloudControllerContext.getInstance().acquireCartridgesWriteLock();
+
+ Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(ctxt.getCartridgeType());
+ if (cartridge != null && cartridge.getIaases() != null && ctxt.getVolumes() != null) {
+ for (Volume volume : ctxt.getVolumes()) {
+ if (volume.getId() != null) {
+ String iaasType = volume.getIaasType();
+ //Iaas iaas = CloudControllerContext.getInstance().getIaasProvider(iaasType).getIaas();
+ Iaas iaas = cartridge.getIaasProvider(iaasType).getIaas();
+ if (iaas != null) {
+ try {
+ // delete the volumes if remove on unsubscription is true.
+ if (volume.isRemoveOntermination()) {
+ iaas.deleteVolume(volume.getId());
+ volume.setId(null);
+ }
+ } catch (Exception ignore) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Error while deleting volume [id] " + volume.getId(), ignore);
+ }
}
}
}
}
+ CloudControllerContext.getInstance().updateCartridge(cartridge);
+ }
+ } finally {
+ if(lock != null) {
+ CloudControllerContext.getInstance().releaseWriteLock(lock);
}
-
}
}
}
@@ -1383,104 +1389,120 @@ public class CloudControllerServiceImpl implements CloudControllerService {
@Override
public void unregisterDockerService(String clusterId)
throws UnregisteredClusterException {
-
- // terminate all kubernetes units
+ Lock lock = null;
try {
- terminateAllContainers(clusterId);
- } catch (InvalidClusterException e) {
- String msg = "Docker instance termination fails for cluster: " + clusterId;
- LOG.error(msg, e);
- throw new UnregisteredClusterException(msg, e);
+ lock = CloudControllerContext.getInstance().acquireClusterContextWriteLock();
+ // terminate all kubernetes units
+ try {
+ terminateAllContainers(clusterId);
+ } catch (InvalidClusterException e) {
+ String msg = "Docker instance termination fails for cluster: " + clusterId;
+ LOG.error(msg, e);
+ throw new UnregisteredClusterException(msg, e);
+ }
+ // send cluster removal notifications and update the state
+ onClusterRemoval(clusterId);
+ } finally {
+ if(lock != null) {
+ CloudControllerContext.getInstance().releaseWriteLock(lock);
+ }
}
- // send cluster removal notifications and update the state
- onClusterRemoval(clusterId);
}
-
+ /***
+ * FIXME: A validate method shouldn't persist any data
+ */
@Override
public boolean validateDeploymentPolicy(String cartridgeType, Partition[] partitions)
throws InvalidPartitionException, InvalidCartridgeTypeException {
- List<String> validatedPartitions = CloudControllerContext.getInstance().getPartitionIds(cartridgeType);
- if (validatedPartitions != null) {
- // cache hit for this cartridge
- // get list of partitions
- if (LOG.isDebugEnabled()) {
- LOG.debug("Partition validation cache hit for cartridge type: " + cartridgeType);
- }
- }
-
- Map<String, IaasProvider> partitionToIaasProviders =
- new ConcurrentHashMap<String, IaasProvider>();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Deployment policy validation started for cartridge type: " + cartridgeType);
- }
-
- Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType);
+ Lock lock = null;
+ try {
+ lock = CloudControllerContext.getInstance().acquireCartridgesWriteLock();
- if (cartridge == null) {
- String msg = "Invalid Cartridge Type: " + cartridgeType;
- LOG.error(msg);
- throw new InvalidCartridgeTypeException(msg);
- }
+ List<String> validatedPartitions = CloudControllerContext.getInstance().getPartitionIds(cartridgeType);
+ if (validatedPartitions != null) {
+ // cache hit for this cartridge
+ // get list of partitions
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Partition validation cache hit for cartridge type: " + cartridgeType);
+ }
+ }
- Map<String, Future<IaasProvider>> jobList = new HashMap<String, Future<IaasProvider>>();
+ Map<String, IaasProvider> partitionToIaasProviders =
+ new ConcurrentHashMap<String, IaasProvider>();
- for (Partition partition : partitions) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Deployment policy validation started for cartridge type: " + cartridgeType);
+ }
- if (validatedPartitions.contains(partition.getId())) {
- // partition cache hit
- continue;
+ Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
+ if (cartridge == null) {
+ String msg = "Invalid Cartridge Type: " + cartridgeType;
+ LOG.error(msg);
+ throw new InvalidCartridgeTypeException(msg);
}
- Callable<IaasProvider> worker = new PartitionValidatorCallable(
- partition, cartridge);
- Future<IaasProvider> job = CloudControllerContext.getInstance()
- .getExecutorService().submit(worker);
- jobList.put(partition.getId(), job);
- }
+ Map<String, Future<IaasProvider>> jobList = new HashMap<String, Future<IaasProvider>>();
+ for (Partition partition : partitions) {
+ if (validatedPartitions.contains(partition.getId())) {
+ // partition cache hit
+ continue;
+ }
- // Retrieve the results of the concurrently performed sanity checks.
- for (Entry<String, Future<IaasProvider>> entry : jobList.entrySet()) {
- if (entry == null) {
- continue;
+ Callable<IaasProvider> worker = new PartitionValidatorCallable(
+ partition, cartridge);
+ Future<IaasProvider> job = CloudControllerContext.getInstance()
+ .getExecutorService().submit(worker);
+ jobList.put(partition.getId(), job);
}
- String partitionId = entry.getKey();
- Future<IaasProvider> job = entry.getValue();
- try {
- // add to a temporary Map
- partitionToIaasProviders.put(partitionId, job.get());
- // add to cache
- this.cloudControllerContext.addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId);
+ // Retrieve the results of the concurrently performed sanity checks.
+ for (Entry<String, Future<IaasProvider>> entry : jobList.entrySet()) {
+ if (entry == null) {
+ continue;
+ }
+ String partitionId = entry.getKey();
+ Future<IaasProvider> job = entry.getValue();
+ try {
+ // add to a temporary Map
+ partitionToIaasProviders.put(partitionId, job.get());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Partition " + partitionId + " added to the cache against cartridge type: " + cartridgeType);
+ // add to cache
+ CloudControllerContext.getInstance().addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Partition " + partitionId + " added to the cache against cartridge type: " + cartridgeType);
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new InvalidPartitionException(e.getMessage(), e);
}
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- throw new InvalidPartitionException(e.getMessage(), e);
}
- }
- // if and only if the deployment policy valid
- cartridge.addIaasProviders(partitionToIaasProviders);
+ // if and only if the deployment policy valid
+ cartridge.addIaasProviders(partitionToIaasProviders);
+ CloudControllerContext.getInstance().updateCartridge(cartridge);
- // persist data
- persist();
+ // persist data
+ persist();
- LOG.info("All partitions " + CloudControllerUtil.getPartitionIds(partitions) +
- " were validated successfully, against the Cartridge: " + cartridgeType);
+ LOG.info("All partitions " + CloudControllerUtil.getPartitionIds(partitions) +
+ " were validated successfully, against the Cartridge: " + cartridgeType);
- return true;
+ return true;
+ } finally {
+ if(lock != null) {
+ CloudControllerContext.getInstance().releaseWriteLock(lock);
+ }
+ }
}
private void onClusterRemoval(final String clusterId) {
- ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId);
+ ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
TopologyBuilder.handleClusterRemoved(ctxt);
- cloudControllerContext.removeClusterContext(clusterId);
- cloudControllerContext.removeMemberContextsOfCluster(clusterId);
+ CloudControllerContext.getInstance().removeClusterContext(clusterId);
+ CloudControllerContext.getInstance().removeMemberContextsOfCluster(clusterId);
persist();
}
@@ -1524,160 +1546,166 @@ public class CloudControllerServiceImpl implements CloudControllerService {
}
public ClusterContext getClusterContext(String clusterId) {
-
- return cloudControllerContext.getClusterContext(clusterId);
+ return CloudControllerContext.getInstance().getClusterContext(clusterId);
}
@Override
public MemberContext[] startContainers(ContainerClusterContext containerClusterContext)
throws UnregisteredCartridgeException {
+ Lock lock = null;
+ try {
+ lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock();
- if (LOG.isDebugEnabled()) {
- LOG.debug("CloudControllerServiceImpl:startContainers");
- }
-
- handleNullObject(containerClusterContext, "Container start-up failed. ContainerClusterContext is null.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CloudControllerServiceImpl:startContainers");
+ }
- String clusterId = containerClusterContext.getClusterId();
- handleNullObject(clusterId, "Container start-up failed. Cluster id is null.");
+ handleNullObject(containerClusterContext, "Container start-up failed. ContainerClusterContext is null.");
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received a container spawn request : " + containerClusterContext.toString());
- }
+ String clusterId = containerClusterContext.getClusterId();
+ handleNullObject(clusterId, "Container start-up failed. Cluster id is null.");
- ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId);
- handleNullObject(ctxt, "Container start-up failed. Invalid cluster id. " + containerClusterContext.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received a container spawn request : " + containerClusterContext.toString());
+ }
- String cartridgeType = ctxt.getCartridgeType();
+ ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
+ handleNullObject(ctxt, "Container start-up failed. Invalid cluster id. " + containerClusterContext.toString());
- Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType);
+ String cartridgeType = ctxt.getCartridgeType();
- if (cartridge == null) {
- String msg =
- "Instance start-up failed. No matching Cartridge found [type] " + cartridgeType + ". " +
- containerClusterContext.toString();
- LOG.error(msg);
- throw new UnregisteredCartridgeException(msg);
- }
+ Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
- try {
- String minReplicas = validateProperty(StratosConstants.KUBERNETES_MIN_REPLICAS, ctxt);
- String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
- String kubernetesMasterIp = validateProperty(StratosConstants.KUBERNETES_MASTER_IP, containerClusterContext);
- String kubernetesPortRange = validateProperty(StratosConstants.KUBERNETES_PORT_RANGE, containerClusterContext);
+ if (cartridge == null) {
+ String msg = "Instance start-up failed. No matching Cartridge found [type] " + cartridgeType + ". " +
+ containerClusterContext.toString();
+ LOG.error(msg);
+ throw new UnregisteredCartridgeException(msg);
+ }
- KubernetesClusterContext kubClusterContext = getKubernetesClusterContext(kubernetesClusterId, kubernetesMasterIp, kubernetesPortRange);
+ try {
+ String minReplicas = validateProperty(StratosConstants.KUBERNETES_MIN_REPLICAS, ctxt);
+ String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
+ String kubernetesMasterIp = validateProperty(StratosConstants.KUBERNETES_MASTER_IP, containerClusterContext);
+ String kubernetesPortRange = validateProperty(StratosConstants.KUBERNETES_PORT_RANGE, containerClusterContext);
- KubernetesApiClient kubApi = kubClusterContext.getKubApi();
+ KubernetesClusterContext kubClusterContext = getKubernetesClusterContext(kubernetesClusterId,
+ kubernetesMasterIp, kubernetesPortRange);
+ KubernetesApiClient kubApi = kubClusterContext.getKubApi();
- // first let's create a replication controller.
- ContainerClusterContextToReplicationController controllerFunction = new ContainerClusterContextToReplicationController();
- ReplicationController controller = controllerFunction.apply(containerClusterContext);
+ // first let's create a replication controller.
+ ContainerClusterContextToReplicationController controllerFunction = new ContainerClusterContextToReplicationController();
+ ReplicationController controller = controllerFunction.apply(containerClusterContext);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cloud Controller is delegating request to start a replication controller " + controller +
- " for " + containerClusterContext + " to Kubernetes layer.");
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cloud Controller is delegating request to start a replication controller " + controller +
+ " for " + containerClusterContext + " to Kubernetes layer.");
+ }
- kubApi.createReplicationController(controller);
+ kubApi.createReplicationController(controller);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cloud Controller successfully started the controller "
- + controller + " via Kubernetes layer.");
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cloud Controller successfully started the controller "
+ + controller + " via Kubernetes layer.");
+ }
- // secondly let's create a kubernetes service proxy to load balance these containers
- ContainerClusterContextToKubernetesService serviceFunction = new ContainerClusterContextToKubernetesService();
- Service service = serviceFunction.apply(containerClusterContext);
+ // secondly let's create a kubernetes service proxy to load balance these containers
+ ContainerClusterContextToKubernetesService serviceFunction = new ContainerClusterContextToKubernetesService();
+ Service service = serviceFunction.apply(containerClusterContext);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cloud Controller is delegating request to start a service " + service +
- " for " + containerClusterContext + " to Kubernetes layer.");
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cloud Controller is delegating request to start a service " + service +
+ " for " + containerClusterContext + " to Kubernetes layer.");
+ }
- kubApi.createService(service);
+ kubApi.createService(service);
- // set host port and update
- Property allocatedServiceHostPortProp = new Property();
- allocatedServiceHostPortProp.setName(StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
- allocatedServiceHostPortProp.setValue(String.valueOf(service.getPort()));
- ctxt.getProperties().addProperty(allocatedServiceHostPortProp);
- cloudControllerContext.addClusterContext(ctxt);
+ // set host port and update
+ Property allocatedServiceHostPortProp = new Property();
+ allocatedServiceHostPortProp.setName(StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
+ allocatedServiceHostPortProp.setValue(String.valueOf(service.getPort()));
+ ctxt.getProperties().addProperty(allocatedServiceHostPortProp);
+ CloudControllerContext.getInstance().addClusterContext(ctxt);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cloud Controller successfully started the service "
- + controller + " via Kubernetes layer.");
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cloud Controller successfully started the service "
+ + controller + " via Kubernetes layer.");
+ }
- // create a label query
- Label l = new Label();
- l.setName(clusterId);
- // execute the label query
- Pod[] newlyCreatedPods = new Pod[0];
- int expectedCount = Integer.parseInt(minReplicas);
+ // create a label query
+ Label l = new Label();
+ l.setName(clusterId);
+ // execute the label query
+ Pod[] newlyCreatedPods = new Pod[0];
+ int expectedCount = Integer.parseInt(minReplicas);
- for (int i = 0; i < expectedCount; i++) {
- newlyCreatedPods = kubApi.getSelectedPods(new Label[]{l});
+ for (int i = 0; i < expectedCount; i++) {
+ newlyCreatedPods = kubApi.getSelectedPods(new Label[]{l});
- if (LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
- LOG.debug("Pods Count: " + newlyCreatedPods.length + " for cluster: " + clusterId);
+ LOG.debug("Pods Count: " + newlyCreatedPods.length + " for cluster: " + clusterId);
+ }
+ if (newlyCreatedPods.length == expectedCount) {
+ break;
+ }
+ Thread.sleep(10000);
}
- if (newlyCreatedPods.length == expectedCount) {
- break;
+
+ if (newlyCreatedPods.length == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Pods are not created for cluster : %s, hence deleting the service", clusterId));
+ }
+ terminateAllContainers(clusterId);
+ return new MemberContext[0];
}
- Thread.sleep(10000);
- }
- if (newlyCreatedPods.length == 0) {
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Pods are not created for cluster : %s, hence deleting the service", clusterId));
- }
- terminateAllContainers(clusterId);
- return new MemberContext[0];
- }
- if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Pods created : %s for cluster : %s", newlyCreatedPods.length, clusterId));
+ }
- LOG.debug(String.format("Pods created : %s for cluster : %s", newlyCreatedPods.length, clusterId));
- }
+ List<MemberContext> memberContexts = new ArrayList<MemberContext>();
- List<MemberContext> memberContexts = new ArrayList<MemberContext>();
+ PodToMemberContext podToMemberContextFunc = new PodToMemberContext();
+ // generate Member Contexts
+ for (Pod pod : newlyCreatedPods) {
+ MemberContext context = podToMemberContextFunc.apply(pod);
+ context.setCartridgeType(cartridgeType);
+ context.setClusterId(clusterId);
- PodToMemberContext podToMemberContextFunc = new PodToMemberContext();
- // generate Member Contexts
- for (Pod pod : newlyCreatedPods) {
- MemberContext context = podToMemberContextFunc.apply(pod);
- context.setCartridgeType(cartridgeType);
- context.setClusterId(clusterId);
+ context.setProperties(CloudControllerUtil.addProperty(context
+ .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
+ String.valueOf(service.getPort())));
- context.setProperties(CloudControllerUtil.addProperty(context
- .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
- String.valueOf(service.getPort())));
+ CloudControllerContext.getInstance().addMemberContext(context);
- cloudControllerContext.addMemberContext(context);
+ // wait till Pod status turns to running and send member spawned.
+ ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cloud Controller is starting the instance start up thread.");
+ }
+ CloudControllerContext.getInstance().addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
- // wait till Pod status turns to running and send member spawned.
- ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cloud Controller is starting the instance start up thread.");
+ memberContexts.add(context);
}
- cloudControllerContext.addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
-
- memberContexts.add(context);
- }
- // persist in registry
- persist();
+ // persist in registry
+ persist();
- LOG.info("Kubernetes entities are successfully starting up: " + memberContexts);
+ LOG.info("Kubernetes entities are successfully starting up: " + memberContexts);
- return memberContexts.toArray(new MemberContext[0]);
+ return memberContexts.toArray(new MemberContext[0]);
- } catch (Exception e) {
- String msg = "Failed to start an instance. " + containerClusterContext.toString() + " Cause: " + e.getMessage();
- LOG.error(msg, e);
- throw new IllegalStateException(msg, e);
+ } catch (Exception e) {
+ String msg = "Failed to start an instance. " + containerClusterContext.toString() + " Cause: " + e.getMessage();
+ LOG.error(msg, e);
+ throw new IllegalStateException(msg, e);
+ }
+ } finally {
+ if(lock != null) {
+ CloudControllerContext.getInstance().releaseWriteLock(lock);
+ }
}
}
@@ -1700,18 +1728,18 @@ public class CloudControllerServiceImpl implements CloudControllerService {
String kubernetesClusterId, String kubernetesMasterIp,
String kubernetesPortRange) {
- KubernetesClusterContext origCtxt = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId);
+ KubernetesClusterContext origCtxt = CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId);
KubernetesClusterContext newCtxt = new KubernetesClusterContext(kubernetesClusterId, kubernetesPortRange, kubernetesMasterIp);
if (origCtxt == null) {
- cloudControllerContext.addKubernetesClusterContext(newCtxt);
+ CloudControllerContext.getInstance().addKubernetesClusterContext(newCtxt);
return newCtxt;
}
if (!origCtxt.equals(newCtxt)) {
// if for some reason master IP etc. have changed
newCtxt.setAvailableHostPorts(origCtxt.getAvailableHostPorts());
- cloudControllerContext.addKubernetesClusterContext(newCtxt);
+ CloudControllerContext.getInstance().addKubernetesClusterContext(newCtxt);
return newCtxt;
} else {
return origCtxt;
@@ -1721,228 +1749,242 @@ public class CloudControllerServiceImpl implements CloudControllerService {
@Override
public MemberContext[] terminateAllContainers(String clusterId)
throws InvalidClusterException {
+ Lock lock = null;
+ try {
+ lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock();
- ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId);
- handleNullObject(ctxt, "Kubernetes units temrination failed. Invalid cluster id. " + clusterId);
-
- String kubernetesClusterId = CloudControllerUtil.getProperty(ctxt.getProperties(),
- StratosConstants.KUBERNETES_CLUSTER_ID);
- handleNullObject(kubernetesClusterId, "Kubernetes units termination failed. Cannot find '" +
- StratosConstants.KUBERNETES_CLUSTER_ID + "'. " + ctxt);
+ ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
+ handleNullObject(ctxt, "Kubernetes units temrination failed. Invalid cluster id. " + clusterId);
- KubernetesClusterContext kubClusterContext = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId);
- handleNullObject(kubClusterContext, "Kubernetes units termination failed. Cannot find a matching Kubernetes Cluster for cluster id: "
- + kubernetesClusterId);
+ String kubernetesClusterId = CloudControllerUtil.getProperty(ctxt.getProperties(),
+ StratosConstants.KUBERNETES_CLUSTER_ID);
+ handleNullObject(kubernetesClusterId, "Kubernetes units termination failed. Cannot find '" +
+ StratosConstants.KUBERNETES_CLUSTER_ID + "'. " + ctxt);
- KubernetesApiClient kubApi = kubClusterContext.getKubApi();
- // delete the service
- try {
- kubApi.deleteService(CloudControllerUtil.getCompatibleId(clusterId));
- } catch (KubernetesClientException e) {
- // we're not going to throw this error, but proceed with other deletions
- LOG.error("Failed to delete Kubernetes service with id: " + clusterId, e);
- }
+ KubernetesClusterContext kubClusterContext = CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId);
+ handleNullObject(kubClusterContext, "Kubernetes units termination failed. Cannot find a matching Kubernetes Cluster for cluster id: "
+ + kubernetesClusterId);
- // set replicas=0 for the replication controller
- try {
- kubApi.updateReplicationController(clusterId, 0);
- } catch (KubernetesClientException e) {
- // we're not going to throw this error, but proceed with other deletions
- LOG.error("Failed to update Kubernetes Controller with id: " + clusterId, e);
- }
+ KubernetesApiClient kubApi = kubClusterContext.getKubApi();
+ // delete the service
+ try {
+ kubApi.deleteService(CloudControllerUtil.getCompatibleId(clusterId));
+ } catch (KubernetesClientException e) {
+ // we're not going to throw this error, but proceed with other deletions
+ LOG.error("Failed to delete Kubernetes service with id: " + clusterId, e);
+ }
- // delete pods forcefully
- try {
- // create a label query
- Label l = new Label();
- l.setName(clusterId);
- // execute the label query
- Pod[] pods = kubApi.getSelectedPods(new Label[]{l});
+ // set replicas=0 for the replication controller
+ try {
+ kubApi.updateReplicationController(clusterId, 0);
+ } catch (KubernetesClientException e) {
+ // we're not going to throw this error, but proceed with other deletions
+ LOG.error("Failed to update Kubernetes Controller with id: " + clusterId, e);
+ }
- for (Pod pod : pods) {
- try {
- // delete pods forcefully
- kubApi.deletePod(pod.getId());
- } catch (KubernetesClientException ignore) {
- // we can't do nothing here
- LOG.warn(String.format("Failed to delete Pod [%s] forcefully!", pod.getId()));
+ // delete pods forcefully
+ try {
+ // create a label query
+ Label l = new Label();
+ l.setName(clusterId);
+ // execute the label query
+ Pod[] pods = kubApi.getSelectedPods(new Label[]{l});
+
+ for (Pod pod : pods) {
+ try {
+ // delete pods forcefully
+ kubApi.deletePod(pod.getId());
+ } catch (KubernetesClientException ignore) {
+ // we can't do nothing here
+ LOG.warn(String.format("Failed to delete Pod [%s] forcefully!", pod.getId()));
+ }
}
+ } catch (KubernetesClientException e) {
+ // we're not going to throw this error, but proceed with other deletions
+ LOG.error("Failed to delete pods forcefully for cluster: " + clusterId, e);
}
- } catch (KubernetesClientException e) {
- // we're not going to throw this error, but proceed with other deletions
- LOG.error("Failed to delete pods forcefully for cluster: " + clusterId, e);
- }
-
- // delete the replication controller.
- try {
- kubApi.deleteReplicationController(clusterId);
- } catch (KubernetesClientException e) {
- String msg = "Failed to delete Kubernetes Controller with id: " + clusterId;
- LOG.error(msg, e);
- throw new InvalidClusterException(msg, e);
- }
- String allocatedPort = CloudControllerUtil.getProperty(ctxt.getProperties(),
- StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
+ // delete the replication controller.
+ try {
+ kubApi.deleteReplicationController(clusterId);
+ } catch (KubernetesClientException e) {
+ String msg = "Failed to delete Kubernetes Controller with id: " + clusterId;
+ LOG.error(msg, e);
+ throw new InvalidClusterException(msg, e);
+ }
- if (allocatedPort != null) {
- kubClusterContext.deallocateHostPort(Integer
- .parseInt(allocatedPort));
- } else {
- LOG.warn("Host port dealloacation failed due to a missing property: "
- + StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
- }
+ String allocatedPort = CloudControllerUtil.getProperty(ctxt.getProperties(),
+ StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
- List<MemberContext> membersToBeRemoved = cloudControllerContext.getMemberContextsOfClusterId(clusterId);
+ if (allocatedPort != null) {
+ kubClusterContext.deallocateHostPort(Integer
+ .parseInt(allocatedPort));
+ } else {
+ LOG.warn("Host port dealloacation failed due to a missing property: "
+ + StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
+ }
- for (MemberContext memberContext : membersToBeRemoved) {
- logTermination(memberContext);
- }
+ List<MemberContext> membersToBeRemoved = CloudControllerContext.getInstance().getMemberContextsOfClusterId(clusterId);
- // persist
- persist();
+ for (MemberContext memberContext : membersToBeRemoved) {
+ logTermination(memberContext);
+ }
- return membersToBeRemoved.toArray(new MemberContext[0]);
+ // persist
+ persist();
+ return membersToBeRemoved.toArray(new MemberContext[0]);
+ } finally {
+ if(lock != null) {
+ CloudControllerContext.getInstance().releaseWriteLock(lock);
+ }
+ }
}
@Override
public MemberContext[] updateContainers(String clusterId, int replicas)
throws UnregisteredCartridgeException {
+ Lock lock = null;
+ try {
+ lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock();
- if (LOG.isDebugEnabled()) {
- LOG.debug("CloudControllerServiceImpl:updateContainers for cluster : " + clusterId);
- }
-
- ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId);
- handleNullObject(ctxt, "Container update failed. Invalid cluster id. " + clusterId);
-
- String cartridgeType = ctxt.getCartridgeType();
-
- Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CloudControllerServiceImpl:updateContainers for cluster : " + clusterId);
+ }
- if (cartridge == null) {
- String msg =
- "Container update failed. No matching Cartridge found [type] " + cartridgeType
- + ". [cluster id] " + clusterId;
- LOG.error(msg);
- throw new UnregisteredCartridgeException(msg);
- }
+ ClusterContext ctxt = CloudControllerContext.getInstance().getClusterContext(clusterId);
+ handleNullObject(ctxt, "Container update failed. Invalid cluster id. " + clusterId);
- try {
- String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
+ String cartridgeType = ctxt.getCartridgeType();
- KubernetesClusterContext kubClusterContext = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId);
+ Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
- if (kubClusterContext == null) {
+ if (cartridge == null) {
String msg =
- "Instance start-up failed. No matching Kubernetes Context Found for [id] " + kubernetesClusterId
+ "Container update failed. No matching Cartridge found [type] " + cartridgeType
+ ". [cluster id] " + clusterId;
LOG.error(msg);
throw new UnregisteredCartridgeException(msg);
}
- KubernetesApiClient kubApi = kubClusterContext.getKubApi();
- // create a label query
- Label l = new Label();
- l.setName(clusterId);
+ try {
+ String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
- // get the current pods - useful when scale down
- Pod[] previousStatePods = kubApi.getSelectedPods(new Label[]{l});
+ KubernetesClusterContext kubClusterContext = CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId);
- // update the replication controller - cluster id = replication controller id
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cloud Controller is delegating request to update a replication controller " + clusterId +
- " to Kubernetes layer.");
- }
+ if (kubClusterContext == null) {
+ String msg =
+ "Instance start-up failed. No matching Kubernetes Context Found for [id] " + kubernetesClusterId
+ + ". [cluster id] " + clusterId;
+ LOG.error(msg);
+ throw new UnregisteredCartridgeException(msg);
+ }
- kubApi.updateReplicationController(clusterId, replicas);
+ KubernetesApiClient kubApi = kubClusterContext.getKubApi();
+ // create a label query
+ Label l = new Label();
+ l.setName(clusterId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cloud Controller successfully updated the controller "
- + clusterId + " via Kubernetes layer.");
- }
+ // get the current pods - useful when scale down
+ Pod[] previousStatePods = kubApi.getSelectedPods(new Label[]{l});
- // execute the label query
- Pod[] allPods = new Pod[0];
+ // update the replication controller - cluster id = replication controller id
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cloud Controller is delegating request to update a replication controller " + clusterId +
+ " to Kubernetes layer.");
+ }
- // wait replicas*5s time in the worst case ; best case = 0s
- for (int i = 0; i < (replicas * previousStatePods.length + 1); i++) {
- allPods = kubApi.getSelectedPods(new Label[]{l});
+ kubApi.updateReplicationController(clusterId, replicas);
if (LOG.isDebugEnabled()) {
-
- LOG.debug("Pods Count: " + allPods.length + " for cluster: " + clusterId);
+ LOG.debug("Cloud Controller successfully updated the controller "
+ + clusterId + " via Kubernetes layer.");
}
- if (allPods.length == replicas) {
- break;
- }
- Thread.sleep(10000);
- }
- if (LOG.isDebugEnabled()) {
+ // execute the label query
+ Pod[] allPods = new Pod[0];
- LOG.debug(String.format("Pods created : %s for cluster : %s", allPods.length, clusterId));
- }
+ // wait replicas*5s time in the worst case ; best case = 0s
+ for (int i = 0; i < (replicas * previousStatePods.length + 1); i++) {
+ allPods = kubApi.getSelectedPods(new Label[]{l});
- List<MemberContext> memberContexts = new ArrayList<MemberContext>();
+ if (LOG.isDebugEnabled()) {
- PodToMemberContext podToMemberContextFunc = new PodToMemberContext();
- // generate Member Contexts
- for (Pod pod : allPods) {
- MemberContext context;
- // if member context does not exist -> a new member (scale up)
- if ((context = cloudControllerContext.getMemberContextOfMemberId(pod.getId())) == null) {
+ LOG.debug("Pods Count: " + allPods.length + " for cluster: " + clusterId);
+ }
+ if (allPods.length == replicas) {
+ break;
+ }
+ Thread.sleep(10000);
+ }
- context = podToMemberContextFunc.apply(pod);
- context.setCartridgeType(cartridgeType);
- context.setClusterId(clusterId);
+ if (LOG.isDebugEnabled()) {
- context.setProperties(CloudControllerUtil.addProperty(context
- .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
- CloudControllerUtil.getProperty(ctxt.getProperties(),
- StratosConstants.ALLOCATED_SERVICE_HOST_PORT)));
+ LOG.debug(String.format("Pods created : %s for cluster : %s", allPods.length, clusterId));
+ }
- // wait till Pod status turns to running and send member spawned.
- ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cloud Controller is starting the instance start up thread.");
- }
- cloudControllerContext.addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
+ List<MemberContext> memberContexts = new ArrayList<MemberContext>();
- memberContexts.add(context);
+ PodToMemberContext podToMemberContextFunc = new PodToMemberContext();
+ // generate Member Contexts
+ for (Pod pod : allPods) {
+ MemberContext context;
+ // if member context does not exist -> a new member (scale up)
+ if ((context = CloudControllerContext.getInstance().getMemberContextOfMemberId(pod.getId())) == null) {
- }
- // publish data
- // TODO
-// CartridgeInstanceDataPublisher.publish(context.getMemberId(), null, null, context.getClusterId(), cartridgeType, MemberStatus.Created.toString(), node);
+ context = podToMemberContextFunc.apply(pod);
+ context.setCartridgeType(cartridgeType);
+ context.setClusterId(clusterId);
- }
+ context.setProperties(CloudControllerUtil.addProperty(context
+ .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
+ CloudControllerUtil.getProperty(ctxt.getProperties(),
+ StratosConstants.ALLOCATED_SERVICE_HOST_PORT)));
+
+ // wait till Pod status turns to running and send member spawned.
+ ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cloud Controller is starting the instance start up thread.");
+ }
+ CloudControllerContext.getInstance().addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
- if (memberContexts.isEmpty()) {
- // terminated members
- @SuppressWarnings("unchecked")
- List<Pod> difference = ListUtils.subtract(Arrays.asList(previousStatePods), Arrays.asList(allPods));
- for (Pod pod : difference) {
- if (pod != null) {
- MemberContext context = cloudControllerContext.getMemberContextOfMemberId(pod.getId());
- logTermination(context);
memberContexts.add(context);
+
}
+ // publish data
+ // TODO
+// CartridgeInstanceDataPublisher.publish(context.getMemberId(), null, null, context.getClusterId(), cartridgeType, MemberStatus.Created.toString(), node);
+
}
- }
+ if (memberContexts.isEmpty()) {
+ // terminated members
+ @SuppressWarnings("unchecked")
+ List<Pod> difference = ListUtils.subtract(Arrays.asList(previousStatePods), Arrays.asList(allPods));
+ for (Pod pod : difference) {
+ if (pod != null) {
+ MemberContext context = CloudControllerContext.getInstance().getMe
<TRUNCATED>
[2/3] stratos git commit: Adding distributed locks to cloud
controller service methods
Posted by im...@apache.org.
Adding distributed locks to cloud controller service methods
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/71fab2b4
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/71fab2b4
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/71fab2b4
Branch: refs/heads/master
Commit: 71fab2b440d40b82f80407ce6481b36259a26e1b
Parents: e8914f3
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Dec 4 00:14:06 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 4 00:14:40 2014 +0530
----------------------------------------------------------------------
.../context/CloudControllerContext.java | 164 +-
.../impl/CloudControllerServiceImpl.java | 1642 +++++++++---------
2 files changed, 941 insertions(+), 865 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/71fab2b4/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
index 2d4e195..53e7be4 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
@@ -37,10 +37,7 @@ import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -55,23 +52,23 @@ public class CloudControllerContext implements Serializable {
private static final long serialVersionUID = -2662307358852779897L;
private static final Log log = LogFactory.getLog(CloudControllerContext.class);
- private static final String CC_CLUSTER_ID_TO_MEMBER_CTX = "CC_CLUSTER_ID_TO_MEMBER_CTX";
+ private static final String CC_CLUSTER_ID_TO_MEMBER_CTX_MAP = "CC_CLUSTER_ID_TO_MEMBER_CTX_MAP";
private static final String CC_CLUSTER_ID_TO_CLUSTER_CTX = "CC_CLUSTER_ID_TO_CLUSTER_CTX";
- private static final String CC_MEMBER_ID_TO_MEMBER_CTX = "CC_MEMBER_ID_TO_MEMBER_CTX";
- private static final String CC_MEMBER_ID_TO_SCH_TASK = "CC_MEMBER_ID_TO_SCH_TASK";
+ private static final String CC_MEMBER_ID_TO_MEMBER_CTX_MAP = "CC_MEMBER_ID_TO_MEMBER_CTX_MAP";
+ private static final String CC_MEMBER_ID_TO_SCH_TASK_MAP = "CC_MEMBER_ID_TO_SCH_TASK_MAP";
private static final String CC_KUB_GROUP_ID_TO_GROUP_MAP = "CC_KUB_GROUP_ID_TO_GROUP_MAP";
- private static final String CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX";
- private static final String CC_CARTRIDGE_TYPE_TO_PARTITION_IDS = "CC_CARTRIDGE_TYPE_TO_PARTITION_IDS";
- private static final String CC_CARTRIDGES = "CC_CARTRIDGES";
- private static final String CC_SERVICE_GROUPS = "CC_SERVICE_GROUPS";
-
- private static final String CC_CLUSTER_CTX_LOCK = "CC_CLUSTER_ID_TO_MEMBER_CTX_LOCK";
- private static final String CC_MEMBER_CTX_LOCK = "CC_MEMBER_ID_TO_MEMBER_CTX_LOCK";
- private static final String CC_SCH_TASK_LOCK = "CC_MEMBER_ID_TO_SCH_TASK_LOCK";
- private static final String CC_KUB_GROUP_LOCK = "CC_KUB_GROUP_ID_TO_GROUP_MAP_LOCK";
- private static final String CC_KUB_CLUSTER_CTX_LOCK = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX_LOCK";
- private static final String CC_CARTRIDGES_LOCK = "CC_CARTRIDGES_LOCK";
- private static final String CC_SERVICE_GROUPS_LOCK = "CC_SERVICE_GROUPS_LOCK";
+ private static final String CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX_MAP = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX_MAP";
+ private static final String CC_CARTRIDGE_TYPE_TO_PARTITION_IDS_MAP = "CC_CARTRIDGE_TYPE_TO_PARTITION_IDS_MAP";
+ private static final String CC_CARTRIDGE_TYPE_TO_CARTRIDGES_MAP = "CC_CARTRIDGE_TYPE_TO_CARTRIDGES_MAP";
+ private static final String CC_SERVICE_GROUP_NAME_TO_SERVICE_GROUP_MAP = "CC_SERVICE_GROUP_NAME_TO_SERVICE_GROUP_MAP";
+
+ private static final String CC_CLUSTER_CTX_WRITE_LOCK = "CC_CLUSTER_CTX_WRITE_LOCK";
+ private static final String CC_MEMBER_CTX_WRITE_LOCK = "CC_MEMBER_CTX_WRITE_LOCK";
+ private static final String CC_SCH_TASK_WRITE_LOCK = "CC_SCH_TASK_WRITE_LOCK";
+ private static final String CC_KUB_GROUP_WRITE_LOCK = "CC_KUB_GROUP_WRITE_LOCK";
+ private static final String CC_KUB_CLUSTER_CTX_WRITE_LOCK = "CC_KUB_CLUSTER_CTX_WRITE_LOCK";
+ private static final String CC_CARTRIDGES_WRITE_LOCK = "CC_CARTRIDGES_WRITE_LOCK";
+ private static final String CC_SERVICE_GROUPS_WRITE_LOCK = "CC_SERVICE_GROUPS_WRITE_LOCK";
private static volatile CloudControllerContext instance;
@@ -129,14 +126,18 @@ public class CloudControllerContext implements Serializable {
private transient ExecutorService executorService = Executors.newFixedThreadPool(20);
/**
- * List of registered {@link org.apache.stratos.cloud.controller.domain.Cartridge}s
+ * Map of registered {@link org.apache.stratos.cloud.controller.domain.Cartridge}s
+ * Key - cartridge type
+ * Value - cartridge
*/
- private List<Cartridge> cartridges;
+ private Map<String, Cartridge> cartridgeTypeToCartridgeMap;
/**
- * List of deployed service groups
+ * Map of deployed service groups
+ * Key - service group name
+ * Value service group
*/
- private List<ServiceGroup> serviceGroups;
+ private Map<String, ServiceGroup> serviceGroupNameToServiceGroupMap;
private String streamId;
private boolean isPublisherRunning;
@@ -157,14 +158,14 @@ public class CloudControllerContext implements Serializable {
// Initialize objects
kubernetesGroupsMap = distributedObjectProvider.getMap(CC_KUB_GROUP_ID_TO_GROUP_MAP);
- clusterIdToMemberContextListMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX);
- memberIdToMemberContextMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_MEMBER_CTX);
- memberIdToScheduledTaskMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_SCH_TASK);
- kubClusterIdToKubClusterContextMap = distributedObjectProvider.getMap(CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX);
+ clusterIdToMemberContextListMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX_MAP);
+ memberIdToMemberContextMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_MEMBER_CTX_MAP);
+ memberIdToScheduledTaskMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_SCH_TASK_MAP);
+ kubClusterIdToKubClusterContextMap = distributedObjectProvider.getMap(CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX_MAP);
clusterIdToContextMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_CLUSTER_CTX);
- cartridgeTypeToPartitionIdsMap = distributedObjectProvider.getMap(CC_CARTRIDGE_TYPE_TO_PARTITION_IDS);
- cartridges = distributedObjectProvider.getList(CC_CARTRIDGES);
- serviceGroups = distributedObjectProvider.getList(CC_SERVICE_GROUPS);
+ cartridgeTypeToPartitionIdsMap = distributedObjectProvider.getMap(CC_CARTRIDGE_TYPE_TO_PARTITION_IDS_MAP);
+ cartridgeTypeToCartridgeMap = distributedObjectProvider.getMap(CC_CARTRIDGE_TYPE_TO_CARTRIDGES_MAP);
+ serviceGroupNameToServiceGroupMap = distributedObjectProvider.getMap(CC_SERVICE_GROUP_NAME_TO_SERVICE_GROUP_MAP);
// Update context from the registry
updateContextFromRegistry();
@@ -181,115 +182,97 @@ public class CloudControllerContext implements Serializable {
return instance;
}
- public List<Cartridge> getCartridges() {
- return cartridges;
+ public java.util.Collection<Cartridge> getCartridges() {
+ return cartridgeTypeToCartridgeMap.values();
}
- public void setCartridges(List<Cartridge> cartridges) {
- this.cartridges = cartridges;
+ public void addCartridges(List<Cartridge> cartridges) {
+ for(Cartridge cartridge : cartridges) {
+ addCartridge(cartridge);
+ }
}
- public void setServiceGroups(List<ServiceGroup> serviceGroups) {
- this.serviceGroups = serviceGroups;
+ public void addServiceGroups(List<ServiceGroup> serviceGroups) {
+ for(ServiceGroup serviceGroup : serviceGroups) {
+ addServiceGroup(serviceGroup);
+ }
}
- public List<ServiceGroup> getServiceGroups() {
- return this.serviceGroups;
+ public Collection<ServiceGroup> getServiceGroups() {
+ return serviceGroupNameToServiceGroupMap.values();
}
public Cartridge getCartridge(String cartridgeType) {
- for (Cartridge cartridge : cartridges) {
- if (cartridge.getType().equals(cartridgeType)) {
- return cartridge;
- }
- }
- return null;
+ return cartridgeTypeToCartridgeMap.get(cartridgeType);
}
private Lock acquireWriteLock(String object) {
return distributedObjectProvider.acquireLock(object);
}
- private void releaseWriteLock(Lock lock) {
+ public void releaseWriteLock(Lock lock) {
distributedObjectProvider.releaseLock(lock);
}
- public Lock acquireKubernetesWriteLock() {
- return acquireWriteLock(CC_CLUSTER_CTX_LOCK);
+ public Lock acquireClusterContextWriteLock() {
+ return acquireWriteLock(CC_CLUSTER_CTX_WRITE_LOCK);
}
public Lock acquireMemberContextWriteLock() {
- return acquireWriteLock(CC_MEMBER_CTX_LOCK);
+ return acquireWriteLock(CC_MEMBER_CTX_WRITE_LOCK);
}
public Lock acquireScheduleTaskWriteLock() {
- return acquireWriteLock(CC_SCH_TASK_LOCK);
+ return acquireWriteLock(CC_SCH_TASK_WRITE_LOCK);
}
public Lock acquireKubernetesGroupWriteLock() {
- return acquireWriteLock(CC_KUB_GROUP_LOCK);
+ return acquireWriteLock(CC_KUB_GROUP_WRITE_LOCK);
}
public Lock acquireKubernetesClusterContextWriteLock() {
- return acquireWriteLock(CC_KUB_CLUSTER_CTX_LOCK);
+ return acquireWriteLock(CC_KUB_CLUSTER_CTX_WRITE_LOCK);
}
public Lock acquireCartridgesWriteLock() {
- return acquireWriteLock(CC_CARTRIDGES_LOCK);
+ return acquireWriteLock(CC_CARTRIDGES_WRITE_LOCK);
}
public Lock acquireServiceGroupsWriteLock() {
- return acquireWriteLock(CC_SERVICE_GROUPS_LOCK);
- }
-
- public void releaseKubernetesWriteLock(Lock lock) {
- releaseWriteLock(lock);
- }
-
- public void releaseMemberContextWriteLock(Lock lock) {
- releaseWriteLock(lock);
+ return acquireWriteLock(CC_SERVICE_GROUPS_WRITE_LOCK);
}
- public void releaseScheduleTaskWriteLock(Lock lock) {
- releaseWriteLock(lock);
+ public void addCartridge(Cartridge cartridge) {
+ cartridgeTypeToCartridgeMap.put(cartridge.getType(), cartridge);
}
- public void releaseKubernetesGroupWriteLock(Lock lock) {
- releaseWriteLock(lock);
- }
-
- public void releaseKubernetesClusterContextWriteLock(Lock lock) {
- releaseWriteLock(lock);
+ public void removeCartridge(Cartridge cartridge) {
+ if(cartridgeTypeToCartridgeMap.containsKey(cartridge.getType())) {
+ cartridgeTypeToCartridgeMap.remove(cartridge.getType());
+ }
}
- public void releaseCartridgesWriteLock(Lock lock) {
- releaseWriteLock(lock);
+ public void updateCartridge(Cartridge cartridge) {
+ cartridgeTypeToCartridgeMap.put(cartridge.getType(), cartridge);
}
- public void releaseServiceGroupsWriteLock(Lock lock) {
- releaseWriteLock(lock);
+ public ServiceGroup getServiceGroup(String name) {
+ return serviceGroupNameToServiceGroupMap.get(name);
}
- public void addCartridge(Cartridge newCartridges) {
- cartridges.add(newCartridges);
+ public void addServiceGroup(ServiceGroup serviceGroup) {
+ serviceGroupNameToServiceGroupMap.put(serviceGroup.getName(), serviceGroup);
}
- public ServiceGroup getServiceGroup(String name) {
- for (ServiceGroup serviceGroup : serviceGroups) {
- if (serviceGroup.getName().equals(name)) {
- return serviceGroup;
- }
+ public void removeServiceGroups(List<ServiceGroup> serviceGroups) {
+ for(ServiceGroup serviceGroup : serviceGroups) {
+ removeServiceGroup(serviceGroup);
}
- return null;
- }
-
- public void addServiceGroup(ServiceGroup newServiceGroup) {
- serviceGroups.add(newServiceGroup);
}
- public void removeServiceGroup(List<ServiceGroup> serviceGroup) {
- if (this.serviceGroups != null) {
- this.serviceGroups.removeAll(serviceGroup);
+ private void removeServiceGroup(ServiceGroup serviceGroup) {
+ if(serviceGroupNameToServiceGroupMap.containsKey(serviceGroup.getName())) {
+ serviceGroupNameToServiceGroupMap.remove(serviceGroup.getName());
}
}
@@ -623,9 +606,8 @@ public class CloudControllerContext implements Serializable {
copyMap(serializedObj.kubClusterIdToKubClusterContextMap, kubClusterIdToKubClusterContextMap);
copyMap(serializedObj.clusterIdToContextMap, clusterIdToContextMap);
copyMap(serializedObj.cartridgeTypeToPartitionIdsMap, cartridgeTypeToPartitionIdsMap);
-
- copyList(serializedObj.getCartridges(), cartridges);
- copyList(serializedObj.getServiceGroups(), serviceGroups);
+ copyMap(serializedObj.cartridgeTypeToCartridgeMap, cartridgeTypeToCartridgeMap);
+ copyMap(serializedObj.serviceGroupNameToServiceGroupMap, serviceGroupNameToServiceGroupMap);
if (log.isDebugEnabled()) {
log.debug("Cloud controller context is read from the registry");
[3/3] stratos git commit: Moving distributed locks handled by
distributed object provider and to functional methods
Posted by im...@apache.org.
Moving distributed locks handled by distributed object provider and to functional methods
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/e8914f3d
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/e8914f3d
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/e8914f3d
Branch: refs/heads/master
Commit: e8914f3d048866ab8913801aa702c6f234b41ce5
Parents: 08de729
Author: Imesh Gunaratne <im...@apache.org>
Authored: Wed Dec 3 18:19:36 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 4 00:14:40 2014 +0530
----------------------------------------------------------------------
.../context/CloudControllerContext.java | 148 +++++++---
.../impl/CloudControllerServiceImpl.java | 19 +-
.../clustering/DistributedObjectProvider.java | 9 +-
.../impl/HazelcastDistributedListProvider.java | 268 +++++++++++++++++++
.../HazelcastDistributedObjectProvider.java | 219 ++++++++-------
.../clustering/impl/ListEntryListener.java | 37 +++
.../test/DistributedObjectProviderTest.java | 78 ++----
.../load/balancer/context/AlgorithmContext.java | 22 +-
.../context/map/AlgorithmContextMap.java | 16 +-
9 files changed, 598 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
index af8741c..2d4e195 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
@@ -36,12 +36,8 @@ import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
-import com.google.common.net.InetAddresses;
-
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -49,6 +45,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.locks.Lock;
/**
* This object holds all runtime data and provides faster access. This is a Singleton class.
@@ -58,15 +55,23 @@ public class CloudControllerContext implements Serializable {
private static final long serialVersionUID = -2662307358852779897L;
private static final Log log = LogFactory.getLog(CloudControllerContext.class);
- public static final String KUB_GROUP_ID_TO_GROUP_MAP = "KUB_GROUP_ID_TO_GROUP_MAP";
- public static final String CC_CLUSTER_ID_TO_MEMBER_CTX = "CC_CLUSTER_ID_TO_MEMBER_CTX";
- public static final String CC_MEMBER_ID_TO_MEMBER_CTX = "CC_MEMBER_ID_TO_MEMBER_CTX";
- public static final String CC_MEMBER_ID_TO_SCH_TASK = "CC_MEMBER_ID_TO_SCH_TASK";
- public static final String CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX";
- public static final String CC_CLUSTER_ID_TO_CLUSTER_CTX = "CC_CLUSTER_ID_TO_CLUSTER_CTX";
- public static final String CC_CARTRIDGE_TYPE_TO_PARTITION_IDS = "CC_CARTRIDGE_TYPE_TO_PARTITION_IDS";
- public static final String CC_CARTRIDGES = "CC_CARTRIDGES";
- public static final String CC_SERVICE_GROUPS = "CC_SERVICE_GROUPS";
+ private static final String CC_CLUSTER_ID_TO_MEMBER_CTX = "CC_CLUSTER_ID_TO_MEMBER_CTX";
+ private static final String CC_CLUSTER_ID_TO_CLUSTER_CTX = "CC_CLUSTER_ID_TO_CLUSTER_CTX";
+ private static final String CC_MEMBER_ID_TO_MEMBER_CTX = "CC_MEMBER_ID_TO_MEMBER_CTX";
+ private static final String CC_MEMBER_ID_TO_SCH_TASK = "CC_MEMBER_ID_TO_SCH_TASK";
+ private static final String CC_KUB_GROUP_ID_TO_GROUP_MAP = "CC_KUB_GROUP_ID_TO_GROUP_MAP";
+ private static final String CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX";
+ private static final String CC_CARTRIDGE_TYPE_TO_PARTITION_IDS = "CC_CARTRIDGE_TYPE_TO_PARTITION_IDS";
+ private static final String CC_CARTRIDGES = "CC_CARTRIDGES";
+ private static final String CC_SERVICE_GROUPS = "CC_SERVICE_GROUPS";
+
+ private static final String CC_CLUSTER_CTX_LOCK = "CC_CLUSTER_ID_TO_MEMBER_CTX_LOCK";
+ private static final String CC_MEMBER_CTX_LOCK = "CC_MEMBER_ID_TO_MEMBER_CTX_LOCK";
+ private static final String CC_SCH_TASK_LOCK = "CC_MEMBER_ID_TO_SCH_TASK_LOCK";
+ private static final String CC_KUB_GROUP_LOCK = "CC_KUB_GROUP_ID_TO_GROUP_MAP_LOCK";
+ private static final String CC_KUB_CLUSTER_CTX_LOCK = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX_LOCK";
+ private static final String CC_CARTRIDGES_LOCK = "CC_CARTRIDGES_LOCK";
+ private static final String CC_SERVICE_GROUPS_LOCK = "CC_SERVICE_GROUPS_LOCK";
private static volatile CloudControllerContext instance;
@@ -151,7 +156,7 @@ public class CloudControllerContext implements Serializable {
distributedObjectProvider = ServiceReferenceHolder.getInstance().getDistributedObjectProvider();
// Initialize objects
- kubernetesGroupsMap = distributedObjectProvider.getMap(KUB_GROUP_ID_TO_GROUP_MAP);
+ kubernetesGroupsMap = distributedObjectProvider.getMap(CC_KUB_GROUP_ID_TO_GROUP_MAP);
clusterIdToMemberContextListMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX);
memberIdToMemberContextMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_MEMBER_CTX);
memberIdToScheduledTaskMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_SCH_TASK);
@@ -201,8 +206,72 @@ public class CloudControllerContext implements Serializable {
return null;
}
+ private Lock acquireWriteLock(String object) {
+ return distributedObjectProvider.acquireLock(object);
+ }
+
+ private void releaseWriteLock(Lock lock) {
+ distributedObjectProvider.releaseLock(lock);
+ }
+
+ public Lock acquireKubernetesWriteLock() {
+ return acquireWriteLock(CC_CLUSTER_CTX_LOCK);
+ }
+
+ public Lock acquireMemberContextWriteLock() {
+ return acquireWriteLock(CC_MEMBER_CTX_LOCK);
+ }
+
+ public Lock acquireScheduleTaskWriteLock() {
+ return acquireWriteLock(CC_SCH_TASK_LOCK);
+ }
+
+ public Lock acquireKubernetesGroupWriteLock() {
+ return acquireWriteLock(CC_KUB_GROUP_LOCK);
+ }
+
+ public Lock acquireKubernetesClusterContextWriteLock() {
+ return acquireWriteLock(CC_KUB_CLUSTER_CTX_LOCK);
+ }
+
+ public Lock acquireCartridgesWriteLock() {
+ return acquireWriteLock(CC_CARTRIDGES_LOCK);
+ }
+
+ public Lock acquireServiceGroupsWriteLock() {
+ return acquireWriteLock(CC_SERVICE_GROUPS_LOCK);
+ }
+
+ public void releaseKubernetesWriteLock(Lock lock) {
+ releaseWriteLock(lock);
+ }
+
+ public void releaseMemberContextWriteLock(Lock lock) {
+ releaseWriteLock(lock);
+ }
+
+ public void releaseScheduleTaskWriteLock(Lock lock) {
+ releaseWriteLock(lock);
+ }
+
+ public void releaseKubernetesGroupWriteLock(Lock lock) {
+ releaseWriteLock(lock);
+ }
+
+ public void releaseKubernetesClusterContextWriteLock(Lock lock) {
+ releaseWriteLock(lock);
+ }
+
+ public void releaseCartridgesWriteLock(Lock lock) {
+ releaseWriteLock(lock);
+ }
+
+ public void releaseServiceGroupsWriteLock(Lock lock) {
+ releaseWriteLock(lock);
+ }
+
public void addCartridge(Cartridge newCartridges) {
- distributedObjectProvider.addToList(cartridges, newCartridges);
+ cartridges.add(newCartridges);
}
public ServiceGroup getServiceGroup(String name) {
@@ -215,7 +284,7 @@ public class CloudControllerContext implements Serializable {
}
public void addServiceGroup(ServiceGroup newServiceGroup) {
- distributedObjectProvider.addToList(serviceGroups, newServiceGroup);
+ serviceGroups.add(newServiceGroup);
}
public void removeServiceGroup(List<ServiceGroup> serviceGroup) {
@@ -257,38 +326,37 @@ public class CloudControllerContext implements Serializable {
}
public void addMemberContext(MemberContext memberContext) {
- distributedObjectProvider.putToMap(memberIdToMemberContextMap, memberContext.getMemberId(), memberContext);
+ memberIdToMemberContextMap.put(memberContext.getMemberId(), memberContext);
List<MemberContext> memberContextList;
if ((memberContextList = clusterIdToMemberContextListMap.get(memberContext.getClusterId())) == null) {
memberContextList = new ArrayList<MemberContext>();
}
if (memberContextList.contains(memberContext)) {
- distributedObjectProvider.removeFromList(memberContextList,memberContext);
+ memberContextList.remove(memberContext);
}
- distributedObjectProvider.addToList(memberContextList, memberContext);
- distributedObjectProvider.putToMap(clusterIdToMemberContextListMap, memberContext.getClusterId(),
- memberContextList);
+ memberContextList.add(memberContext);
+ clusterIdToMemberContextListMap.put(memberContext.getClusterId(), memberContextList);
if (log.isDebugEnabled()) {
log.debug("Added member context to the cloud controller context: " + memberContext);
}
}
public void addScheduledFutureJob(String memberId, ScheduledFuture<?> job) {
- distributedObjectProvider.putToMap(memberIdToScheduledTaskMap, memberId, job);
+ memberIdToScheduledTaskMap.put(memberId, job);
}
public List<MemberContext> removeMemberContextsOfCluster(String clusterId) {
List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId);
- distributedObjectProvider.removeFromMap(clusterIdToMemberContextListMap, clusterId);
+ clusterIdToMemberContextListMap.remove(clusterId);
if (memberContextList == null) {
return new ArrayList<MemberContext>();
}
for (MemberContext memberContext : memberContextList) {
String memberId = memberContext.getMemberId();
- distributedObjectProvider.removeFromMap(memberIdToMemberContextMap, memberId);
+ memberIdToMemberContextMap.remove(memberId);
ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId);
- distributedObjectProvider.removeFromMap(memberIdToScheduledTaskMap, memberId);
+ memberIdToScheduledTaskMap.remove(memberId);
stopTask(task);
if (log.isDebugEnabled()) {
@@ -301,7 +369,7 @@ public class CloudControllerContext implements Serializable {
public MemberContext removeMemberContext(String memberId, String clusterId) {
MemberContext removedMemberContext = memberIdToMemberContextMap.get(memberId);
- distributedObjectProvider.removeFromMap(memberIdToMemberContextMap, memberId);
+ memberIdToMemberContextMap.remove(memberId);
List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId);
if (memberContextList != null) {
@@ -315,10 +383,10 @@ public class CloudControllerContext implements Serializable {
iterator.remove();
}
}
- distributedObjectProvider.putToMap(clusterIdToMemberContextListMap, clusterId, newCtxts);
+ clusterIdToMemberContextListMap.put(clusterId, newCtxts);
}
ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId);
- distributedObjectProvider.removeFromMap(memberIdToScheduledTaskMap, memberId);
+ memberIdToScheduledTaskMap.remove(memberId);
stopTask(task);
return removedMemberContext;
}
@@ -339,7 +407,7 @@ public class CloudControllerContext implements Serializable {
}
public void addClusterContext(ClusterContext ctxt) {
- distributedObjectProvider.putToMap(clusterIdToContextMap, ctxt.getClusterId(), ctxt);
+ clusterIdToContextMap.put(ctxt.getClusterId(), ctxt);
}
public ClusterContext getClusterContext(String clusterId) {
@@ -348,7 +416,7 @@ public class CloudControllerContext implements Serializable {
public ClusterContext removeClusterContext(String clusterId) {
ClusterContext removed = clusterIdToContextMap.get(clusterId);
- distributedObjectProvider.removeFromMap(clusterIdToContextMap, clusterId);
+ clusterIdToContextMap.remove(clusterId);
return removed;
}
@@ -366,11 +434,11 @@ public class CloudControllerContext implements Serializable {
list = new ArrayList<String>();
}
list.add(partitionId);
- distributedObjectProvider.putToMap(cartridgeTypeToPartitionIdsMap, cartridgeType, list);
+ cartridgeTypeToPartitionIdsMap.put(cartridgeType, list);
}
public void removeFromCartridgeTypeToPartitionIds(String cartridgeType) {
- distributedObjectProvider.removeFromMap(cartridgeTypeToPartitionIdsMap, cartridgeType);
+ cartridgeTypeToPartitionIdsMap.remove(cartridgeType);
}
public KubernetesClusterContext getKubernetesClusterContext(String kubClusterId) {
@@ -378,7 +446,7 @@ public class CloudControllerContext implements Serializable {
}
public void addKubernetesClusterContext(KubernetesClusterContext kubernetesClusterContext) {
- distributedObjectProvider.putToMap(kubClusterIdToKubClusterContextMap,
+ kubClusterIdToKubClusterContextMap.put(
kubernetesClusterContext.getKubernetesClusterId(),
kubernetesClusterContext);
}
@@ -388,7 +456,7 @@ public class CloudControllerContext implements Serializable {
*/
public synchronized void removeKubernetesGroup(String kubernetesGroupId) {
// Remove entry from information model
- distributedObjectProvider.removeFromMap(kubernetesGroupsMap, kubernetesGroupId);
+ kubernetesGroupsMap.remove(kubernetesGroupId);
}
/**
@@ -435,8 +503,12 @@ public class CloudControllerContext implements Serializable {
}
}
- public void addKubernetesGroupToInformationModel(KubernetesGroup kubernetesGroup) {
- distributedObjectProvider.putToMap(kubernetesGroupsMap, kubernetesGroup.getGroupId(), kubernetesGroup);
+ public void addKubernetesGroup(KubernetesGroup kubernetesGroup) {
+ kubernetesGroupsMap.put(kubernetesGroup.getGroupId(), kubernetesGroup);
+ }
+
+ public void updateKubernetesGroup(KubernetesGroup kubernetesGroup) {
+ kubernetesGroupsMap.put(kubernetesGroup.getGroupId(), kubernetesGroup);
}
public boolean kubernetesGroupExists(KubernetesGroup kubernetesGroup) {
@@ -574,13 +646,13 @@ public class CloudControllerContext implements Serializable {
private void copyMap(Map sourceMap, Map destinationMap) {
for(Object key : sourceMap.keySet()) {
- distributedObjectProvider.putToMap(destinationMap, key, sourceMap.get(key));
+ destinationMap.put(key, sourceMap.get(key));
}
}
private void copyList(List sourceList, List destinationList) {
for(Object item : sourceList) {
- distributedObjectProvider.addToList(destinationList, item);
+ destinationList.add(item);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 4843565..548b743 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -73,6 +73,7 @@ import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
/**
* Cloud Controller Service is responsible for starting up new server instances,
@@ -2087,20 +2088,24 @@ public class CloudControllerServiceImpl implements CloudControllerService {
LOG.info("Deploying new Kubernetes group: " + kubernetesGroup);
}
CloudControllerUtil.validateKubernetesGroup(kubernetesGroup);
+ Lock lock = null;
try {
+ lock = cloudControllerContext.acquireKubernetesGroupWriteLock();
// Add to information model
- cloudControllerContext.addKubernetesGroupToInformationModel(kubernetesGroup);
-
+ cloudControllerContext.addKubernetesGroup(kubernetesGroup);
persist();
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Kubernetes group deployed successfully: [id] %s, [description] %s",
kubernetesGroup.getGroupId(), kubernetesGroup.getDescription()));
}
-
return true;
} catch (Exception e) {
throw new InvalidKubernetesGroupException(e.getMessage(), e);
+ } finally {
+ if(lock != null) {
+ cloudControllerContext.releaseKubernetesGroupWriteLock(lock);
+ }
}
}
@@ -2117,7 +2122,9 @@ public class CloudControllerServiceImpl implements CloudControllerService {
LOG.info("Deploying new Kubernetes Host: " + kubernetesHost + " for Kubernetes group id: " + kubernetesGroupId);
}
CloudControllerUtil.validateKubernetesHost(kubernetesHost);
+ Lock lock = null;
try {
+ lock = CloudControllerContext.getInstance().acquireKubernetesGroupWriteLock();
KubernetesGroup kubernetesGroupStored = getKubernetesGroup(kubernetesGroupId);
ArrayList<KubernetesHost> kubernetesHostArrayList;
@@ -2134,7 +2141,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
// Update information model
kubernetesGroupStored.setKubernetesHosts(kubernetesHostArrayList.toArray(new KubernetesHost[kubernetesHostArrayList.size()]));
-
+ CloudControllerContext.getInstance().updateKubernetesGroup(kubernetesGroupStored);
persist();
if (LOG.isInfoEnabled()) {
@@ -2144,6 +2151,10 @@ public class CloudControllerServiceImpl implements CloudControllerService {
return true;
} catch (Exception e) {
throw new InvalidKubernetesHostException(e.getMessage(), e);
+ } finally {
+ if(lock != null) {
+ cloudControllerContext.releaseKubernetesGroupWriteLock(lock);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
index 2fd471b..7e7d130 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
@@ -22,6 +22,7 @@ package org.apache.stratos.common.clustering;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
/**
* Distributed object provider service interface.
@@ -31,11 +32,7 @@ public interface DistributedObjectProvider extends Serializable {
List getList(String name);
- void putToMap(Map map, Object key, Object value);
+ Lock acquireLock(Object object);
- void removeFromMap(Map map, Object key);
-
- void addToList(List list, Object value);
-
- void removeFromList(List list, Object value);
+ void releaseLock(Lock lock);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedListProvider.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedListProvider.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedListProvider.java
new file mode 100644
index 0000000..537cea9
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedListProvider.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.clustering.impl;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IList;
+import com.hazelcast.core.ItemEvent;
+import com.hazelcast.core.ItemListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+
+/**
+ * Hazelcast distributed list provider.
+ */
+public class HazelcastDistributedListProvider {
+ private static final Log log = LogFactory.getLog(HazelcastDistributedListProvider.class);
+
+ private HazelcastInstance hazelcastInstance;
+ private Map<String, DistList> listMap;
+
+ public HazelcastDistributedListProvider(HazelcastInstance hazelcastInstance) {
+ this.hazelcastInstance = hazelcastInstance;
+ }
+
+ public List getList(String name, ListEntryListener listEntryListener) {
+ List list = listMap.get(name);
+ if(list == null) {
+ synchronized (HazelcastDistributedListProvider.class) {
+ if(list == null) {
+ list = new DistList(name, listEntryListener);
+ }
+ }
+ }
+ return list;
+ }
+
+ public void removeList(String name) {
+ DistList list = listMap.get(name);
+ if(list != null) {
+ IList ilist = (IList) list;
+ ilist.removeItemListener(list.getListenerId());
+ listMap.remove(list);
+ ilist.destroy();
+ }
+ }
+
+ private class DistList implements List {
+ private IList list;
+ private String listenerId;
+
+ public DistList(String name, final ListEntryListener listEntryListener) {
+ this.list = hazelcastInstance.getList(name);
+ listenerId = list.addItemListener(new ItemListener() {
+ @Override
+ public void itemAdded(ItemEvent itemEvent) {
+ listEntryListener.itemAdded(itemEvent.getItem());
+ }
+
+ @Override
+ public void itemRemoved(ItemEvent itemEvent) {
+ listEntryListener.itemRemoved(itemEvent.getItem());
+ }
+ }, false);
+ }
+
+ @Override
+ public int size() {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.size();
+ }
+ return 0;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.isEmpty();
+ }
+ return true;
+ }
+
+ @Override
+ public boolean contains(Object object) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.contains(object);
+ }
+ return false;
+ }
+
+ @Override
+ public Iterator iterator() {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.iterator();
+ }
+ return null;
+ }
+
+ @Override
+ public Object[] toArray() {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.toArray();
+ }
+ return new Object[0];
+ }
+
+ @Override
+ public boolean add(Object object) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.add(object);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean remove(Object object) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.remove(object);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean addAll(Collection collection) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.addAll(collection);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean addAll(int i, Collection collection) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.addAll(i, collection);
+ }
+ return false;
+ }
+
+ @Override
+ public void clear() {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ list.clear();
+ }
+ }
+
+ @Override
+ public Object get(int i) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.get(i);
+ }
+ return null;
+ }
+
+ @Override
+ public Object set(int i, Object o) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.set(i, o);
+ }
+ return null;
+ }
+
+ @Override
+ public void add(int i, Object o) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ list.add(i, o);
+ }
+ }
+
+ @Override
+ public Object remove(int i) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.remove(i);
+ }
+ return null;
+ }
+
+ @Override
+ public int indexOf(Object o) {
+ return list.indexOf(o);
+ }
+
+ @Override
+ public int lastIndexOf(Object o) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.lastIndexOf(o);
+ }
+ return -1;
+ }
+
+ @Override
+ public ListIterator listIterator() {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.listIterator();
+ }
+ return null;
+ }
+
+ @Override
+ public ListIterator listIterator(int i) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.listIterator(i);
+ }
+ return null;
+ }
+
+ @Override
+ public List subList(int i, int i2) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.subList(i, i2);
+ }
+ return null;
+ }
+
+ @Override
+ public boolean retainAll(Collection collection) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.retainAll(collection);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean removeAll(Collection collection) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.removeAll(collection);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean containsAll(Collection collection) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.containsAll(collection);
+ }
+ return false;
+ }
+
+ @Override
+ public Object[] toArray(Object[] objects) {
+ if (hazelcastInstance.getLifecycleService().isRunning()) {
+ return list.toArray(objects);
+ }
+ return null;
+ }
+
+ public String getListenerId() {
+ return listenerId;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java
index 55d765e..e5dab09 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java
@@ -20,19 +20,22 @@
package org.apache.stratos.common.clustering.impl;
import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IList;
import com.hazelcast.core.ILock;
-import com.hazelcast.core.IMap;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.clustering.DistributedObjectProvider;
import org.apache.stratos.common.internal.ServiceReferenceHolder;
+import org.wso2.carbon.caching.impl.MapEntryListener;
+import org.wso2.carbon.core.clustering.hazelcast.HazelcastDistributedMapProvider;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Provides objects to be managed in distributed and non-distributed environments.
@@ -40,46 +43,15 @@ import java.util.concurrent.ConcurrentHashMap;
public class HazelcastDistributedObjectProvider implements DistributedObjectProvider {
private static final Log log = LogFactory.getLog(HazelcastDistributedObjectProvider.class);
- public HazelcastDistributedObjectProvider() {
- }
-
- private boolean isClustered() {
- AxisConfiguration axisConfiguration = ServiceReferenceHolder.getInstance().getAxisConfiguration();
- return ((axisConfiguration != null) && (axisConfiguration.getClusteringAgent() != null)
- && (getHazelcastInstance() != null));
- }
-
- private HazelcastInstance getHazelcastInstance() {
- return ServiceReferenceHolder.getInstance().getHazelcastInstance();
- }
+ private HazelcastDistributedMapProvider mapProvider;
+ private HazelcastDistributedListProvider listProvider;
+ private Map<Object, Lock> locksMap;
- private com.hazelcast.core.ILock acquireDistributedLock(Object object) {
- if((!isClustered()) || (object == null)) {
- return null;
- }
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Acquiring distributed lock for %s...", object.getClass().getSimpleName()));
- }
- ILock lock = getHazelcastInstance().getLock(object);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Distributed lock acquired for %s", object.getClass().getSimpleName()));
- }
- return lock;
- }
-
- private void releaseDistributedLock(ILock lock) {
- if(lock == null) {
- return;
- }
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Releasing distributed lock for %s...", lock.getKey()));
- }
- lock.forceUnlock();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Distributed lock released for %s", lock.getKey()));
- }
+ public HazelcastDistributedObjectProvider() {
+ HazelcastInstance hazelcastInstance = ServiceReferenceHolder.getInstance().getHazelcastInstance();
+ mapProvider = new HazelcastDistributedMapProvider(hazelcastInstance);
+ listProvider = new HazelcastDistributedListProvider(hazelcastInstance);
+ locksMap = new HashMap<Object, Lock>();
}
/**
@@ -91,7 +63,28 @@ public class HazelcastDistributedObjectProvider implements DistributedObjectProv
@Override
public Map getMap(String key) {
if(isClustered()) {
- return getHazelcastInstance().getMap(key);
+ return mapProvider.getMap(key, new MapEntryListener() {
+ @Override
+ public <X> void entryAdded(X key) {
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Entry added to distributed map: [key] %s", key));
+ }
+ }
+
+ @Override
+ public <X> void entryRemoved(X key) {
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Entry removed from distributed map: [key] %s", key));
+ }
+ }
+
+ @Override
+ public <X> void entryUpdated(X key) {
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Entry updated in distributed map: [key] %s", key));
+ }
+ }
+ });
} else {
return new ConcurrentHashMap<Object, Object>();
}
@@ -106,94 +99,96 @@ public class HazelcastDistributedObjectProvider implements DistributedObjectProv
@Override
public List getList(String name) {
if(isClustered()) {
- return getHazelcastInstance().getList(name);
+ return listProvider.getList(name, new ListEntryListener() {
+ @Override
+ public void itemAdded(Object item) {
+ if(log.isDebugEnabled()) {
+ log.debug("Item added to distributed list: " + item);
+ }
+ }
+
+ @Override
+ public void itemRemoved(Object item) {
+ if(log.isDebugEnabled()) {
+ log.debug("Item removed from distributed list: " + item);
+ }
+ }
+ });
} else {
return new ArrayList();
}
}
- /**
- * Put a key value pair to a map, if clustered use a distributed lock.
- * @param map
- * @param key
- * @param value
- */
@Override
- public void putToMap(Map map, Object key, Object value) {
+ public Lock acquireLock(Object object) {
+ if(isClustered()) {
+ return acquireDistributedLock(object);
+ } else {
+ Lock lock = locksMap.get(object);
+ if(lock == null) {
+ synchronized (object) {
+ if(lock == null) {
+ lock = new ReentrantLock();
+ locksMap.put(object, lock);
+ }
+ }
+ }
+ lock.lock();
+ return lock;
+ }
+ }
+
+ @Override
+ public void releaseLock(Lock lock) {
if(isClustered()) {
- ILock lock = null;
- try {
- IMap imap = (IMap) map;
- lock = acquireDistributedLock(imap.getName());
- imap.set(key, value);
- } finally {
- releaseDistributedLock(lock);
- }
+ releaseDistributedLock((ILock)lock);
} else {
- map.put(key, value);
+ lock.unlock();
}
}
- /**
- * Remove an object from a map, if clustered use a distributed lock.
- * @param map
- * @param key
- */
- @Override
- public void removeFromMap(Map map, Object key) {
- if(isClustered()) {
- ILock lock = null;
- try {
- IMap imap = (IMap) map;
- lock = acquireDistributedLock(imap.getName());
- imap.delete(key);
- } finally {
- releaseDistributedLock(lock);
- }
- } else {
- map.remove(key);
- }
+ private boolean isClustered() {
+ AxisConfiguration axisConfiguration = ServiceReferenceHolder.getInstance().getAxisConfiguration();
+ return ((axisConfiguration != null) && (axisConfiguration.getClusteringAgent() != null)
+ && (getHazelcastInstance() != null));
}
- /**
- * Add an object to a list, if clustered use a distributed lock.
- * @param list
- * @param value
- */
- @Override
- public void addToList(List list, Object value) {
- if(isClustered()) {
- ILock lock = null;
- try {
- IList ilist = (IList) list;
- lock = acquireDistributedLock(ilist.getName());
- ilist.add(value);
- } finally {
- releaseDistributedLock(lock);
+ private HazelcastInstance getHazelcastInstance() {
+ return ServiceReferenceHolder.getInstance().getHazelcastInstance();
+ }
+
+ protected com.hazelcast.core.ILock acquireDistributedLock(Object object) {
+ if(object == null) {
+ if(log.isWarnEnabled()) {
+ log.warn("Could not acquire distributed lock, object is null");
}
- } else {
- list.add(value);
+ return null;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Acquiring distributed lock for %s...", object.getClass().getSimpleName()));
+ }
+ ILock lock = getHazelcastInstance().getLock(object);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Distributed lock acquired for %s", object.getClass().getSimpleName()));
}
+ return lock;
}
- /**
- * Remove an object from a list, if clustered use a distributed lock.
- * @param list
- * @param value
- */
- @Override
- public void removeFromList(List list, Object value) {
- if(isClustered()) {
- ILock lock = null;
- try {
- IList ilist = (IList) list;
- lock = acquireDistributedLock(ilist.getName());
- ilist.remove(value);
- } finally {
- releaseDistributedLock(lock);
+ protected void releaseDistributedLock(ILock lock) {
+ if(lock == null) {
+ if(log.isWarnEnabled()) {
+ log.warn("Could not release distributed lock, lock is null");
}
- } else {
- list.remove(value);
+ return;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Releasing distributed lock for %s...", lock.getKey()));
+ }
+ lock.forceUnlock();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Distributed lock released for %s", lock.getKey()));
}
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/ListEntryListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/ListEntryListener.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/ListEntryListener.java
new file mode 100644
index 0000000..96f72dc
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/ListEntryListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.clustering.impl;
+
+/**
+ * List entry listener interface.
+ */
+public interface ListEntryListener {
+ /**
+ * Invoked when an item is added to the distributed list.
+ * @param item
+ */
+ void itemAdded(Object item);
+
+ /**
+ * Invoked when an item is removed from the distributed list.
+ * @param item
+ */
+ void itemRemoved(Object item);
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java
index afb0c83..60ebc99 100644
--- a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java
+++ b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java
@@ -28,10 +28,9 @@ import org.junit.Test;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
-import static junit.framework.TestCase.assertFalse;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -51,6 +50,7 @@ public class DistributedObjectProviderTest {
ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
testPutToMap(provider);
+ testPutToMap(provider);
}
@Test
@@ -58,77 +58,47 @@ public class DistributedObjectProviderTest {
ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
testPutToMap(provider);
+ testPutToMap(provider);
}
private void testPutToMap(HazelcastDistributedObjectProvider provider) {
Map<String, String> map = provider.getMap("MAP1");
- provider.putToMap(map, "key1", "value1");
- assertEquals(map.get("key1"), "value1");
- }
-
- @Test
- public void testRemoveFromMapLocal() {
- ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
- HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
- testRemoveFromMap(provider);
- }
-
- @Test
- public void testRemoveFromMapDistributed() {
- ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
- HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
- testRemoveFromMap(provider);
- }
-
- private void testRemoveFromMap(HazelcastDistributedObjectProvider provider) {
- Map<String, String> map = provider.getMap("MAP1");
- provider.putToMap(map, "key1", "value1");
- assertEquals(map.get("key1"), "value1");
- provider.removeFromMap(map, "key1");
- assertNull(map.get("key1"));
+ Lock lock = null;
+ try {
+ lock = provider.acquireLock("MAP1_WRITE_LOCK");
+ map.put("key1", "value1");
+ assertEquals(map.get("key1"), "value1");
+ } finally {
+ provider.releaseLock(lock);
+ }
}
@Test
- public void testAddToListLocal() {
+ public void testGetListLocal() {
ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
testAddToList(provider);
+ testAddToList(provider);
}
@Test
- public void testAddToListDistributed() {
+ public void testGetListDistributed() {
ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
testAddToList(provider);
+ testAddToList(provider);
}
private void testAddToList(HazelcastDistributedObjectProvider provider) {
List list = provider.getList("LIST1");
- String value1 = "value1";
- provider.addToList(list, value1);
- assertTrue(list.contains(value1));
- }
-
- @Test
- public void testRemoveFromListLocal() {
- ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
- HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
- testRemovalFromList(provider);
- }
-
- @Test
- public void testRemoveFromListDistributed() {
- ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
- HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
- testRemovalFromList(provider);
- }
-
- private void testRemovalFromList(HazelcastDistributedObjectProvider provider) {
- List list = provider.getList("LIST1");
- String value1 = "value1";
- provider.addToList(list, value1);
- assertTrue(list.contains(value1));
- provider.removeFromList(list, value1);
- assertFalse(list.contains(value1));
+ Lock lock = null;
+ try {
+ lock = provider.acquireLock("LIST1_WRITE_LOCK");
+ String value1 = "value1";
+ list.add(value1);
+ assertTrue(list.contains(value1));
+ } finally {
+ provider.releaseLock(lock);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java
index 3ff824d..240ebba 100755
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java
@@ -21,6 +21,8 @@ package org.apache.stratos.load.balancer.context;
import org.apache.stratos.load.balancer.context.map.AlgorithmContextMap;
+import java.util.concurrent.locks.Lock;
+
/**
* Algorithm context is used for identifying the cluster and its current member for executing load balancing algorithms.
* Key: service name, cluster id
@@ -32,7 +34,15 @@ public class AlgorithmContext {
public AlgorithmContext(String serviceName, String clusterId) {
this.serviceName = serviceName;
this.clusterId = clusterId;
- AlgorithmContextMap.getInstance().putCurrentMemberIndex(serviceName, clusterId, 0);
+ Lock lock = null;
+ try {
+ lock = AlgorithmContextMap.getInstance().acquireCurrentMemberIndexLock();
+ AlgorithmContextMap.getInstance().putCurrentMemberIndex(serviceName, clusterId, 0);
+ } finally {
+ if(lock != null) {
+ AlgorithmContextMap.getInstance().releaseCurrentMemberIndexLock(lock);
+ }
+ }
}
public String getServiceName() {
@@ -48,6 +58,14 @@ public class AlgorithmContext {
}
public void setCurrentMemberIndex(int currentMemberIndex) {
- AlgorithmContextMap.getInstance().putCurrentMemberIndex(getServiceName(), getClusterId(), currentMemberIndex);
+ Lock lock = null;
+ try {
+ lock = AlgorithmContextMap.getInstance().acquireCurrentMemberIndexLock();
+ AlgorithmContextMap.getInstance().putCurrentMemberIndex(getServiceName(), getClusterId(), currentMemberIndex);
+ } finally {
+ if(lock != null) {
+ AlgorithmContextMap.getInstance().releaseCurrentMemberIndexLock(lock);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java
index 2c03ccb..35acdb0 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java
@@ -25,6 +25,7 @@ import org.apache.stratos.common.clustering.DistributedObjectProvider;
import org.apache.stratos.load.balancer.internal.ServiceReferenceHolder;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
/**
* Algorithm context map is a singleton class for managing load balancing algorithm context
@@ -34,6 +35,7 @@ public class AlgorithmContextMap {
@SuppressWarnings("unused")
private static final Log log = LogFactory.getLog(AlgorithmContextMap.class);
private static final String LOAD_BALANCER_ALGORITHM_CONTEXT_MAP = "LOAD_BALANCER_ALGORITHM_CONTEXT_MAP";
+ private static final String CURRENT_MEMBER_INDEX_MAP_LOCK = "CURRENT_MEMBER_INDEX_MAP_LOCK";
private static AlgorithmContextMap instance;
private final Map<String, Integer> clusterMemberIndexMap;
@@ -61,14 +63,24 @@ public class AlgorithmContextMap {
return String.format("%s-%s", serviceName, clusterId);
}
+ public Lock acquireCurrentMemberIndexLock() {
+ return distributedObjectProvider.acquireLock(CURRENT_MEMBER_INDEX_MAP_LOCK);
+ }
+
+ public void releaseCurrentMemberIndexLock(Lock lock) {
+ if(lock != null) {
+ distributedObjectProvider.releaseLock(lock);
+ }
+ }
+
public void putCurrentMemberIndex(String serviceName, String clusterId, int currentMemberIndex) {
String key = constructKey(serviceName, clusterId);
- distributedObjectProvider.putToMap(clusterMemberIndexMap, key, currentMemberIndex);
+ clusterMemberIndexMap.put(key, currentMemberIndex);
}
public void removeCluster(String serviceName, String clusterId) {
String key = constructKey(serviceName, clusterId);
- distributedObjectProvider.removeFromMap(clusterMemberIndexMap, key);
+ clusterMemberIndexMap.remove(key);
}
public int getCurrentMemberIndex(String serviceName, String clusterId) {