You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/11/28 16:33:33 UTC
[02/15] stratos git commit: Refactoring cloud controller component to
simplify the package structure
http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
deleted file mode 100644
index d72c552..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
+++ /dev/null
@@ -1,962 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.topology;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.exception.CloudControllerException;
-import org.apache.stratos.cloud.controller.exception.InvalidCartridgeTypeException;
-import org.apache.stratos.cloud.controller.exception.InvalidMemberException;
-import org.apache.stratos.cloud.controller.pojo.Cartridge;
-import org.apache.stratos.cloud.controller.pojo.*;
-import org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublisher;
-import org.apache.stratos.cloud.controller.registry.RegistryManager;
-import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
-import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.instance.ClusterInstance;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent;
-import org.apache.stratos.messaging.event.cluster.status.*;
-import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
-import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
-import org.apache.stratos.messaging.event.topology.*;
-import org.apache.stratos.metadata.client.defaults.DefaultMetaDataServiceClient;
-import org.apache.stratos.metadata.client.defaults.MetaDataServiceClient;
-import org.wso2.carbon.registry.core.exceptions.RegistryException;
-
-import java.util.*;
-
-/**
- * this is to manipulate the received events by cloud controller
- * and build the complete topology with the events received
- */
-public class TopologyBuilder {
- private static final Log log = LogFactory.getLog(TopologyBuilder.class);
-
-
- public static void handleServiceCreated(List<Cartridge> cartridgeList) {
- Service service;
- Topology topology = TopologyManager.getTopology();
- if (cartridgeList == null) {
- log.warn(String.format("Cartridge list is empty"));
- return;
- }
-
- try {
-
- TopologyManager.acquireWriteLock();
- for (Cartridge cartridge : cartridgeList) {
- if (!topology.serviceExists(cartridge.getType())) {
- service = new Service(cartridge.getType(), cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant);
- List<PortMapping> portMappings = cartridge.getPortMappings();
- Properties properties = new Properties();
- for (Map.Entry<String, String> entry : cartridge.getProperties().entrySet()) {
- properties.setProperty(entry.getKey(), entry.getValue());
- }
- service.setProperties(properties);
- Port port;
- //adding ports to the event
- for (PortMapping portMapping : portMappings) {
- port = new Port(portMapping.getProtocol(),
- Integer.parseInt(portMapping.getPort()),
- Integer.parseInt(portMapping.getProxyPort()));
- service.addPort(port);
- }
- topology.addService(service);
- TopologyManager.updateTopology(topology);
- }
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
- TopologyEventPublisher.sendServiceCreateEvent(cartridgeList);
-
- }
-
- public static void handleServiceRemoved(List<Cartridge> cartridgeList) {
- Topology topology = TopologyManager.getTopology();
-
- for (Cartridge cartridge : cartridgeList) {
- if (topology.getService(cartridge.getType()).getClusters().size() == 0) {
- if (topology.serviceExists(cartridge.getType())) {
- try {
- TopologyManager.acquireWriteLock();
- topology.removeService(cartridge.getType());
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList);
- } else {
- log.warn(String.format("Service %s does not exist..", cartridge.getType()));
- }
- } else {
- log.warn("Subscription already exists. Hence not removing the service:" + cartridge.getType()
- + " from the topology");
- }
- }
- }
-
- public static void handleClusterCreated(ClusterStatusClusterCreatedEvent event) {
- TopologyManager.acquireWriteLock();
- Cluster cluster;
-
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- log.error("Service " + event.getServiceName() +
- " not found in Topology, unable to update the cluster status to Created");
- return;
- }
-
- if (service.clusterExists(event.getClusterId())) {
- log.warn("Cluster " + event.getClusterId() + " is already in the Topology ");
- return;
- } else {
- cluster = new Cluster(event.getServiceName(),
- event.getClusterId(), event.getDeploymentPolicyName(),
- event.getAutosScalePolicyName(), event.getAppId());
- //cluster.setStatus(Status.Created);
- cluster.setHostNames(event.getHostNames());
- cluster.setTenantRange(event.getTenantRange());
- service.addCluster(cluster);
- TopologyManager.updateTopology(topology);
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
- TopologyEventPublisher.sendClusterCreatedEvent(cluster);
- }
-
- public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters) {
-
- TopologyManager.acquireWriteLock();
-
- try {
- Topology topology = TopologyManager.getTopology();
-
- for (Cluster cluster : appClusters) {
- Service service = topology.getService(cluster.getServiceName());
- if (service == null) {
- log.error("Service " + cluster.getServiceName()
- + " not found in Topology, unable to create Application cluster");
- } else {
- service.addCluster(cluster);
- log.info("Application Cluster " + cluster.getClusterId() + " created in CC topology");
- }
- }
-
- TopologyManager.updateTopology(topology);
-
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
- TopologyEventPublisher.sendApplicationClustersCreated(appId, appClusters);
-
- }
-
- public static void handleApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusterData) {
- TopologyManager.acquireWriteLock();
-
- List<Cluster> removedClusters = new ArrayList<Cluster>();
- FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance();
- try {
- Topology topology = TopologyManager.getTopology();
-
- if (clusterData != null) {
- // remove clusters from CC topology model and remove runtime information
- for (ClusterDataHolder aClusterData : clusterData) {
- Service aService = topology.getService(aClusterData.getServiceType());
- if (aService != null) {
- removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
- } else {
- log.warn("Service " + aClusterData.getServiceType() + " not found, unable to remove Cluster " + aClusterData.getClusterId());
- }
- // remove runtime data
- dataHolder.removeClusterContext(aClusterData.getClusterId());
-
- log.info("Removed application [ " + appId + " ]'s Cluster [ " + aClusterData.getClusterId() + " ] from the topology");
- }
- // persist runtime data changes
- persist(dataHolder);
- } else {
- log.info("No cluster data found for application " + appId + " to remove");
- }
-
- TopologyManager.updateTopology(topology);
-
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
- TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData);
-
- }
-
- /**
- * Persist data in registry.
- */
- private static void persist(FasterLookUpDataHolder dataHolder) {
- try {
- RegistryManager.getInstance().persist(
- dataHolder);
- } catch (RegistryException e) {
-
- String msg = "Failed to persist the Cloud Controller data in registry. Further, transaction roll back also failed.";
- log.fatal(msg);
- throw new CloudControllerException(msg, e);
- }
- }
-
- public static void handleClusterReset(ClusterStatusClusterResetEvent event) {
-
- TopologyManager.acquireWriteLock();
-
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- log.error("Service " + event.getServiceName() +
- " not found in Topology, unable to update the cluster status to Created");
- return;
- }
-
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " +
- "status to Created");
- return;
- }
-
- ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Created;
- if(context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Created adding status started for" + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(),
- event.getClusterId(), event.getInstanceId());
- } else {
- log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
- }
-
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
-
- }
-
- public static void handleClusterInstanceCreated(String serviceType, String clusterId, String alias, String instanceId) {
-
- TopologyManager.acquireWriteLock();
-
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(serviceType);
- if (service == null) {
- log.error("Service " + serviceType +
- " not found in Topology, unable to update the cluster status to Created");
- return;
- }
-
- Cluster cluster = service.getCluster(clusterId);
- if (cluster == null) {
- log.error("Cluster " + clusterId + " not found in Topology, unable to update " +
- "status to Created");
- return;
- }
-
- if(cluster.getInstanceContexts(instanceId) != null) {
- log.warn("The Instance context for the cluster already exists for [cluster] " +
- clusterId + " [instance-id] " + instanceId);
- return;
- }
-
- //context.setStatus(ClusterStatus.Created);
- cluster.addInstanceContext(instanceId, new ClusterInstance(alias, clusterId, instanceId));
- TopologyManager.updateTopology(topology);
-
- ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
- new ClusterInstanceCreatedEvent(alias, serviceType, clusterId, instanceId);
- TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
-
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-
-
-
- public static void handleClusterCreated(Registrant registrant, boolean isLb) {
- /*Topology topology = TopologyManager.getTopology();
- Service service;
- try {
- TopologyManager.acquireWriteLock();
- String cartridgeType = registrant.getCartridgeType();
- service = topology.getService(cartridgeType);
- Properties props = CloudControllerUtil.toJavaUtilProperties(registrant.getProperties());
-
- Cluster cluster;
- String clusterId = registrant.getClusterId();
- if (service.clusterExists(clusterId)) {
- // update the cluster
- cluster = service.getCluster(clusterId);
- cluster.addHostName(registrant.getHostName());
- if (service.getServiceType() == ServiceType.MultiTenant) {
- cluster.setTenantRange(registrant.getTenantRange());
- }
- if (service.getProperties().getProperty(Constants.IS_PRIMARY) != null) {
- props.setProperty(Constants.IS_PRIMARY, service.getProperties().getProperty(Constants.IS_PRIMARY));
- }
- cluster.setProperties(props);
- cluster.setLbCluster(isLb);
- } else {
- cluster = new Cluster(cartridgeType, clusterId,
- registrant.getDeploymentPolicyName(), registrant.getAutoScalerPolicyName(), null);
- cluster.addHostName(registrant.getHostName());
- if (service.getServiceType() == ServiceType.MultiTenant) {
- cluster.setTenantRange(registrant.getTenantRange());
- }
- if (service.getProperties().getProperty(Constants.IS_PRIMARY) != null) {
- props.setProperty(Constants.IS_PRIMARY, service.getProperties().getProperty(Constants.IS_PRIMARY));
- }
- cluster.setProperties(props);
- cluster.setLbCluster(isLb);
- //cluster.setStatus(Status.Created);
- service.addCluster(cluster);
- }
- TopologyManager.updateTopology(topology);
- TopologyEventPublisher.sendClusterCreatedEvent(cartridgeType, clusterId, cluster);
-
- } finally {
- TopologyManager.releaseWriteLock();
- }*/
- }
-
-
- private static void setKubernetesCluster(Cluster cluster) {
- boolean isKubernetesCluster = (cluster.getProperties().getProperty(StratosConstants.KUBERNETES_CLUSTER_ID) != null);
- if (log.isDebugEnabled()) {
- log.debug(" Kubernetes Cluster ["+ isKubernetesCluster + "] ");
- }
- cluster.setKubernetesCluster(isKubernetesCluster);
- }
-
- public static void handleClusterRemoved(ClusterContext ctxt) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(ctxt.getCartridgeType());
- String deploymentPolicy;
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- ctxt.getCartridgeType()));
- return;
- }
-
- if (!service.clusterExists(ctxt.getClusterId())) {
- log.warn(String.format("Cluster %s does not exist for service %s",
- ctxt.getClusterId(),
- ctxt.getCartridgeType()));
- return;
- }
-
- try {
- TopologyManager.acquireWriteLock();
- Cluster cluster = service.removeCluster(ctxt.getClusterId());
- deploymentPolicy = cluster.getDeploymentPolicyName();
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
- }
-
- public static void handleMemberSpawned(String serviceName,
- String clusterId, String partitionId,
- String privateIp, String publicIp, MemberContext context) {
- // adding the new member to the cluster after it is successfully started
- // in IaaS.
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(serviceName);
- Cluster cluster = service.getCluster(clusterId);
- String memberId = context.getMemberId();
- String networkPartitionId = context.getNetworkPartitionId();
- String lbClusterId = context.getLbClusterId();
- long initTime = context.getInitTime();
-
- if (cluster.memberExists(memberId)) {
- log.warn(String.format("Member %s already exists", memberId));
- return;
- }
-
- try {
- TopologyManager.acquireWriteLock();
- Member member = new Member(serviceName, clusterId,
- networkPartitionId, partitionId, memberId, initTime);
- member.setStatus(MemberStatus.Created);
- member.setInstanceId(context.getInstanceId());
- member.setMemberIp(privateIp);
- member.setLbClusterId(lbClusterId);
- member.setMemberPublicIp(publicIp);
- member.setProperties(CloudControllerUtil.toJavaUtilProperties(context.getProperties()));
- try {
-
- Cartridge cartridge = FasterLookUpDataHolder.getInstance().getCartridge(serviceName);
- List<PortMapping> portMappings = cartridge.getPortMappings();
- Port port;
- if(cluster.isKubernetesCluster()){
- // Update port mappings with generated service proxy port
- // TODO: Need to properly fix with the latest Kubernetes version
- String serviceHostPortStr = CloudControllerUtil.getProperty(context.getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
- if(StringUtils.isEmpty(serviceHostPortStr)) {
- log.warn("Kubernetes service host port not found for member: [member-id] " + memberId);
- }
- // Adding ports to the member
- if (StringUtils.isNotEmpty(serviceHostPortStr)) {
- for (PortMapping portMapping : portMappings) {
- port = new Port(portMapping.getProtocol(),
- Integer.parseInt(serviceHostPortStr),
- Integer.parseInt(portMapping.getProxyPort()));
- member.addPort(port);
- }
- }
-
- } else {
-
- // Adding ports to the member
- for (PortMapping portMapping : portMappings) {
-
- port = new Port(portMapping.getProtocol(),
- Integer.parseInt(portMapping.getPort()),
- Integer.parseInt(portMapping.getProxyPort()));
- member.addPort(port);
-
- }
- }
-
- } catch (Exception e) {
- log.error("Could not update member port-map: [member-id] " + memberId, e);
- }
- cluster.addMember(member);
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
- TopologyEventPublisher.sendInstanceSpawnedEvent(serviceName, clusterId,
- networkPartitionId, partitionId, memberId, lbClusterId,
- publicIp, privateIp, context);
- }
-
- public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(instanceStartedEvent.getServiceName());
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceStartedEvent.getServiceName()));
- return;
- }
- if (!service.clusterExists(instanceStartedEvent.getClusterId())) {
- log.warn(String.format("Cluster %s does not exist in service %s",
- instanceStartedEvent.getClusterId(),
- instanceStartedEvent.getServiceName()));
- return;
- }
-
- Member member = service.getCluster(instanceStartedEvent.getClusterId()).
- getMember(instanceStartedEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceStartedEvent.getMemberId()));
- return;
- }
-
- try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Starting)) {
- log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.Starting);
- }
- member.setStatus(MemberStatus.Starting);
- log.info("member started event adding status started");
-
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- //memberStartedEvent.
- TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
- //publishing data
- CartridgeInstanceDataPublisher.publish(instanceStartedEvent.getMemberId(),
- instanceStartedEvent.getPartitionId(),
- instanceStartedEvent.getNetworkPartitionId(),
- instanceStartedEvent.getClusterId(),
- instanceStartedEvent.getServiceName(),
- MemberStatus.Starting.toString(),
- null);
- }
-
- public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(instanceActivatedEvent.getServiceName());
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceActivatedEvent.getServiceName()));
- return;
- }
-
- Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceActivatedEvent.getClusterId()));
- return;
- }
-
- Member member = cluster.getMember(instanceActivatedEvent.getMemberId());
-
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceActivatedEvent.getMemberId()));
- return;
- }
-
- MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent(
- instanceActivatedEvent.getServiceName(),
- instanceActivatedEvent.getClusterId(),
- instanceActivatedEvent.getNetworkPartitionId(),
- instanceActivatedEvent.getPartitionId(),
- instanceActivatedEvent.getMemberId(),
- instanceActivatedEvent.getInstanceId());
-
- // grouping - set grouid
- //TODO
- memberActivatedEvent.setApplicationId(null);
- try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Activated)) {
- log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.Activated);
- }
- member.setStatus(MemberStatus.Activated);
- log.info("member started event adding status activated");
- Cartridge cartridge = FasterLookUpDataHolder.getInstance().
- getCartridge(instanceActivatedEvent.getServiceName());
-
- List<PortMapping> portMappings = cartridge.getPortMappings();
- Port port;
- //adding ports to the event
- for (PortMapping portMapping : portMappings) {
- port = new Port(portMapping.getProtocol(),
- Integer.parseInt(portMapping.getPort()),
- Integer.parseInt(portMapping.getProxyPort()));
- member.addPort(port);
- memberActivatedEvent.addPort(port);
- }
-
- memberActivatedEvent.setMemberIp(member.getMemberIp());
- memberActivatedEvent.setMemberPublicIp(member.getMemberPublicIp());
- TopologyManager.updateTopology(topology);
-
- } finally {
- TopologyManager.releaseWriteLock();
- }
- TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
- //publishing data
- CartridgeInstanceDataPublisher.publish(memberActivatedEvent.getMemberId(),
- memberActivatedEvent.getPartitionId(),
- memberActivatedEvent.getNetworkPartitionId(),
- memberActivatedEvent.getClusterId(),
- memberActivatedEvent.getServiceName(),
- MemberStatus.Activated.toString(),
- null);
- }
-
- public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent)
- throws InvalidMemberException, InvalidCartridgeTypeException {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName());
- //update the status of the member
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceReadyToShutdownEvent.getServiceName()));
- return;
- }
-
- Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceReadyToShutdownEvent.getClusterId()));
- return;
- }
-
-
- Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceReadyToShutdownEvent.getMemberId()));
- return;
- }
- MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent(
- instanceReadyToShutdownEvent.getServiceName(),
- instanceReadyToShutdownEvent.getClusterId(),
- instanceReadyToShutdownEvent.getNetworkPartitionId(),
- instanceReadyToShutdownEvent.getPartitionId(),
- instanceReadyToShutdownEvent.getMemberId(),
- instanceReadyToShutdownEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
-
- if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
- log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.ReadyToShutDown);
- }
- member.setStatus(MemberStatus.ReadyToShutDown);
- log.info("Member Ready to shut down event adding status started");
-
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
- //publishing data
- CartridgeInstanceDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
- instanceReadyToShutdownEvent.getPartitionId(),
- instanceReadyToShutdownEvent.getNetworkPartitionId(),
- instanceReadyToShutdownEvent.getClusterId(),
- instanceReadyToShutdownEvent.getServiceName(),
- MemberStatus.ReadyToShutDown.toString(),
- null);
- //termination of particular instance will be handled by autoscaler
- }
-
- public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent)
- throws InvalidMemberException, InvalidCartridgeTypeException {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName());
- //update the status of the member
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceMaintenanceModeEvent.getServiceName()));
- return;
- }
-
- Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceMaintenanceModeEvent.getClusterId()));
- return;
- }
-
- Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceMaintenanceModeEvent.getMemberId()));
- return;
- }
-
-
- MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent(
- instanceMaintenanceModeEvent.getServiceName(),
- instanceMaintenanceModeEvent.getClusterId(),
- instanceMaintenanceModeEvent.getNetworkPartitionId(),
- instanceMaintenanceModeEvent.getPartitionId(),
- instanceMaintenanceModeEvent.getMemberId(),
- instanceMaintenanceModeEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
- log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.In_Maintenance);
- }
- member.setStatus(MemberStatus.In_Maintenance);
- log.info("member maintenance mode event adding status started");
-
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- //publishing data
- TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
-
- }
-
- public static void handleMemberTerminated(String serviceName, String clusterId,
- String networkPartitionId, String partitionId,
- String memberId) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(serviceName);
- Properties properties;
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- serviceName));
- return;
- }
- Cluster cluster = service.getCluster(clusterId);
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterId));
- return;
- }
-
- Member member = cluster.getMember(memberId);
- String instanceId = member.getInstanceId();
-
- if (member == null) {
- log.warn(String.format("Member with member id %s does not exist",
- memberId));
- return;
- }
-
- try {
- TopologyManager.acquireWriteLock();
- properties = member.getProperties();
- cluster.removeMember(member);
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- /* @TODO leftover from grouping_poc*/
- String groupAlias = null;
- TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, networkPartitionId,
- partitionId, memberId, properties, groupAlias, instanceId);
- }
-
- public static void handleMemberSuspended() {
- //TODO
- try {
- TopologyManager.acquireWriteLock();
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-
- public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterActivatedEvent) {
-
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(clusterActivatedEvent.getServiceName());
- //update the status of the cluster
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- clusterActivatedEvent.getServiceName()));
- return;
- }
-
- Cluster cluster = service.getCluster(clusterActivatedEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterActivatedEvent.getClusterId()));
- return;
- }
-
- org.apache.stratos.messaging.event.topology.ClusterActivatedEvent clusterActivatedEvent1 =
- new org.apache.stratos.messaging.event.topology.ClusterActivatedEvent(
- clusterActivatedEvent.getAppId(),
- clusterActivatedEvent.getServiceName(),
- clusterActivatedEvent.getClusterId(),
- clusterActivatedEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
- ClusterInstance context = cluster.getInstanceContexts(clusterActivatedEvent.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- clusterActivatedEvent.getClusterId() + " [instance-id] " +
- clusterActivatedEvent.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Active;
- if(context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster activated adding status started for" + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- TopologyEventPublisher.sendClusterActivatedEvent(clusterActivatedEvent1);
- } else {
- log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- clusterActivatedEvent.getClusterId(), clusterActivatedEvent.getInstanceId(),
- context.getStatus(), status));
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
- }
-
- public static void handleClusterInActivateEvent(
- ClusterStatusClusterInactivateEvent clusterInActivateEvent) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(clusterInActivateEvent.getServiceName());
- //update the status of the cluster
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- clusterInActivateEvent.getServiceName()));
- return;
- }
-
- Cluster cluster = service.getCluster(clusterInActivateEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterInActivateEvent.getClusterId()));
- return;
- }
-
- ClusterInactivateEvent clusterInActivatedEvent1 =
- new ClusterInactivateEvent(
- clusterInActivateEvent.getAppId(),
- clusterInActivateEvent.getServiceName(),
- clusterInActivateEvent.getClusterId(),
- clusterInActivateEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
- ClusterInstance context = cluster.getInstanceContexts(clusterInActivateEvent.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- clusterInActivateEvent.getClusterId() + " [instance-id] " +
- clusterInActivateEvent.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Inactive;
- if(context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster InActive adding status started for" + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- TopologyEventPublisher.sendClusterInactivateEvent(clusterInActivatedEvent1);
- } else {
- log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- clusterInActivateEvent.getClusterId(), clusterInActivateEvent.getInstanceId(),
- context.getStatus(), status));
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-
-
- private static void deleteAppResourcesFromMetadataService(ApplicationTerminatedEvent event) {
- try {
- MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient();
- metadataClient.deleteApplicationProperties(event.getAppId());
- } catch (Exception e) {
- log.error("Error occurred while deleting the application resources frm metadata service ", e);
- }
- }
-
- public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) {
-
- TopologyManager.acquireWriteLock();
-
- try {
- Topology topology = TopologyManager.getTopology();
- Cluster cluster = topology.getService(event.getServiceName()).
- getCluster(event.getClusterId());
-
- if (!cluster.isStateTransitionValid(ClusterStatus.Terminated, null)) {
- log.error("Invalid state transfer from " + cluster.getStatus(null) + " to " +
- ClusterStatus.Terminated);
- }
- ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Terminated;
- if(context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Terminated adding status started for" + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- ClusterTerminatedEvent clusterTerminatedEvent = new ClusterTerminatedEvent(event.getAppId(),
- event.getServiceName(), event.getClusterId(), event.getInstanceId());
-
- TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
- } else {
- log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
-
- }
-
- public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) {
-
- TopologyManager.acquireWriteLock();
-
- try {
- Topology topology = TopologyManager.getTopology();
- Cluster cluster = topology.getService(event.getServiceName()).
- getCluster(event.getClusterId());
-
- if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, null)) {
- log.error("Invalid state transfer from " + cluster.getStatus(null) + " to " +
- ClusterStatus.Terminating);
- }
- ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Terminating;
- if(context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Terminating adding status started for" + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- ClusterTerminatingEvent clusterTerminaingEvent = new ClusterTerminatingEvent(event.getAppId(),
- event.getServiceName(), event.getClusterId(), null);
-
- TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
- } else {
- log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
deleted file mode 100644
index 6d85c77..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.stratos.cloud.controller.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.pojo.Cartridge;
-import org.apache.stratos.cloud.controller.pojo.ClusterContext;
-import org.apache.stratos.cloud.controller.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.pojo.PortMapping;
-import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
-import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
-import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Port;
-import org.apache.stratos.messaging.domain.topology.ServiceType;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.event.applications.*;
-import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
-import org.apache.stratos.messaging.event.topology.*;
-import org.apache.stratos.messaging.util.Util;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * this is to send the relevant events from cloud controller to topology topic
- */
-public class TopologyEventPublisher {
- private static final Log log = LogFactory.getLog(TopologyEventPublisher.class);
-
- public static void sendServiceCreateEvent(List<Cartridge> cartridgeList) {
- ServiceCreatedEvent serviceCreatedEvent;
- for (Cartridge cartridge : cartridgeList) {
- serviceCreatedEvent = new ServiceCreatedEvent(cartridge.getType(),
- (cartridge.isMultiTenant() ? ServiceType.MultiTenant
- : ServiceType.SingleTenant));
-
- // Add ports to the event
- Port port;
- List<PortMapping> portMappings = cartridge.getPortMappings();
- for (PortMapping portMapping : portMappings) {
- port = new Port(portMapping.getProtocol(),
- Integer.parseInt(portMapping.getPort()),
- Integer.parseInt(portMapping.getProxyPort()));
- serviceCreatedEvent.addPort(port);
- }
-
- if (log.isInfoEnabled()) {
- log.info(String.format(
- "Publishing service created event: [service] %s",
- cartridge.getType()));
- }
- publishEvent(serviceCreatedEvent);
- }
- }
-
- public static void sendServiceRemovedEvent(List<Cartridge> cartridgeList) {
- ServiceRemovedEvent serviceRemovedEvent;
- for (Cartridge cartridge : cartridgeList) {
- serviceRemovedEvent = new ServiceRemovedEvent(cartridge.getType());
- if (log.isInfoEnabled()) {
- log.info(String.format(
- "Publishing service removed event: [service] %s",
- serviceRemovedEvent.getServiceName()));
- }
- publishEvent(serviceRemovedEvent);
- }
- }
-
- public static void sendClusterResetEvent(String appId, String serviceName, String clusterId,
- String instanceId) {
- ClusterResetEvent clusterResetEvent = new ClusterResetEvent(appId, serviceName,
- clusterId, instanceId);
-
- if (log.isInfoEnabled()) {
- log.info("Publishing cluster reset event: " + clusterId);
- }
- publishEvent(clusterResetEvent);
- }
-
- public static void sendClusterCreatedEvent(Cluster cluster) {
- ClusterCreatedEvent clusterCreatedEvent = new ClusterCreatedEvent(cluster);
-
- if (log.isInfoEnabled()) {
- log.info("Publishing cluster created event: " + cluster.getClusterId());
- }
- publishEvent(clusterCreatedEvent);
- }
-
- public static void sendApplicationClustersCreated(String appId, List<Cluster> clusters) {
-
- if (log.isInfoEnabled()) {
- log.info("Publishing Application Clusters Created event for Application: " + appId);
- }
-
- publishEvent(new ApplicationClustersCreatedEvent(clusters, appId));
- }
-
- public static void sendApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusters) {
-
- if (log.isInfoEnabled()) {
- log.info("Publishing Application Clusters removed event for Application: " + appId);
- }
-
- publishEvent(new ApplicationClustersRemovedEvent(clusters, appId));
- }
-
- public static void sendClusterRemovedEvent(ClusterContext ctxt, String deploymentPolicy) {
- ClusterRemovedEvent clusterRemovedEvent = new ClusterRemovedEvent(
- ctxt.getCartridgeType(), ctxt.getClusterId(), deploymentPolicy, ctxt.isLbCluster());
- if (log.isInfoEnabled()) {
- log.info(String
- .format("Publishing cluster removed event: [service] %s [cluster] %s",
- ctxt.getCartridgeType(), ctxt.getClusterId()));
- }
- publishEvent(clusterRemovedEvent);
-
- }
-
- public static void sendInstanceSpawnedEvent(String serviceName,
- String clusterId, String networkPartitionId, String partitionId,
- String memberId, String lbClusterId, String publicIp,
- String privateIp, MemberContext context) {
-
- long initTime = context.getInitTime();
- InstanceSpawnedEvent instanceSpawnedEvent = new InstanceSpawnedEvent(
- serviceName, clusterId, networkPartitionId, partitionId,
- memberId, initTime, context.getInstanceId());
- instanceSpawnedEvent.setLbClusterId(lbClusterId);
- instanceSpawnedEvent.setMemberIp(privateIp);
- instanceSpawnedEvent.setMemberPublicIp(publicIp);
- instanceSpawnedEvent.setProperties(CloudControllerUtil
- .toJavaUtilProperties(context.getProperties()));
- log.info(String.format("Publishing instance spawned event: [service] %s [cluster] %s " +
- " [instance-id] %s [network-partition] %s [partition] %s " +
- "[member]%s [lb-cluster-id] %s",
- serviceName, clusterId, context.getInstanceId(), networkPartitionId, partitionId,
- memberId, lbClusterId));
- publishEvent(instanceSpawnedEvent);
- }
-
- public static void sendMemberStartedEvent(InstanceStartedEvent instanceStartedEvent) {
- MemberStartedEvent memberStartedEventTopology = new MemberStartedEvent(instanceStartedEvent.getServiceName(),
- instanceStartedEvent.getClusterId(), instanceStartedEvent.getNetworkPartitionId(),
- instanceStartedEvent.getPartitionId(), instanceStartedEvent.getMemberId(),
- instanceStartedEvent.getInstanceId());
- if (log.isInfoEnabled()) {
- log.info(String
- .format("Publishing member started event: [service] %s [cluster] %s [instance-id] %s " +
- "[network-partition] %s [partition] %s [member] %s",
- instanceStartedEvent.getServiceName(),
- instanceStartedEvent.getClusterId(),
- instanceStartedEvent.getInstanceId(),
- instanceStartedEvent.getNetworkPartitionId(),
- instanceStartedEvent.getPartitionId(),
- instanceStartedEvent.getMemberId()));
- }
- publishEvent(memberStartedEventTopology);
- }
-
- public static void sendMemberActivatedEvent(
- MemberActivatedEvent memberActivatedEvent) {
- if (log.isInfoEnabled()) {
- log.info(String
- .format("Publishing member activated event: [service] %s [cluster] %s " +
- "[instance-id] %s [network-partition] %s [partition] %s [member] %s",
- memberActivatedEvent.getServiceName(),
- memberActivatedEvent.getClusterId(),
- memberActivatedEvent.getInstanceId(),
- memberActivatedEvent.getNetworkPartitionId(),
- memberActivatedEvent.getPartitionId(),
- memberActivatedEvent.getMemberId()));
- }
- publishEvent(memberActivatedEvent);
- }
-
- public static void sendMemberReadyToShutdownEvent(MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
- if (log.isInfoEnabled()) {
- log.info(String.format("Publishing member Ready to shut down event: [service] %s " +
- " [instance-id] %s [cluster] %s [network-partition] %s [partition] %s " +
- "[member] %s [groupId] %s",
- memberReadyToShutdownEvent.getServiceName(),
- memberReadyToShutdownEvent.getClusterId(),
- memberReadyToShutdownEvent.getInstanceId(),
- memberReadyToShutdownEvent.getNetworkPartitionId(),
- memberReadyToShutdownEvent.getPartitionId(),
- memberReadyToShutdownEvent.getMemberId(),
- memberReadyToShutdownEvent.getGroupId()));
- }
- // grouping
- memberReadyToShutdownEvent.setGroupId(memberReadyToShutdownEvent.getGroupId());
- publishEvent(memberReadyToShutdownEvent);
- }
-
- public static void sendMemberMaintenanceModeEvent(MemberMaintenanceModeEvent memberMaintenanceModeEvent) {
- if (log.isInfoEnabled()) {
- log.info(String.format("Publishing Maintenance mode event: [service] %s [cluster] %s " +
- " [instance-id] %s [network-partition] %s [partition] %s [member] %s " +
- "[groupId] %s", memberMaintenanceModeEvent.getServiceName(),
- memberMaintenanceModeEvent.getClusterId(),
- memberMaintenanceModeEvent.getInstanceId(),
- memberMaintenanceModeEvent.getNetworkPartitionId(),
- memberMaintenanceModeEvent.getPartitionId(),
- memberMaintenanceModeEvent.getMemberId(),
- memberMaintenanceModeEvent.getGroupId()));
- }
-
- publishEvent(memberMaintenanceModeEvent);
- }
-
- public static void sendClusterActivatedEvent(ClusterActivatedEvent clusterActivatedEvent) {
- if (log.isInfoEnabled()) {
- log.info(String.format("Publishing cluster activated event: [service] %s [cluster] %s " +
- " [instance-id] %s [appId] %s",
- clusterActivatedEvent.getServiceName(),
- clusterActivatedEvent.getClusterId(),
- clusterActivatedEvent.getInstanceId(),
- clusterActivatedEvent.getAppId()));
- }
- publishEvent(clusterActivatedEvent);
- }
-
- public static void sendClusterInactivateEvent(ClusterInactivateEvent clusterInactiveEvent) {
- if (log.isInfoEnabled()) {
- log.info(String.format("Publishing cluster in-active event: [service] %s [cluster] %s " +
- "[instance-id] %s [appId] %s",
- clusterInactiveEvent.getServiceName(), clusterInactiveEvent.getClusterId(),
- clusterInactiveEvent.getInstanceId(), clusterInactiveEvent.getAppId()));
- }
- publishEvent(clusterInactiveEvent);
- }
-
- public static void sendClusterInstanceCreatedEvent(ClusterInstanceCreatedEvent clusterInstanceCreatedEvent) {
- if (log.isInfoEnabled()) {
- log.info(String.format("Publishing cluster Instance Created event: [service] %s [cluster] %s " +
- "[instance-id] %s",
- clusterInstanceCreatedEvent.getServiceName(), clusterInstanceCreatedEvent.getClusterId(),
- clusterInstanceCreatedEvent.getInstanceId()));
- }
- publishEvent(clusterInstanceCreatedEvent);
- }
-
-
- public static void sendMemberTerminatedEvent(String serviceName, String clusterId, String networkPartitionId,
- String partitionId, String memberId,
- Properties properties, String groupId, String instanceId) {
- MemberTerminatedEvent memberTerminatedEvent = new MemberTerminatedEvent(serviceName, clusterId,
- networkPartitionId, partitionId, memberId, instanceId);
- memberTerminatedEvent.setProperties(properties);
- memberTerminatedEvent.setGroupId(groupId);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Publishing member terminated event: [service] %s [cluster] %s " +
- " [instance-id] %s [network-partition] %s [partition] %s [member] %s " +
- "[groupId] %s", serviceName, clusterId, instanceId, networkPartitionId,
- partitionId, memberId, groupId));
- }
-
- publishEvent(memberTerminatedEvent);
- }
-
- public static void sendCompleteTopologyEvent(Topology topology) {
- CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Publishing complete topology event"));
- }
- publishEvent(completeTopologyEvent);
- }
-
- public static void sendClusterTerminatingEvent(ClusterTerminatingEvent clusterTerminatingEvent) {
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Publishing Cluster terminating event: [appId] %s [cluster id] %s" +
- " [instance-id] %s ",
- clusterTerminatingEvent.getAppId(), clusterTerminatingEvent.getClusterId(),
- clusterTerminatingEvent.getInstanceId()));
- }
-
- publishEvent(clusterTerminatingEvent);
- }
-
- public static void sendClusterTerminatedEvent(ClusterTerminatedEvent clusterTerminatedEvent) {
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Publishing Cluster terminated event: [appId] %s [cluster id] %s" +
- " [instance-id] %s ",
- clusterTerminatedEvent.getAppId(), clusterTerminatedEvent.getClusterId(),
- clusterTerminatedEvent.getInstanceId()));
- }
-
- publishEvent(clusterTerminatedEvent);
- }
-
- public static void publishEvent(Event event) {
- String topic = Util.getMessageTopicName(event);
- EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
- eventPublisher.publish(event);
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java
deleted file mode 100644
index 9862b9a..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.topology;
-
-import com.google.gson.Gson;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
-import org.apache.stratos.messaging.domain.topology.Topology;
-
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Persistence and retrieval of Topology from Registry
- */
-public class TopologyManager {
- private static final Log log = LogFactory.getLog(TopologyManager.class);
-
- private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
- private static volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
- private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
- private static volatile Topology topology;
-
- private TopologyManager() {
- }
-
- public static void acquireReadLock() {
- if(log.isDebugEnabled()) {
- log.debug("Read lock acquired");
- }
- readLock.lock();
- }
-
- public static void releaseReadLock() {
- if(log.isDebugEnabled()) {
- log.debug("Read lock released");
- }
- readLock.unlock();
- }
-
- public static void acquireWriteLock() {
- if(log.isDebugEnabled()) {
- log.debug("Write lock acquired");
- }
- writeLock.lock();
- }
-
- public static void releaseWriteLock() {
- if(log.isDebugEnabled()) {
- log.debug("Write lock released");
- }
- writeLock.unlock();
- }
-
- public static Topology getTopology() {
- if (topology == null) {
- synchronized (TopologyManager.class) {
- if (topology == null) {
- if (log.isDebugEnabled()) {
- log.debug("Trying to retrieve topology from registry");
- }
- topology = CloudControllerUtil.retrieveTopology();
- if (topology == null) {
- if (log.isDebugEnabled()) {
- log.debug("Topology not found in registry, creating new");
- }
- topology = new Topology();
- }
- if (log.isDebugEnabled()) {
- log.debug("Topology initialized");
- }
- }
- }
- }
- return topology;
- }
-
- /**
- * Update in-memory topology and persist it in registry.
- * @param topology_
- */
- public static void updateTopology(Topology topology_) {
- synchronized (TopologyManager.class) {
- if (log.isDebugEnabled()) {
- log.debug("Updating topology");
- }
- topology = topology_;
- CloudControllerUtil.persistTopology(topology);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Topology updated: %s", toJson(topology)));
- }
- }
-
- }
-
- private static String toJson(Object object) {
- Gson gson = new Gson();
- return gson.toJson(object);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java
deleted file mode 100644
index be3051e..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.topology;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
-import org.wso2.carbon.ntask.core.Task;
-
-public class TopologySynchronizerTask implements Task{
- private static final Log log = LogFactory.getLog(TopologySynchronizerTask.class);
-
- @Override
- public void execute() {
- if (log.isDebugEnabled()) {
- log.debug("Executing topology synchronization task");
- }
-
- if(FasterLookUpDataHolder.getInstance().isTopologySyncRunning() ||
- // this is a temporary fix to avoid task execution - limitation with ntask
- (!FasterLookUpDataHolder.getInstance().getEnableTopologySync())){
- if(log.isWarnEnabled()) {
- log.warn("Topology synchronization is disabled.");
- }
- return;
- }
-
- // publish to the topic
- if (TopologyManager.getTopology() != null) {
- TopologyEventPublisher.sendCompleteTopologyEvent(TopologyManager.getTopology());
- }
- }
-
- @Override
- public void init() {
-
- // this is a temporary fix to avoid task execution - limitation with ntask
- if(!FasterLookUpDataHolder.getInstance().getEnableTopologySync()){
- if(log.isWarnEnabled()) {
- log.warn("Topology synchronization is disabled.");
- }
- return;
- }
- }
-
- @Override
- public void setProperties(Map<String, String> arg0) {}
-
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/AxiomXpathParserUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/AxiomXpathParserUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/AxiomXpathParserUtil.java
new file mode 100644
index 0000000..913289d
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/AxiomXpathParserUtil.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.util;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNode;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axiom.om.impl.dom.DOOMAbstractFactory;
+import org.apache.axiom.om.impl.dom.ElementImpl;
+import org.apache.axiom.om.xpath.AXIOMXPath;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.exception.MalformedConfigurationFileException;
+import org.jaxen.JaxenException;
+import org.w3c.dom.Element;
+import org.wso2.securevault.SecretResolver;
+import org.wso2.securevault.SecretResolverFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.Source;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import javax.xml.validation.Validator;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * This class is parsing configuration files using Axiom Xpath.
+ */
+public class AxiomXpathParserUtil {
+
+ private static final Log LOG = LogFactory.getLog(AxiomXpathParserUtil.class);
+
+ private AxiomXpathParserUtil(){}
+
+ public static OMElement parse(File xmlSource) throws MalformedConfigurationFileException,
+ IllegalArgumentException {
+
+ OMElement documentElement;
+
+ if (xmlSource == null) {
+ String msg = "File is null.";
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+
+ try {
+ documentElement = new StAXOMBuilder(xmlSource.getPath()).getDocumentElement();
+ return documentElement;
+
+ } catch (XMLStreamException e) {
+ String msg = "Failed to parse the configuration file : " + xmlSource.getPath();
+ LOG.error(msg, e);
+ throw new MalformedConfigurationFileException(msg, e);
+ } catch (FileNotFoundException e) {
+ String msg = "Configuration file cannot be found : " + xmlSource.getPath();
+ LOG.error(msg);
+ throw new MalformedConfigurationFileException(msg);
+ }
+
+ }
+
+ private static Element getDOMElement(final OMElement omElement) {
+
+ // Get the StAX reader from the created element
+ XMLStreamReader llomReader = omElement.getXMLStreamReader();
+
+ // Create the DOOM OMFactory
+ OMFactory doomFactory = DOOMAbstractFactory.getOMFactory();
+
+ // Create the new builder
+ StAXOMBuilder doomBuilder = new StAXOMBuilder(doomFactory, llomReader);
+
+ // Get the document element
+ OMElement newElem = doomBuilder.getDocumentElement();
+
+ return newElem instanceof Element ? (Element) newElem : null;
+ }
+
+ private static OMElement getElement(final Object obj) {
+ OMNode node;
+ if ((obj instanceof OMNode) && (node = (OMNode) obj).getType() == OMNode.ELEMENT_NODE) {
+
+ OMElement element = (OMElement) node;
+
+ return element;
+
+ }
+
+ return null;
+ }
+
+ public static OMElement getElement(final String fileName, final OMElement rootElt,
+ final String eltStr, final String xpath) {
+ List<?> nodes = getMatchingNodes(xpath, rootElt);
+ neglectingWarn(fileName, eltStr, nodes.size());
+ OMElement element = getElement(nodes.get(0));
+ return element;
+ }
+
+ public static OMElement getFirstChildElement(final OMElement root, final String childName) {
+ Iterator<?> it = root.getChildrenWithName(new QName(childName));
+ if (it.hasNext()) {
+ return (OMElement) it.next();
+ }
+
+ return null;
+ }
+
+ private static void neglectingWarn(final String fileName, final String elt, final int size) {
+ if (size > 1) {
+ LOG.warn(fileName + " contains more than one " + elt + " elements!" +
+ " Elements other than the first will be neglected.");
+ }
+ }
+
+ public static void plainTextWarn(final String elt) {
+ LOG.warn("Unable to find a value for " + elt + " element from Secure Vault." +
+ "Hence we will try to assign the plain text value (if specified).");
+ }
+
+ /**
+ * @param xpath
+ * XPATH expression to be read.
+ * @param elt
+ * OMElement to be used for the search.
+ * @return List matching OMNode list
+ */
+ @SuppressWarnings("unchecked")
+ public static OMNode getFirstMatchingNode(final String xpath, final OMElement elt) throws MalformedConfigurationFileException{
+
+ AXIOMXPath axiomXpath;
+ List<OMNode> nodeList = null;
+ try {
+ axiomXpath = new AXIOMXPath(xpath);
+ nodeList = axiomXpath.selectNodes(elt);
+ return nodeList.isEmpty() ? null : nodeList.get(0);
+ } catch (JaxenException e) {
+ String msg = "Error occurred while reading the Xpath (" + xpath + ")";
+ LOG.error(msg, e);
+ throw new MalformedConfigurationFileException(msg, e);
+ }
+
+ }
+
+ /**
+ * @param xpath
+ * XPATH expression to be read.
+ * @return List matching list
+ */
+ @SuppressWarnings("unchecked")
+ public static List<OMNode> getMatchingNodes(OMElement elt, final String xpath) throws MalformedConfigurationFileException{
+
+ AXIOMXPath axiomXpath;
+ List<OMNode> nodeList = null;
+ try {
+ axiomXpath = new AXIOMXPath(xpath);
+ nodeList = axiomXpath.selectNodes(elt);
+ return nodeList;
+ } catch (JaxenException e) {
+ String msg = "Error occurred while reading the Xpath (" + xpath + ")";
+ LOG.error(msg, e);
+ throw new MalformedConfigurationFileException(msg, e);
+ }
+
+ }
+
+ /**
+ * @param xpath
+ * XPATH expression to be read.
+ * @param elt
+ * OMElement to be used for the search.
+ * @return List matching OMNode list
+ */
+ @SuppressWarnings("unchecked")
+ public static List<OMNode> getMatchingNodes(final String xpath, final OMElement elt) throws MalformedConfigurationFileException{
+
+ AXIOMXPath axiomXpath;
+ List<OMNode> nodeList = null;
+ try {
+ axiomXpath = new AXIOMXPath(xpath);
+ nodeList = axiomXpath.selectNodes(elt);
+ return nodeList;
+ } catch (JaxenException e) {
+ String msg = "Error occurred while reading the Xpath (" + xpath + ")";
+ LOG.error(msg, e);
+ throw new MalformedConfigurationFileException(msg, e);
+ }
+
+ }
+
+ public static void validate(final OMElement omElement, final File schemaFile) throws SAXException, IOException {
+
+ Element sourceElement;
+
+ // if the OMElement is created using DOM implementation use it
+ if (omElement instanceof ElementImpl) {
+ sourceElement = (Element) omElement;
+ } else { // else convert from llom to dom
+ sourceElement = getDOMElement(omElement);
+ }
+
+ // Create a SchemaFactory capable of understanding WXS schemas.
+
+ // Load a WXS schema, represented by a Schema instance.
+ SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+ Source source = new StreamSource(schemaFile);
+
+ // Create a Validator object, which can be used to validate
+ // an instance document.
+ Schema schema = factory.newSchema(source);
+ Validator validator = schema.newValidator();
+
+ // Validate the DOM tree.
+ validator.validate(new DOMSource(sourceElement));
+ }
+
+ public static String resolveSecret(final OMElement docElt, final OMElement elt) {
+ // retrieve the value using secure vault
+ SecretResolver secretResolver = SecretResolverFactory.create(docElt, false);
+
+ String alias = elt.getAttributeValue(new QName(
+ CloudControllerConstants.ALIAS_NAMESPACE,
+ CloudControllerConstants.ALIAS_ATTRIBUTE,
+ CloudControllerConstants.ALIAS_ATTRIBUTE_PREFIX));
+
+ // retrieve the secured password
+ if (secretResolver != null && secretResolver.isInitialized() &&
+ secretResolver.isTokenProtected(alias)) {
+ return secretResolver.resolve(alias);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java
index ef37078..3808d13 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java
@@ -21,15 +21,14 @@ package org.apache.stratos.cloud.controller.util;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.deployment.partition.Partition;
+import org.apache.stratos.cloud.controller.domain.*;
+import org.apache.stratos.cloud.controller.domain.Partition;
import org.apache.stratos.cloud.controller.exception.CloudControllerException;
import org.apache.stratos.cloud.controller.exception.InvalidIaasProviderException;
-import org.apache.stratos.cloud.controller.interfaces.Iaas;
-import org.apache.stratos.cloud.controller.jcloud.ComputeServiceBuilderUtil;
-import org.apache.stratos.cloud.controller.persist.Deserializer;
-import org.apache.stratos.cloud.controller.pojo.*;
+import org.apache.stratos.cloud.controller.iaas.Iaas;
+import org.apache.stratos.cloud.controller.registry.Deserializer;
import org.apache.stratos.cloud.controller.registry.RegistryManager;
-import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
+import org.apache.stratos.cloud.controller.context.FasterLookUpDataHolder;
import org.apache.stratos.common.Property;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.wso2.carbon.registry.core.exceptions.RegistryException;