You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ga...@apache.org on 2015/08/20 08:44:47 UTC
[3/5] stratos git commit: Adding Metering and Monitoring Service
Implementation
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index dab6827..f76c928 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -31,6 +31,7 @@ import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPubl
import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
import org.apache.stratos.common.Property;
+import org.apache.stratos.common.constants.StratosConstants;
import org.apache.stratos.messaging.domain.application.ClusterDataHolder;
import org.apache.stratos.messaging.domain.instance.ClusterInstance;
import org.apache.stratos.messaging.domain.topology.*;
@@ -67,8 +68,11 @@ public class TopologyBuilder {
TopologyManager.acquireWriteLock();
for (Cartridge cartridge : cartridgeList) {
if (!topology.serviceExists(cartridge.getType())) {
- ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant;
+
+ ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant :
+ ServiceType.SingleTenant;
service = new Service(cartridge.getType(), serviceType, cartridge.getUuid());
+
Properties properties = new Properties();
try {
@@ -199,867 +203,899 @@ public class TopologyBuilder {
}
log.debug("Creating cluster port mappings: [application-id] " + appUuid);
- for(Cluster cluster : appClusters) {
+ for (Cluster cluster : appClusters) {
String cartridgeUuid = cluster.getServiceUuid();
Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeUuid);
- if(cartridge == null) {
+ if (cartridge == null) {
throw new CloudControllerException("Cartridge not found: [cartridge-uuid] " + cartridgeUuid);
}
- for(PortMapping portMapping : cartridge.getPortMappings()) {
+ for (PortMapping portMapping : cartridge.getPortMappings()) {
ClusterPortMapping clusterPortMapping = new ClusterPortMapping(appUuid,
cluster.getClusterId(), portMapping.getName(), portMapping.getProtocol(), portMapping.getPort(),
- portMapping.getProxyPort());
- CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping);
- log.debug("Cluster port mapping created: " + clusterPortMapping.toString());
- }
- }
-
- // Persist cluster port mappings
- CloudControllerContext.getInstance().persist();
-
- // Send application clusters created event
- TopologyEventPublisher.sendApplicationClustersCreated(appUuid, appClusters);
- }
-
- public static void handleApplicationClustersRemoved(String appId,
- Set<ClusterDataHolder> clusterData) {
- TopologyManager.acquireWriteLock();
-
- List<Cluster> removedClusters = new ArrayList<Cluster>();
- CloudControllerContext context = CloudControllerContext.getInstance();
- try {
- Topology topology = TopologyManager.getTopology();
-
- if (clusterData != null) {
- // remove clusters from CC topology model and remove runtime information
- for (ClusterDataHolder aClusterData : clusterData) {
- Service aService = topology.getService(aClusterData.getServiceUuid());
- if (aService != null) {
- removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
- } else {
- log.warn("Service " + aClusterData.getServiceType() + " not found, " +
- "unable to remove Cluster " + aClusterData.getClusterId());
+ portMapping.getProxyPort());
+ CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping);
+ log.debug("Cluster port mapping created: " + clusterPortMapping.toString());
}
- // remove runtime data
- context.removeClusterContext(aClusterData.getClusterId());
-
- log.info("Removed application [ " + appId + " ]'s Cluster " +
- "[ " + aClusterData.getClusterId() + " ] from the topology");
}
- // persist runtime data changes
- CloudControllerContext.getInstance().persist();
- } else {
- log.info("No cluster data found for application " + appId + " to remove");
- }
- TopologyManager.updateTopology(topology);
+ // Persist cluster port mappings
+ CloudControllerContext.getInstance().persist();
- } finally {
- TopologyManager.releaseWriteLock();
- }
+ // Send application clusters created event
+ TopologyEventPublisher.sendApplicationClustersCreated(appUuid, appClusters);
+ }
- // Remove cluster port mappings of application
- CloudControllerContext.getInstance().removeClusterPortMappings(appId);
- CloudControllerContext.getInstance().persist();
+ public static void handleApplicationClustersRemoved (String appId,
+ Set < ClusterDataHolder > clusterData){
+ TopologyManager.acquireWriteLock();
- TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData);
+ List<Cluster> removedClusters = new ArrayList<Cluster>();
+ CloudControllerContext context = CloudControllerContext.getInstance();
+ try {
+ Topology topology = TopologyManager.getTopology();
+
+ if (clusterData != null) {
+ // remove clusters from CC topology model and remove runtime information
+ for (ClusterDataHolder aClusterData : clusterData) {
+ Service aService = topology.getService(aClusterData.getServiceUuid());
+ if (aService != null) {
+ removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
+ } else {
+ log.warn("Service " + aClusterData.getServiceType() + " not found, " +
+ "unable to remove Cluster " + aClusterData.getClusterId());
+ }
+ // remove runtime data
+ context.removeClusterContext(aClusterData.getClusterId());
- }
+ log.info("Removed application [ " + appId + " ]'s Cluster " +
+ "[ " + aClusterData.getClusterId() + " ] from the topology");
+ }
+ // persist runtime data changes
+ CloudControllerContext.getInstance().persist();
+ } else {
+ log.info("No cluster data found for application " + appId + " to remove");
+ }
- public static void handleClusterReset(ClusterStatusClusterResetEvent event) {
- TopologyManager.acquireWriteLock();
+ TopologyManager.updateTopology(topology);
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- log.error("Service " + event.getServiceName() +
- " not found in Topology, unable to update the cluster status to Created");
- return;
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " +
- "status to Created");
- return;
- }
+ // Remove cluster port mappings of application
+ CloudControllerContext.getInstance().removeClusterPortMappings(appId);
+ CloudControllerContext.getInstance().persist();
- ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Created;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Created adding status started for" + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(),
- event.getClusterId(), event.getInstanceId());
- } else {
- log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
- }
+ TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData);
- } finally {
- TopologyManager.releaseWriteLock();
}
+ public static void handleClusterReset (ClusterStatusClusterResetEvent event){
+ TopologyManager.acquireWriteLock();
- }
-
- public static void handleClusterInstanceCreated(String serviceUuid, String clusterId,
- String alias, String instanceId, String partitionId,
- String networkPartitionUuid) {
-
- TopologyManager.acquireWriteLock();
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ log.error("Service " + event.getServiceName() +
+ " not found in Topology, unable to update the cluster status to Created");
+ return;
+ }
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(serviceUuid);
- if (service == null) {
- log.error("Service " + serviceUuid +
- " not found in Topology, unable to update the cluster status to Created");
- return;
- }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " +
+ "status to Created");
+ return;
+ }
- Cluster cluster = service.getCluster(clusterId);
- if (cluster == null) {
- log.error("Cluster " + clusterId + " not found in Topology, unable to update " +
- "status to Created");
- return;
- }
+ ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found for [cluster] " +
+ event.getClusterId() + " [instance-id] " +
+ event.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Created;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Created adding status started for" + cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(),
+ event.getClusterId(), event.getInstanceId());
+ } else {
+ log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s [status-requested] %s",
+ event.getClusterId(), event.getInstanceId(),
+ context.getStatus(), status));
+ }
- if (cluster.getInstanceContexts(instanceId) != null) {
- log.warn("The Instance context for the cluster already exists for [cluster] " +
- clusterId + " [instance-id] " + instanceId);
- return;
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- ClusterInstance clusterInstance = new ClusterInstance(alias, clusterId, instanceId);
- clusterInstance.setNetworkPartitionUuid(networkPartitionUuid);
- clusterInstance.setPartitionId(partitionId);
- cluster.addInstanceContext(instanceId, clusterInstance);
- TopologyManager.updateTopology(topology);
-
- ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
- new ClusterInstanceCreatedEvent(serviceUuid, clusterId,
- clusterInstance);
- clusterInstanceCreatedEvent.setPartitionId(partitionId);
- TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
-
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-
- public static void handleClusterRemoved(ClusterContext ctxt) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(ctxt.getCartridgeUuid());
- String deploymentPolicy;
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- ctxt.getCartridgeUuid()));
- return;
}
- if (!service.clusterExists(ctxt.getClusterId())) {
- log.warn(String.format("Cluster %s does not exist for service %s",
- ctxt.getClusterId(),
- ctxt.getCartridgeUuid()));
- return;
- }
+ public static void handleClusterInstanceCreated (String serviceUuid, String clusterId,
+ String alias, String instanceId, String partitionId,
+ String networkPartitionUuid){
- try {
TopologyManager.acquireWriteLock();
- Cluster cluster = service.removeCluster(ctxt.getClusterId());
- deploymentPolicy = cluster.getDeploymentPolicyUuid();
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
- }
- /**
- * Add member object to the topology and publish member created event
- *
- * @param memberContext
- */
- public static void handleMemberCreatedEvent(MemberContext memberContext) {
- Topology topology = TopologyManager.getTopology();
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(serviceUuid);
+ if (service == null) {
+ log.error("Service " + serviceUuid +
+ " not found in Topology, unable to update the cluster status to Created");
+ return;
+ }
- Service service = topology.getService(memberContext.getCartridgeType());
- String clusterId = memberContext.getClusterId();
- Cluster cluster = service.getCluster(clusterId);
- String memberId = memberContext.getMemberId();
- String clusterInstanceId = memberContext.getClusterInstanceId();
- String networkPartitionId = memberContext.getNetworkPartitionId();
- String partitionId = memberContext.getPartition().getUuid();
- String lbClusterId = memberContext.getLbClusterId();
- long initTime = memberContext.getInitTime();
-
- if (cluster.memberExists(memberId)) {
- log.warn(String.format("Member %s already exists", memberId));
- return;
- }
+ Cluster cluster = service.getCluster(clusterId);
+ if (cluster == null) {
+ log.error("Cluster " + clusterId + " not found in Topology, unable to update " +
+ "status to Created");
+ return;
+ }
- try {
- TopologyManager.acquireWriteLock();
- Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId,
- networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime);
- member.setStatus(MemberStatus.Created);
- member.setLbClusterId(lbClusterId);
- member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
- cluster.addMember(member);
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
+ if (cluster.getInstanceContexts(instanceId) != null) {
+ log.warn("The Instance context for the cluster already exists for [cluster] " +
+ clusterId + " [instance-id] " + instanceId);
+ return;
+ }
- TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
- }
+ ClusterInstance clusterInstance = new ClusterInstance(alias, clusterId, instanceId);
+ clusterInstance.setNetworkPartitionUuid(networkPartitionUuid);
+ clusterInstance.setPartitionId(partitionId);
+ cluster.addInstanceContext(instanceId, clusterInstance);
+ TopologyManager.updateTopology(topology);
- /**
- * Update member status to initialized and publish member initialized event
- *
- * @param memberContext
- */
- public static void handleMemberInitializedEvent(MemberContext memberContext) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(memberContext.getCartridgeType());
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- memberContext.getCartridgeType()));
- return;
- }
- if (!service.clusterExists(memberContext.getClusterId())) {
- log.warn(String.format("Cluster %s does not exist in service %s",
- memberContext.getClusterId(),
- memberContext.getCartridgeType()));
- return;
- }
+ ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
+ new ClusterInstanceCreatedEvent(serviceUuid, clusterId,
+ clusterInstance);
+ clusterInstanceCreatedEvent.setPartitionId(partitionId);
+ TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
- Member member = service.getCluster(memberContext.getClusterId()).
- getMember(memberContext.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- memberContext.getMemberId()));
- return;
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
}
- try {
- TopologyManager.acquireWriteLock();
- // Set ip addresses
- member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
- if (memberContext.getPrivateIPs() != null) {
- member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
- }
- member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
- if (memberContext.getPublicIPs() != null) {
- member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
+ public static void handleClusterRemoved (ClusterContext ctxt){
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(ctxt.getCartridgeUuid());
+ String deploymentPolicy;
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ ctxt.getCartridgeUuid()));
+ return;
}
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Initialized)) {
- log.error("Invalid state transition from " + member.getStatus() + " to " +
- MemberStatus.Initialized);
+ if (!service.clusterExists(ctxt.getClusterId())) {
+ log.warn(String.format("Cluster %s does not exist for service %s",
+ ctxt.getClusterId(),
+ ctxt.getCartridgeUuid()));
return;
- } else {
- member.setStatus(MemberStatus.Initialized);
- log.info("Member status updated to initialized");
+ }
+ try {
+ TopologyManager.acquireWriteLock();
+ Cluster cluster = service.removeCluster(ctxt.getClusterId());
+ deploymentPolicy = cluster.getDeploymentPolicyUuid();
TopologyManager.updateTopology(topology);
-
- TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
- //publishing data
- BAMUsageDataPublisher.publish(memberContext.getMemberId(),
- memberContext.getPartition().getUuid(),
- memberContext.getNetworkPartitionId(),
- memberContext.getClusterId(),
- memberContext.getCartridgeType(),
- MemberStatus.Initialized.toString(),
- null);
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- } finally {
- TopologyManager.releaseWriteLock();
+ TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
}
- }
- private static int findKubernetesServicePort(String clusterId, List<KubernetesService> kubernetesServices,
- PortMapping portMapping) {
- for (KubernetesService kubernetesService : kubernetesServices) {
- if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
- return kubernetesService.getPort();
+ /**
+ * Add member object to the topology and publish member created event
+ *
+ * @param memberContext
+ */
+ public static void handleMemberCreatedEvent (MemberContext memberContext){
+ Topology topology = TopologyManager.getTopology();
+
+ Service service = topology.getService(memberContext.getCartridgeType());
+ String clusterId = memberContext.getClusterId();
+ Cluster cluster = service.getCluster(clusterId);
+ String memberId = memberContext.getMemberId();
+ String clusterInstanceId = memberContext.getClusterInstanceId();
+ String networkPartitionId = memberContext.getNetworkPartitionId();
+ String partitionId = memberContext.getPartition().getUuid();
+ String lbClusterId = memberContext.getLbClusterId();
+ long initTime = memberContext.getInitTime();
+ String autoscalingReason = memberContext.getProperties().getProperty(
+ StratosConstants.SCALING_REASON).getValue();
+ long scalingTime = Long.parseLong(memberContext.getProperties().getProperty(
+ StratosConstants.SCALING_TIME).getValue());
+
+
+ if (cluster.memberExists(memberId)) {
+ log.warn(String.format("Member %s already exists", memberId));
+ return;
}
+
+ try {
+ TopologyManager.acquireWriteLock();
+ Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId,
+ networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime);
+ member.setStatus(MemberStatus.Created);
+ member.setLbClusterId(lbClusterId);
+ member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
+ cluster.addMember(member);
+ TopologyManager.updateTopology(topology);
+ //member created time
+ Long timeStamp = System.currentTimeMillis();
+ //publishing to BAM
+ BAMUsageDataPublisher
+ .publish(memberContext.getMemberId(),
+ memberContext.getPartition().getId(),
+ memberContext.getNetworkPartitionId(),
+ memberContext.getClusterId(),
+ memberContext.getClusterInstanceId(),
+ memberContext.getCartridgeType(),
+ MemberStatus.Created.toString(),
+ timeStamp, autoscalingReason,
+ scalingTime, null);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+
+ TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
}
- throw new RuntimeException("Kubernetes service port not found: [cluster-id] " + clusterId + " [port] "
- + portMapping.getPort());
- }
- public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) {
- try {
+ /**
+ * Update member status to initialized and publish member initialized event
+ *
+ * @param memberContext
+ */
+ public static void handleMemberInitializedEvent (MemberContext memberContext){
Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(instanceStartedEvent.getServiceName());
+ Service service = topology.getService(memberContext.getCartridgeType());
if (service == null) {
log.warn(String.format("Service %s does not exist",
- instanceStartedEvent.getServiceName()));
+ memberContext.getCartridgeType()));
return;
}
- if (!service.clusterExists(instanceStartedEvent.getClusterId())) {
+ if (!service.clusterExists(memberContext.getClusterId())) {
log.warn(String.format("Cluster %s does not exist in service %s",
- instanceStartedEvent.getClusterId(),
- instanceStartedEvent.getServiceName()));
+ memberContext.getClusterId(),
+ memberContext.getCartridgeType()));
return;
}
- Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId());
- Member member = cluster.getMember(instanceStartedEvent.getMemberId());
+ Member member = service.getCluster(memberContext.getClusterId()).
+ getMember(memberContext.getMemberId());
if (member == null) {
log.warn(String.format("Member %s does not exist",
- instanceStartedEvent.getMemberId()));
+ memberContext.getMemberId()));
return;
}
try {
TopologyManager.acquireWriteLock();
+
+ // Set ip addresses
+ member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
+ if (memberContext.getPrivateIPs() != null) {
+ member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
+ }
+ member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
+ if (memberContext.getPublicIPs() != null) {
+ member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
+ }
+
// try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Starting)) {
- log.error("Invalid State Transition from " + member.getStatus() + " to " +
- MemberStatus.Starting);
+ if (!member.isStateTransitionValid(MemberStatus.Initialized)) {
+ log.error("Invalid state transition from " + member.getStatus() + " to " +
+ MemberStatus.Initialized);
return;
} else {
- member.setStatus(MemberStatus.Starting);
- log.info("member started event adding status started");
+ member.setStatus(MemberStatus.Initialized);
+ log.info("Member status updated to initialized");
TopologyManager.updateTopology(topology);
- //memberStartedEvent.
- TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
+ //member intialized time
+ Long timeStamp = System.currentTimeMillis();
+ TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
//publishing data
- BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
- instanceStartedEvent.getPartitionId(),
- instanceStartedEvent.getNetworkPartitionId(),
- instanceStartedEvent.getClusterId(),
- instanceStartedEvent.getServiceName(),
- MemberStatus.Starting.toString(),
- null);
+ BAMUsageDataPublisher.publish(memberContext.getMemberId(),
+ memberContext.getPartition().getUuid(),
+ memberContext.getNetworkPartitionId(),
+ memberContext.getClusterInstanceId(),
+ memberContext.getClusterId(),
+ memberContext.getCartridgeType(),
+ MemberStatus.Initialized.toString(),
+ timeStamp, null, null, null);
}
} finally {
TopologyManager.releaseWriteLock();
}
- } catch (Exception e) {
- String message = String.format("Could not handle member started event: [application-id] %s " +
- "[service-name] %s [member-id] %s", instanceStartedEvent.getApplicationId(),
- instanceStartedEvent.getServiceName(), instanceStartedEvent.getMemberId());
- log.warn(message, e);
- }
- }
-
- public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(instanceActivatedEvent.getServiceName());
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceActivatedEvent.getServiceName()));
- return;
}
- Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceActivatedEvent.getClusterId()));
- return;
+ private static int findKubernetesServicePort (String clusterId, List < KubernetesService > kubernetesServices,
+ PortMapping portMapping){
+ for (KubernetesService kubernetesService : kubernetesServices) {
+ if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
+ return kubernetesService.getPort();
+ }
+ }
+ throw new RuntimeException("Kubernetes service port not found: [cluster-id] " + clusterId + " [port] "
+ + portMapping.getPort());
}
- Member member = cluster.getMember(instanceActivatedEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceActivatedEvent.getMemberId()));
- return;
- }
+ public static void handleMemberStarted (InstanceStartedEvent instanceStartedEvent){
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(instanceStartedEvent.getServiceName());
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ instanceStartedEvent.getServiceName()));
+ return;
+ }
+ if (!service.clusterExists(instanceStartedEvent.getClusterId())) {
+ log.warn(String.format("Cluster %s does not exist in service %s",
+ instanceStartedEvent.getClusterId(),
+ instanceStartedEvent.getServiceName()));
+ return;
+ }
- MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent(
- instanceActivatedEvent.getServiceName(),
- instanceActivatedEvent.getClusterId(),
- instanceActivatedEvent.getClusterInstanceId(),
- instanceActivatedEvent.getMemberId(),
- instanceActivatedEvent.getNetworkPartitionId(),
- instanceActivatedEvent.getPartitionId());
-
- // grouping - set grouid
- //TODO
- memberActivatedEvent.setApplicationId(null);
- try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Active)) {
- log.error("Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]");
- return;
- } else {
- member.setStatus(MemberStatus.Active);
+ Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId());
+ Member member = cluster.getMember(instanceStartedEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ instanceStartedEvent.getMemberId()));
+ return;
+ }
- // Set member ports
try {
- Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceUuid());
- if (cartridge == null) {
- throw new RuntimeException(String.format("Cartridge not found: [cartridge-type] %s",
- service.getServiceName()));
- }
-
- Port port;
- int portValue;
- List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings());
- String clusterId = cluster.getClusterId();
- ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
- List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
+ TopologyManager.acquireWriteLock();
+ // try update lifecycle state
+ if (!member.isStateTransitionValid(MemberStatus.Starting)) {
+ log.error("Invalid State Transition from " + member.getStatus() + " to " +
+ MemberStatus.Starting);
+ return;
+ } else {
+ member.setStatus(MemberStatus.Starting);
+ log.info("member started event adding status started");
- for (PortMapping portMapping : portMappings) {
- if (kubernetesServices != null) {
- portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping);
- } else {
- portValue = portMapping.getPort();
- }
- port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort());
- member.addPort(port);
- memberActivatedEvent.addPort(port);
+ TopologyManager.updateTopology(topology);
+ //member started time
+ Long timeStamp = System.currentTimeMillis();
+ //memberStartedEvent.
+ TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
+ //publishing data
+ BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
+ instanceStartedEvent.getPartitionId(),
+ instanceStartedEvent.getNetworkPartitionId(),
+ instanceStartedEvent.getClusterInstanceId(),
+ instanceStartedEvent.getClusterId(),
+ instanceStartedEvent.getServiceName(),
+ MemberStatus.Starting.toString(),
+ timeStamp, null, null, null);
}
- } catch (Exception e) {
- String message = String.format("Could not add member ports: [service-name] %s [member-id] %s",
- memberActivatedEvent.getServiceName(), memberActivatedEvent.getMemberId());
- log.error(message, e);
+ } finally {
+ TopologyManager.releaseWriteLock();
}
-
- // Set member ip addresses
- memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
- memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
- memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
- memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
- TopologyManager.updateTopology(topology);
-
- // Publish member activated event
- TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
-
- // Publish statistics data
- BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
- memberActivatedEvent.getPartitionId(),
- memberActivatedEvent.getNetworkPartitionId(),
- memberActivatedEvent.getClusterId(),
- memberActivatedEvent.getServiceName(),
- MemberStatus.Active.toString(),
- null);
+ } catch (Exception e) {
+ String message = String.format("Could not handle member started event: [application-id] %s " +
+ "[service-name] %s [member-id] %s", instanceStartedEvent.getApplicationId(),
+ instanceStartedEvent.getServiceName(), instanceStartedEvent.getMemberId());
+ log.warn(message, e);
}
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-
- public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent)
- throws InvalidMemberException, InvalidCartridgeTypeException {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName());
- //update the status of the member
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceReadyToShutdownEvent.getServiceName()));
- return;
- }
-
- Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceReadyToShutdownEvent.getClusterId()));
- return;
}
+ public static void handleMemberActivated (InstanceActivatedEvent instanceActivatedEvent){
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(instanceActivatedEvent.getServiceName());
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ instanceActivatedEvent.getServiceName()));
+ return;
+ }
- Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceReadyToShutdownEvent.getMemberId()));
- return;
- }
- MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent(
- instanceReadyToShutdownEvent.getServiceName(),
- instanceReadyToShutdownEvent.getClusterId(),
- instanceReadyToShutdownEvent.getClusterInstanceId(),
- instanceReadyToShutdownEvent.getMemberId(),
- instanceReadyToShutdownEvent.getNetworkPartitionId(),
- instanceReadyToShutdownEvent.getPartitionId());
- try {
- TopologyManager.acquireWriteLock();
+ Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ instanceActivatedEvent.getClusterId()));
+ return;
+ }
- if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
- log.error("Invalid State Transition from " + member.getStatus() + " to " +
- MemberStatus.ReadyToShutDown);
+ Member member = cluster.getMember(instanceActivatedEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ instanceActivatedEvent.getMemberId()));
return;
}
- member.setStatus(MemberStatus.ReadyToShutDown);
- log.info("Member Ready to shut down event adding status started");
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
- //publishing data
- BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
- instanceReadyToShutdownEvent.getPartitionId(),
- instanceReadyToShutdownEvent.getNetworkPartitionId(),
- instanceReadyToShutdownEvent.getClusterId(),
- instanceReadyToShutdownEvent.getServiceName(),
- MemberStatus.ReadyToShutDown.toString(),
- null);
- //termination of particular instance will be handled by autoscaler
- }
+ MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent(
+ instanceActivatedEvent.getServiceName(),
+ instanceActivatedEvent.getClusterId(),
+ instanceActivatedEvent.getClusterInstanceId(),
+ instanceActivatedEvent.getMemberId(),
+ instanceActivatedEvent.getNetworkPartitionId(),
+ instanceActivatedEvent.getPartitionId());
+
+ // grouping - set grouid
+ //TODO
+ memberActivatedEvent.setApplicationId(null);
+ try {
+ TopologyManager.acquireWriteLock();
+ // try update lifecycle state
+ if (!member.isStateTransitionValid(MemberStatus.Active)) {
+ log.error("Invalid state transition from [" + member.getStatus() + "] to [" +
+ MemberStatus.Active + "]");
+ return;
+ } else {
+ member.setStatus(MemberStatus.Active);
- public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent)
- throws InvalidMemberException, InvalidCartridgeTypeException {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName());
- //update the status of the member
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceMaintenanceModeEvent.getServiceName()));
- return;
- }
+ // Set member ports
+ try {
+ Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceUuid());
+ if (cartridge == null) {
+ throw new RuntimeException(String.format("Cartridge not found: [cartridge-type] %s",
+ service.getServiceName()));
+ }
- Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceMaintenanceModeEvent.getClusterId()));
- return;
- }
+ Port port;
+ int portValue;
+ List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings());
+ String clusterId = cluster.getClusterId();
+ ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
+ List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
- Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceMaintenanceModeEvent.getMemberId()));
- return;
+ for (PortMapping portMapping : portMappings) {
+ if (kubernetesServices != null) {
+ portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping);
+ } else {
+ portValue = portMapping.getPort();
+ }
+ port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort());
+ member.addPort(port);
+ memberActivatedEvent.addPort(port);
+ }
+ } catch (Exception e) {
+ String message = String.format("Could not add member ports: [service-name] %s [member-id] %s",
+ memberActivatedEvent.getServiceName(), memberActivatedEvent.getMemberId());
+ log.error(message, e);
+ }
+
+ // Set member ip addresses
+ memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
+ memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
+ memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
+ memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
+ TopologyManager.updateTopology(topology);
+ //member activated time
+ Long timeStamp = System.currentTimeMillis();
+ // Publish member activated event
+ TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
+
+ // Publish statistics data
+ BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
+ memberActivatedEvent.getPartitionId(),
+ memberActivatedEvent.getNetworkPartitionId(),
+ memberActivatedEvent.getClusterInstanceId(),
+ memberActivatedEvent.getClusterId(),
+ memberActivatedEvent.getServiceName(),
+ MemberStatus.Active.toString(),
+ timeStamp, null, null, null);
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
}
+ public static void handleMemberReadyToShutdown (InstanceReadyToShutdownEvent instanceReadyToShutdownEvent)
+ throws InvalidMemberException, InvalidCartridgeTypeException {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName());
+ //update the status of the member
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ instanceReadyToShutdownEvent.getServiceName()));
+ return;
+ }
- MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent(
- instanceMaintenanceModeEvent.getServiceName(),
- instanceMaintenanceModeEvent.getClusterId(),
- instanceMaintenanceModeEvent.getClusterInstanceId(),
- instanceMaintenanceModeEvent.getMemberId(),
- instanceMaintenanceModeEvent.getNetworkPartitionId(),
- instanceMaintenanceModeEvent.getPartitionId());
- try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
- log.error("Invalid State Transition from " + member.getStatus() + " to "
- + MemberStatus.In_Maintenance);
+ Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ instanceReadyToShutdownEvent.getClusterId()));
return;
}
- member.setStatus(MemberStatus.In_Maintenance);
- log.info("member maintenance mode event adding status started");
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- //publishing data
- TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
- }
+ Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ instanceReadyToShutdownEvent.getMemberId()));
+ return;
+ }
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent(
+ instanceReadyToShutdownEvent.getServiceName(),
+ instanceReadyToShutdownEvent.getClusterId(),
+ instanceReadyToShutdownEvent.getClusterInstanceId(),
+ instanceReadyToShutdownEvent.getMemberId(),
+ instanceReadyToShutdownEvent.getNetworkPartitionId(),
+ instanceReadyToShutdownEvent.getPartitionId());
+ //member ReadyToShutDown state change time
+ Long timeStamp = null;
+ try {
+ TopologyManager.acquireWriteLock();
- /**
- * Remove member from topology and send member terminated event.
- *
- * @param serviceName
- * @param clusterId
- * @param networkPartitionId
- * @param partitionId
- * @param memberId
- */
- public static void handleMemberTerminated(String serviceName, String clusterId,
- String networkPartitionId, String partitionId,
- String memberId) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(serviceName);
- Properties properties;
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- serviceName));
- return;
- }
- Cluster cluster = service.getCluster(clusterId);
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterId));
- return;
- }
+ if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
+ log.error("Invalid State Transition from " + member.getStatus() + " to " +
+ MemberStatus.ReadyToShutDown);
+ return;
+ }
+ member.setStatus(MemberStatus.ReadyToShutDown);
+ log.info("Member Ready to shut down event adding status started");
- Member member = cluster.getMember(memberId);
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- memberId));
- return;
+ TopologyManager.updateTopology(topology);
+ timeStamp = System.currentTimeMillis();
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+ //publishing data
+ BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
+ instanceReadyToShutdownEvent.getPartitionId(),
+ instanceReadyToShutdownEvent.getNetworkPartitionId(),
+ instanceReadyToShutdownEvent.getClusterInstanceId(),
+ instanceReadyToShutdownEvent.getClusterId(),
+ instanceReadyToShutdownEvent.getServiceName(),
+ MemberStatus.ReadyToShutDown.toString(),
+ timeStamp, null, null, null);
+ //termination of particular instance will be handled by autoscaler
}
- String clusterInstanceId = member.getClusterInstanceId();
+ public static void handleMemberMaintenance (InstanceMaintenanceModeEvent instanceMaintenanceModeEvent)
+ throws InvalidMemberException, InvalidCartridgeTypeException {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName());
+ //update the status of the member
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ instanceMaintenanceModeEvent.getServiceName()));
+ return;
+ }
- try {
- TopologyManager.acquireWriteLock();
- properties = member.getProperties();
- cluster.removeMember(member);
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- /* @TODO leftover from grouping_poc*/
- String groupAlias = null;
- TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId,
- clusterInstanceId, networkPartitionId,
- partitionId, properties, groupAlias);
- }
+ Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ instanceMaintenanceModeEvent.getClusterId()));
+ return;
+ }
- public static void handleMemberSuspended() {
- //TODO
- try {
- TopologyManager.acquireWriteLock();
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
+ Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ instanceMaintenanceModeEvent.getMemberId()));
+ return;
+ }
- public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
- //update the status of the cluster
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- clusterStatusClusterActivatedEvent.getServiceName()));
- return;
- }
+ MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent(
+ instanceMaintenanceModeEvent.getServiceName(),
+ instanceMaintenanceModeEvent.getClusterId(),
+ instanceMaintenanceModeEvent.getClusterInstanceId(),
+ instanceMaintenanceModeEvent.getMemberId(),
+ instanceMaintenanceModeEvent.getNetworkPartitionId(),
+ instanceMaintenanceModeEvent.getPartitionId());
+ try {
+ TopologyManager.acquireWriteLock();
+ // try update lifecycle state
+ if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
+ log.error("Invalid State Transition from " + member.getStatus() + " to "
+ + MemberStatus.In_Maintenance);
+ return;
+ }
+ member.setStatus(MemberStatus.In_Maintenance);
+ log.info("member maintenance mode event adding status started");
- Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterStatusClusterActivatedEvent.getClusterId()));
- return;
- }
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ //publishing data
+ TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
- String clusterId = cluster.getClusterId();
- ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
- if (clusterContext == null) {
- log.warn("Cluster context not found: [cluster-id] " + clusterId);
- return;
}
- ClusterInstanceActivatedEvent clusterInstanceActivatedEvent =
- new ClusterInstanceActivatedEvent(
- clusterStatusClusterActivatedEvent.getAppId(),
- clusterStatusClusterActivatedEvent.getServiceName(),
- clusterStatusClusterActivatedEvent.getClusterId(),
- clusterStatusClusterActivatedEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
- List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
- cluster.setKubernetesServices(kubernetesServices);
- clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices);
-
- ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId());
- if (context == null) {
- log.warn("Cluster instance context is not found for [cluster] " +
- clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " +
- clusterStatusClusterActivatedEvent.getInstanceId());
+ /**
+ * Remove member from topology and send member terminated event.
+ *
+ * @param serviceName
+ * @param clusterId
+ * @param networkPartitionId
+ * @param partitionId
+ * @param memberId
+ */
+ public static void handleMemberTerminated (String serviceName, String clusterId,
+ String networkPartitionId, String partitionId,
+ String memberId){
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(serviceName);
+ Properties properties;
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ serviceName));
return;
}
- ClusterStatus status = ClusterStatus.Active;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster activated adding status started for " + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- // publish event
- TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
- } else {
- log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- clusterStatusClusterActivatedEvent.getClusterId(), clusterStatusClusterActivatedEvent.getInstanceId(),
- context.getStatus(), status));
+ Cluster cluster = service.getCluster(clusterId);
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ clusterId));
return;
}
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
+ Member member = cluster.getMember(memberId);
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ memberId));
+ return;
+ }
- public static void handleClusterInactivateEvent(
- ClusterStatusClusterInactivateEvent clusterInactivateEvent) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(clusterInactivateEvent.getServiceName());
- //update the status of the cluster
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- clusterInactivateEvent.getServiceName()));
- return;
+ String clusterInstanceId = member.getClusterInstanceId();
+
+ try {
+ TopologyManager.acquireWriteLock();
+ properties = member.getProperties();
+ cluster.removeMember(member);
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ /* @TODO leftover from grouping_poc*/
+ String groupAlias = null;
+ TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId,
+ clusterInstanceId, networkPartitionId,
+ partitionId, properties, groupAlias);
}
- Cluster cluster = service.getCluster(clusterInactivateEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterInactivateEvent.getClusterId()));
- return;
+ public static void handleMemberSuspended () {
+ //TODO
+ try {
+ TopologyManager.acquireWriteLock();
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
}
- ClusterInstanceInactivateEvent clusterInactivatedEvent1 =
- new ClusterInstanceInactivateEvent(
- clusterInactivateEvent.getAppId(),
- clusterInactivateEvent.getServiceName(),
- clusterInactivateEvent.getClusterId(),
- clusterInactivateEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
- ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- clusterInactivateEvent.getClusterId() + " [instance-id] " +
- clusterInactivateEvent.getInstanceId());
+ public static void handleClusterActivatedEvent (ClusterStatusClusterActivatedEvent
+ clusterStatusClusterActivatedEvent){
+
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
+ //update the status of the cluster
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ clusterStatusClusterActivatedEvent.getServiceName()));
return;
}
- ClusterStatus status = ClusterStatus.Inactive;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Inactive adding status started for" + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
- } else {
- log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(),
- context.getStatus(), status));
+
+ Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ clusterStatusClusterActivatedEvent.getClusterId()));
return;
}
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
+ String clusterId = cluster.getClusterId();
+ ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
+ if (clusterContext == null) {
+ log.warn("Cluster context not found: [cluster-id] " + clusterId);
+ return;
+ }
- private static void deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event) {
- try {
- MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient();
- metadataClient.deleteApplicationProperties(event.getAppId());
- } catch (Exception e) {
- log.error("Error occurred while deleting the application resources frm metadata service ", e);
- }
- }
-
- public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) {
+ ClusterInstanceActivatedEvent clusterInstanceActivatedEvent =
+ new ClusterInstanceActivatedEvent(
+ clusterStatusClusterActivatedEvent.getAppId(),
+ clusterStatusClusterActivatedEvent.getServiceName(),
+ clusterStatusClusterActivatedEvent.getClusterId(),
+ clusterStatusClusterActivatedEvent.getInstanceId());
+ try {
+ TopologyManager.acquireWriteLock();
+ List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
+ cluster.setKubernetesServices(kubernetesServices);
+ clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices);
+
+ ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster instance context is not found for [cluster] " +
+ clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " +
+ clusterStatusClusterActivatedEvent.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Active;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster activated adding status started for " + cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ // publish event
+ TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
+ } else {
+ log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s [status-requested] %s",
+ clusterStatusClusterActivatedEvent.getClusterId(),
+ clusterStatusClusterActivatedEvent.getInstanceId(),
+ context.getStatus(), status));
+ return;
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
- TopologyManager.acquireWriteLock();
+ }
- try {
+ public static void handleClusterInactivateEvent (
+ ClusterStatusClusterInactivateEvent clusterInactivateEvent){
Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(event.getServiceName());
-
+ Service service = topology.getService(clusterInactivateEvent.getServiceName());
//update the status of the cluster
if (service == null) {
log.warn(String.format("Service %s does not exist",
- event.getServiceName()));
+ clusterInactivateEvent.getServiceName()));
return;
}
- Cluster cluster = service.getCluster(event.getClusterId());
+ Cluster cluster = service.getCluster(clusterInactivateEvent.getClusterId());
if (cluster == null) {
log.warn(String.format("Cluster %s does not exist",
- event.getClusterId()));
+ clusterInactivateEvent.getClusterId()));
return;
}
- ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
+ ClusterInstanceInactivateEvent clusterInactivatedEvent1 =
+ new ClusterInstanceInactivateEvent(
+ clusterInactivateEvent.getAppId(),
+ clusterInactivateEvent.getServiceName(),
+ clusterInactivateEvent.getClusterId(),
+ clusterInactivateEvent.getInstanceId());
+ try {
+ TopologyManager.acquireWriteLock();
+ ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found for [cluster] " +
+ clusterInactivateEvent.getClusterId() + " [instance-id] " +
+ clusterInactivateEvent.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Inactive;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Inactive adding status started for" + cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
+ } else {
+ log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s [status-requested] %s",
+ clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(),
+ context.getStatus(), status));
+ return;
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- ClusterStatus status = ClusterStatus.Terminated;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Terminated adding status started for and removing the cluster instance"
- + cluster.getClusterId());
- cluster.removeInstanceContext(event.getInstanceId());
- TopologyManager.updateTopology(topology);
- //publishing data
- ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent(event.getAppId(),
- event.getServiceName(), event.getClusterId(), event.getInstanceId());
+ }
- TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
- } else {
- log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
- return;
+
+ private static void deleteAppResourcesFromMetadataService (ApplicationInstanceTerminatedEvent event){
+ try {
+ MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient();
+ metadataClient.deleteApplicationProperties(event.getAppId());
+ } catch (Exception e) {
+ log.error("Error occurred while deleting the application resources frm metadata service ", e);
}
- } finally {
- TopologyManager.releaseWriteLock();
}
+ public static void handleClusterTerminatedEvent (ClusterStatusClusterTerminatedEvent event){
- }
+ TopologyManager.acquireWriteLock();
- public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) {
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(event.getServiceName());
- TopologyManager.acquireWriteLock();
+ //update the status of the cluster
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ event.getServiceName()));
+ return;
+ }
- try {
- Topology topology = TopologyManager.getTopology();
- Cluster cluster = topology.getService(event.getServiceName()).
- getCluster(event.getClusterId());
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ event.getClusterId()));
+ return;
+ }
- if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, event.getInstanceId())) {
- log.error("Invalid state transfer from " + cluster.getStatus(event.getInstanceId()) + " to " +
- ClusterStatus.Terminating);
- }
- ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
+ ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found for [cluster] " +
+ event.getClusterId() + " [instance-id] " +
+ event.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Terminated;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Terminated adding status started for and removing the cluster instance"
+ + cluster.getClusterId());
+ cluster.removeInstanceContext(event.getInstanceId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent(
+ event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId());
+
+ TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
+ } else {
+ log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s [status-requested] %s",
+ event.getClusterId(), event.getInstanceId(),
+ context.getStatus(), status));
+ return;
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- ClusterStatus status = ClusterStatus.Terminating;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Terminating started for " + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent(event.getAppId(),
- event.getServiceName(), event.getClusterId(), event.getInstanceId());
- TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
- // Remove kubernetes services if available
- ClusterContext clusterContext =
- CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
- if(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
- KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId());
+ }
+
+ public static void handleClusterTerminatingEvent (ClusterStatusClusterTerminatingEvent event){
+
+ TopologyManager.acquireWriteLock();
+
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Cluster cluster = topology.getService(event.getServiceName()).
+ getCluster(event.getClusterId());
+
+ if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, event.getInstanceId())) {
+ log.error("Invalid state transfer from " + cluster.getStatus(event.getInstanceId()) + " to " +
+ ClusterStatus.Terminating);
}
- } else {
- log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
+ ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found for [cluster] " +
+ event.getClusterId() + " [instance-id] " +
+ event.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Terminating;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Terminating started for " + cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent(
+ event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId());
+
+ TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
+
+ // Remove kubernetes services if available
+ ClusterContext clusterContext =
+ CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
+ if (StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
+ KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId());
+ }
+ } else {
+ log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s [status-requested] %s",
+ event.getClusterId(), event.getInstanceId(),
+ context.getStatus(), status));
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- } finally {
- TopologyManager.releaseWriteLock();
}
}
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 316984a..96bb38c 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -460,13 +460,13 @@ public class CloudControllerServiceImpl implements CloudControllerService {
clusterContext.setVolumes(volumes);
}
- // Handle member created event
- TopologyBuilder.handleMemberCreatedEvent(memberContext);
-
// Persist member context
CloudControllerContext.getInstance().addMemberContext(memberContext);
CloudControllerContext.getInstance().persist();
+ // Handle member created event
+ TopologyBuilder.handleMemberCreatedEvent(memberContext);
+
// Start instance in a new thread
if (log.isDebugEnabled()) {
log.debug(String.format("Starting instance creator thread: [cluster] %s [cluster-instance] %s " +