You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/12/20 22:56:43 UTC

[5/7] stratos git commit: Introducing KubernetesIaas class, MemberContext.clusterInstanceId, removing Cartridge.deployerType

http://git-wip-us.apache.org/repos/asf/stratos/blob/b575c7d6/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java
new file mode 100644
index 0000000..9dced60
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java
@@ -0,0 +1,323 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.stratos.cloud.controller.messaging.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.domain.Cartridge;
+import org.apache.stratos.cloud.controller.domain.ClusterContext;
+import org.apache.stratos.cloud.controller.domain.MemberContext;
+import org.apache.stratos.cloud.controller.domain.PortMapping;
+import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Port;
+import org.apache.stratos.messaging.domain.topology.ServiceType;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.util.Util;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * this is to send the relevant events from cloud controller to topology topic
+ */
+public class TopologyEventPublisher {
+    private static final Log log = LogFactory.getLog(TopologyEventPublisher.class);
+
+    public static void sendServiceCreateEvent(List<Cartridge> cartridgeList) {
+        ServiceCreatedEvent serviceCreatedEvent;
+        for (Cartridge cartridge : cartridgeList) {
+            serviceCreatedEvent = new ServiceCreatedEvent(cartridge.getType(),
+                    (cartridge.isMultiTenant() ? ServiceType.MultiTenant
+                            : ServiceType.SingleTenant));
+
+            // Add ports to the event
+            Port port;
+            List<PortMapping> portMappings = cartridge.getPortMappings();
+            for (PortMapping portMapping : portMappings) {
+                port = new Port(portMapping.getProtocol(),
+                        Integer.parseInt(portMapping.getPort()),
+                        Integer.parseInt(portMapping.getProxyPort()));
+                serviceCreatedEvent.addPort(port);
+            }
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format(
+                        "Publishing service created event: [service-name] %s",
+                        cartridge.getType()));
+            }
+            publishEvent(serviceCreatedEvent);
+        }
+    }
+
+    public static void sendServiceRemovedEvent(List<Cartridge> cartridgeList) {
+        ServiceRemovedEvent serviceRemovedEvent;
+        for (Cartridge cartridge : cartridgeList) {
+            serviceRemovedEvent = new ServiceRemovedEvent(cartridge.getType());
+            if (log.isInfoEnabled()) {
+                log.info(String.format(
+                        "Publishing service removed event: [service-name] %s",
+                        serviceRemovedEvent.getServiceName()));
+            }
+            publishEvent(serviceRemovedEvent);
+        }
+    }
+
+    public static void sendClusterResetEvent(String appId, String serviceName, String clusterId,
+                                             String instanceId) {
+        ClusterResetEvent clusterResetEvent = new ClusterResetEvent(appId, serviceName,
+                                                                    clusterId, instanceId);
+
+        if (log.isInfoEnabled()) {
+            log.info("Publishing cluster reset event: [cluster-id] " + clusterId);
+        }
+        publishEvent(clusterResetEvent);
+    }
+
+    public static void sendClusterCreatedEvent(Cluster cluster) {
+        ClusterCreatedEvent clusterCreatedEvent = new ClusterCreatedEvent(cluster);
+
+        if (log.isInfoEnabled()) {
+            log.info("Publishing cluster created event: [cluster-id] " + cluster.getClusterId());
+        }
+        publishEvent(clusterCreatedEvent);
+    }
+
+    public static void sendApplicationClustersCreated(String appId, List<Cluster> clusters) {
+
+        if (log.isInfoEnabled()) {
+            log.info("Publishing application clusters created event: [application-id] " + appId);
+        }
+
+        publishEvent(new ApplicationClustersCreatedEvent(clusters, appId));
+    }
+
+    public static void sendApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusters) {
+
+        if (log.isInfoEnabled()) {
+            log.info("Publishing application clusters removed event: [application-id] " + appId);
+        }
+
+        publishEvent(new ApplicationClustersRemovedEvent(clusters, appId));
+    }
+
+    public static void sendClusterRemovedEvent(ClusterContext ctxt, String deploymentPolicy) {
+        ClusterRemovedEvent clusterRemovedEvent = new ClusterRemovedEvent(
+                ctxt.getCartridgeType(), ctxt.getClusterId(), deploymentPolicy, ctxt.isLbCluster());
+        if (log.isInfoEnabled()) {
+            log.info(String
+                    .format("Publishing cluster removed event: [service-name] %s [cluster-id] %s",
+                            ctxt.getCartridgeType(), ctxt.getClusterId()));
+        }
+        publishEvent(clusterRemovedEvent);
+
+    }
+
+    public static void sendInstanceSpawnedEvent(MemberContext memberContext) {
+
+        InstanceSpawnedEvent instanceSpawnedEvent = new InstanceSpawnedEvent(
+                memberContext.getCartridgeType(), memberContext.getClusterId(), memberContext.getClusterInstanceId(), memberContext.getMemberId(),
+                memberContext.getInstanceId(),
+                memberContext.getNetworkPartitionId(), memberContext.getPartition().getId(), memberContext.getInitTime());
+
+        instanceSpawnedEvent.setLbClusterId(memberContext.getLbClusterId());
+        instanceSpawnedEvent.setMemberIp(memberContext.getPrivateIpAddress());
+        instanceSpawnedEvent.setMemberPublicIp(memberContext.getPublicIpAddress());
+        instanceSpawnedEvent.setProperties(CloudControllerUtil
+                .toJavaUtilProperties(memberContext.getProperties()));
+
+        log.info(String.format("Publishing instance spawned event: [service-name] %s [cluster-id] %s " +
+                "[cluster-instance-id] %s [member-id] %s [instance-id] %s [network-partition-id] %s " +
+                        "[partition-id] %s [lb-cluster-id] %s",
+                memberContext.getCartridgeType(), memberContext.getClusterId(), memberContext.getClusterInstanceId(),
+                memberContext.getMemberId(), memberContext.getInstanceId(), memberContext.getNetworkPartitionId(),
+                memberContext.getPartition().getId(), memberContext.getLbClusterId()));
+        publishEvent(instanceSpawnedEvent);
+    }
+
+    public static void sendMemberStartedEvent(InstanceStartedEvent instanceStartedEvent) {
+        MemberStartedEvent memberStartedEventTopology = new MemberStartedEvent(instanceStartedEvent.getServiceName(),
+                instanceStartedEvent.getClusterId(), instanceStartedEvent.getClusterInstanceId(),
+                instanceStartedEvent.getMemberId(), instanceStartedEvent.getInstanceId(), instanceStartedEvent.getNetworkPartitionId(), instanceStartedEvent.getPartitionId()
+        );
+        if (log.isInfoEnabled()) {
+            log.info(String
+                    .format("Publishing member started event: [service-name] %s [cluster-id] %s [cluster-instance-id] %s " +
+                                    "[member-id] %s [instance-id] %s [network-partition-id] %s [partition-id] %s",
+                            instanceStartedEvent.getServiceName(),
+                            instanceStartedEvent.getClusterId(),
+                            instanceStartedEvent.getClusterInstanceId(),
+                            instanceStartedEvent.getMemberId(),
+                            instanceStartedEvent.getInstanceId(),
+                            instanceStartedEvent.getNetworkPartitionId(),
+                            instanceStartedEvent.getPartitionId()));
+        }
+        publishEvent(memberStartedEventTopology);
+    }
+
+    public static void sendMemberActivatedEvent(
+            MemberActivatedEvent memberActivatedEvent) {
+        if (log.isInfoEnabled()) {
+            log.info(String
+                    .format("Publishing member activated event: [service-name] %s [cluster-id] %s [cluster-instance-id] %s " +
+                                    "[member-id] %s [instance-id] %s [network-partition-id] %s [partition-id] %s",
+                            memberActivatedEvent.getServiceName(),
+                            memberActivatedEvent.getClusterId(),
+                            memberActivatedEvent.getClusterInstanceId(),
+                            memberActivatedEvent.getMemberId(),
+                            memberActivatedEvent.getInstanceId(),
+                            memberActivatedEvent.getNetworkPartitionId(),
+                            memberActivatedEvent.getPartitionId()));
+        }
+        publishEvent(memberActivatedEvent);
+    }
+
+    public static void sendMemberReadyToShutdownEvent(MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Publishing member ready to shut down event: [service-name] %s [cluster-id] %s " +
+                            "[cluster-instance-id] %s [member-id] %s [instance-id] %s [network-partition-id] %s " +
+                            "[partition-id] %s",
+                    memberReadyToShutdownEvent.getServiceName(),
+                    memberReadyToShutdownEvent.getClusterId(),
+                    memberReadyToShutdownEvent.getClusterInstanceId(),
+                    memberReadyToShutdownEvent.getMemberId(),
+                    memberReadyToShutdownEvent.getInstanceId(),
+                    memberReadyToShutdownEvent.getNetworkPartitionId(),
+                    memberReadyToShutdownEvent.getPartitionId()));
+        }
+        // grouping
+        memberReadyToShutdownEvent.setGroupId(memberReadyToShutdownEvent.getGroupId());
+        publishEvent(memberReadyToShutdownEvent);
+    }
+
+    public static void sendMemberMaintenanceModeEvent(MemberMaintenanceModeEvent memberMaintenanceModeEvent) {
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Publishing member in maintenance mode event: [service-name] %s [cluster-id] %s [cluster-instance-id] %s " +
+                            "[member-id] %s [instance-id] %s [network-partition-id] %s [partition-id] %s",
+                    memberMaintenanceModeEvent.getServiceName(),
+                    memberMaintenanceModeEvent.getClusterId(),
+                    memberMaintenanceModeEvent.getClusterInstanceId(),
+                    memberMaintenanceModeEvent.getMemberId(),
+                    memberMaintenanceModeEvent.getInstanceId(),
+                    memberMaintenanceModeEvent.getNetworkPartitionId(),
+                    memberMaintenanceModeEvent.getPartitionId()));
+        }
+
+        publishEvent(memberMaintenanceModeEvent);
+    }
+
+    public static void sendClusterActivatedEvent(ClusterInstanceActivatedEvent clusterActivatedEvent) {
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Publishing cluster activated event: [service-name] %s [cluster-id] %s " +
+                            " [instance-id] %s [application-id] %s",
+                    clusterActivatedEvent.getServiceName(),
+                    clusterActivatedEvent.getClusterId(),
+                    clusterActivatedEvent.getInstanceId(),
+                    clusterActivatedEvent.getAppId()));
+        }
+        publishEvent(clusterActivatedEvent);
+    }
+
+    public static void sendClusterInactivateEvent(ClusterInstanceInactivateEvent clusterInactiveEvent) {
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Publishing cluster inactive event: [service-name] %s [cluster-id] %s " +
+                            "[instance-id] %s [application-id] %s",
+                    clusterInactiveEvent.getServiceName(), clusterInactiveEvent.getClusterId(),
+                    clusterInactiveEvent.getInstanceId(), clusterInactiveEvent.getAppId()));
+        }
+        publishEvent(clusterInactiveEvent);
+    }
+
+    public static void sendClusterInstanceCreatedEvent(ClusterInstanceCreatedEvent clusterInstanceCreatedEvent) {
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Publishing cluster instance created event: [service-name] %s [cluster-id] %s " +
+                            " in [network-partition-id] %s [instance-id] %s",
+                    clusterInstanceCreatedEvent.getServiceName(), clusterInstanceCreatedEvent.getClusterId(),
+                    clusterInstanceCreatedEvent.getNetworkPartitionId(),
+                    clusterInstanceCreatedEvent.getClusterInstance().getInstanceId()));
+        }
+        publishEvent(clusterInstanceCreatedEvent);
+    }
+
+
+    public static void sendMemberTerminatedEvent(String serviceName, String clusterId, String memberId,
+                                                 String instanceId, String clusterInstanceId,
+                                                 String networkPartitionId, String partitionId, Properties properties,
+                                                 String groupId) {
+        MemberTerminatedEvent memberTerminatedEvent = new MemberTerminatedEvent(serviceName, clusterId,
+                memberId, instanceId, clusterInstanceId, networkPartitionId, partitionId);
+        memberTerminatedEvent.setProperties(properties);
+        memberTerminatedEvent.setGroupId(groupId);
+
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Publishing member terminated event: [service-name] %s [cluster-id] %s " +
+                            "[cluster-instance-id] %s [member-id] %s [instance-id] %s [network-partition-id] %s " +
+                            "[partition-id] %s [group-id] %s", serviceName, clusterId, clusterInstanceId, memberId,
+                    instanceId, networkPartitionId, partitionId, groupId));
+        }
+        publishEvent(memberTerminatedEvent);
+    }
+
+    public static void sendCompleteTopologyEvent(Topology topology) {
+        CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Publishing complete topology event"));
+        }
+        publishEvent(completeTopologyEvent);
+    }
+
+    public static void sendClusterTerminatingEvent(ClusterInstanceTerminatingEvent clusterTerminatingEvent) {
+
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Publishing Cluster terminating event: [application-id] %s [cluster id] %s" +
+                            " [instance-id] %s ",
+                    clusterTerminatingEvent.getAppId(), clusterTerminatingEvent.getClusterId(),
+                    clusterTerminatingEvent.getInstanceId()));
+        }
+
+        publishEvent(clusterTerminatingEvent);
+    }
+
+    public static void sendClusterTerminatedEvent(ClusterInstanceTerminatedEvent clusterTerminatedEvent) {
+
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Publishing Cluster terminated event: [application-id] %s [cluster id] %s" +
+                            " [instance-id] %s ",
+                    clusterTerminatedEvent.getAppId(), clusterTerminatedEvent.getClusterId(),
+                    clusterTerminatedEvent.getInstanceId()));
+        }
+
+        publishEvent(clusterTerminatedEvent);
+    }
+
+    public static void publishEvent(Event event) {
+        String topic = Util.getMessageTopicName(event);
+        EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
+        eventPublisher.publish(event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b575c7d6/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTask.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTask.java
new file mode 100644
index 0000000..9e226cb
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTask.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.messaging.publisher;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
+import org.apache.stratos.cloud.controller.context.CloudControllerContext;
+import org.apache.stratos.cloud.controller.messaging.topology.TopologyManager;
+import org.wso2.carbon.ntask.core.Task;
+
+public class TopologySynchronizerTask implements Task{
+    private static final Log log = LogFactory.getLog(TopologySynchronizerTask.class);
+
+    @Override
+    public void execute() {
+        if (log.isDebugEnabled()) {
+            log.debug("Executing topology synchronization task");
+        }
+        
+        if(CloudControllerContext.getInstance().isTopologySyncRunning() ||
+        		// this is a temporary fix to avoid task execution - limitation with ntask
+                (!CloudControllerConfig.getInstance().isTopologySyncEnabled())){
+            if(log.isWarnEnabled()) {
+                log.warn("Topology synchronization is disabled.");
+            }
+            return;
+        }
+        
+    	// publish to the topic 
+        if (TopologyManager.getTopology() != null) {
+            TopologyEventPublisher.sendCompleteTopologyEvent(TopologyManager.getTopology());
+        }
+    }
+    
+    @Override
+    public void init() {
+
+    	// this is a temporary fix to avoid task execution - limitation with ntask
+		if(!CloudControllerConfig.getInstance().isTopologySyncEnabled()){
+            if(log.isWarnEnabled()) {
+                log.warn("Topology synchronization is disabled.");
+            }
+			return;
+		}
+    }
+
+    @Override
+    public void setProperties(Map<String, String> arg0) {}
+    
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b575c7d6/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
index ccadf58..eacd368 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java
@@ -24,8 +24,6 @@ import java.util.HashMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
-import org.apache.stratos.cloud.controller.context.CloudControllerContext;
-import org.apache.stratos.cloud.controller.messaging.topology.TopologySynchronizerTask;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.wso2.carbon.ntask.common.TaskException;
 import org.wso2.carbon.ntask.core.TaskInfo;

http://git-wip-us.apache.org/repos/asf/stratos/blob/b575c7d6/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index 27bb2ba..26c3d4a 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -22,11 +22,12 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.context.CloudControllerContext;
-import org.apache.stratos.cloud.controller.domain.*;
 import org.apache.stratos.cloud.controller.domain.Cartridge;
+import org.apache.stratos.cloud.controller.domain.*;
 import org.apache.stratos.cloud.controller.exception.InvalidCartridgeTypeException;
 import org.apache.stratos.cloud.controller.exception.InvalidMemberException;
 import org.apache.stratos.cloud.controller.messaging.publisher.StatisticsDataPublisher;
+import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPublisher;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
@@ -392,18 +393,20 @@ public class TopologyBuilder {
         TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
     }
 
-	public static void handleMemberSpawned(String serviceName,
-			String clusterId, String partitionId,
-			String privateIp, String publicIp, MemberContext context) {
+	public static void handleMemberSpawned(MemberContext memberContext) {
 		// adding the new member to the cluster after it is successfully started
 		// in IaaS.
 		Topology topology = TopologyManager.getTopology();
-		Service service = topology.getService(serviceName);
-		Cluster cluster = service.getCluster(clusterId);
-		String memberId = context.getMemberId();
-		String networkPartitionId = context.getNetworkPartitionId();
-		String lbClusterId = context.getLbClusterId();
-		long initTime = context.getInitTime();
+		Service service = topology.getService(memberContext.getCartridgeType());
+        String clusterId = memberContext.getClusterId();
+        Cluster cluster = service.getCluster(clusterId);
+		String memberId = memberContext.getMemberId();
+        String instanceId = memberContext.getInstanceId();
+        String clusterInstanceId = memberContext.getClusterInstanceId();
+		String networkPartitionId = memberContext.getNetworkPartitionId();
+        String partitionId = memberContext.getPartition().getId();
+		String lbClusterId = memberContext.getLbClusterId();
+		long initTime = memberContext.getInitTime();
 
 		if (cluster.memberExists(memberId)) {
 			log.warn(String.format("Member %s already exists", memberId));
@@ -412,23 +415,22 @@ public class TopologyBuilder {
 
 		try {
 			TopologyManager.acquireWriteLock();
-			Member member = new Member(serviceName, clusterId,
-					networkPartitionId, partitionId, memberId, initTime);
+			Member member = new Member(service.getServiceName(), clusterId, memberId, instanceId, clusterInstanceId,
+					networkPartitionId, partitionId, initTime);
 			member.setStatus(MemberStatus.Created);
-            member.setInstanceId(context.getInstanceId());
-			member.setMemberIp(privateIp);
+			member.setMemberIp(memberContext.getPrivateIpAddress());
 			member.setLbClusterId(lbClusterId);
-			member.setMemberPublicIp(publicIp);
-			member.setProperties(CloudControllerUtil.toJavaUtilProperties(context.getProperties()));
+			member.setMemberPublicIp(memberContext.getPublicIpAddress());
+			member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
             try {
 
-                Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(serviceName);
+                Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceName());
                 List<PortMapping> portMappings = cartridge.getPortMappings();
                 Port port;
                 if(cluster.isKubernetesCluster()){
                     // Update port mappings with generated service proxy port
                     // TODO: Need to properly fix with the latest Kubernetes version
-                    String serviceHostPortStr = CloudControllerUtil.getProperty(context.getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
+                    String serviceHostPortStr = CloudControllerUtil.getProperty(memberContext.getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
                     if(StringUtils.isEmpty(serviceHostPortStr)) {
                         log.warn("Kubernetes service host port not found for member: [member-id] " + memberId);
                     }
@@ -464,9 +466,7 @@ public class TopologyBuilder {
 			TopologyManager.releaseWriteLock();
 		}
 		
-		TopologyEventPublisher.sendInstanceSpawnedEvent(serviceName, clusterId,
-				networkPartitionId, partitionId, memberId, lbClusterId,
-				publicIp, privateIp, context);
+		TopologyEventPublisher.sendInstanceSpawnedEvent(memberContext);
 	}
     
     public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) {
@@ -547,12 +547,12 @@ public class TopologyBuilder {
         }
 
         MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent(
-                                                        instanceActivatedEvent.getServiceName(),
-                                                        instanceActivatedEvent.getClusterId(),
-                                                        instanceActivatedEvent.getNetworkPartitionId(),
-                                                        instanceActivatedEvent.getPartitionId(),
-                                                        instanceActivatedEvent.getMemberId(),
-                                                        instanceActivatedEvent.getInstanceId());
+                instanceActivatedEvent.getServiceName(),
+                instanceActivatedEvent.getClusterId(),
+                instanceActivatedEvent.getClusterInstanceId(), instanceActivatedEvent.getMemberId(),
+                instanceActivatedEvent.getInstanceId(),
+                instanceActivatedEvent.getNetworkPartitionId(),
+                instanceActivatedEvent.getPartitionId());
 
         // grouping - set grouid
         //TODO
@@ -627,12 +627,12 @@ public class TopologyBuilder {
             return;
         }
         MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent(
-                                                                instanceReadyToShutdownEvent.getServiceName(),
-                                                                instanceReadyToShutdownEvent.getClusterId(),
-                                                                instanceReadyToShutdownEvent.getNetworkPartitionId(),
-                                                                instanceReadyToShutdownEvent.getPartitionId(),
-                                                                instanceReadyToShutdownEvent.getMemberId(),
-                                                                instanceReadyToShutdownEvent.getInstanceId());
+                instanceReadyToShutdownEvent.getServiceName(),
+                instanceReadyToShutdownEvent.getClusterId(),
+                instanceReadyToShutdownEvent.getClusterInstanceId(), instanceReadyToShutdownEvent.getMemberId(),
+                instanceReadyToShutdownEvent.getInstanceId(),
+                instanceReadyToShutdownEvent.getNetworkPartitionId(),
+                instanceReadyToShutdownEvent.getPartitionId());
         try {
             TopologyManager.acquireWriteLock();
 
@@ -687,12 +687,12 @@ public class TopologyBuilder {
 
 
         MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent(
-                                                                instanceMaintenanceModeEvent.getServiceName(),
-                                                                instanceMaintenanceModeEvent.getClusterId(),
-                                                                instanceMaintenanceModeEvent.getNetworkPartitionId(),
-                                                                instanceMaintenanceModeEvent.getPartitionId(),
-                                                                instanceMaintenanceModeEvent.getMemberId(),
-                                                                instanceMaintenanceModeEvent.getInstanceId());
+                instanceMaintenanceModeEvent.getServiceName(),
+                instanceMaintenanceModeEvent.getClusterId(),
+                instanceMaintenanceModeEvent.getClusterInstanceId(), instanceMaintenanceModeEvent.getMemberId(),
+                instanceMaintenanceModeEvent.getInstanceId(),
+                instanceMaintenanceModeEvent.getNetworkPartitionId(),
+                instanceMaintenanceModeEvent.getPartitionId());
         try {
             TopologyManager.acquireWriteLock();
             // try update lifecycle state
@@ -746,6 +746,7 @@ public class TopologyBuilder {
 			return;
 		}
         String instanceId = member.getInstanceId();
+        String clusterInstanceId = member.getClusterInstanceId();
 
         try {
             TopologyManager.acquireWriteLock();
@@ -757,8 +758,9 @@ public class TopologyBuilder {
         }
         /* @TODO leftover from grouping_poc*/
         String groupAlias = null;
-        TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, networkPartitionId,
-                partitionId, memberId, properties, groupAlias, instanceId);
+        TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId, instanceId,
+                clusterInstanceId, networkPartitionId,
+                partitionId, properties, groupAlias);
     }
 
     public static void handleMemberSuspended() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/b575c7d6/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyEventPublisher.java
deleted file mode 100644
index 2c9a911..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyEventPublisher.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.stratos.cloud.controller.messaging.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.domain.Cartridge;
-import org.apache.stratos.cloud.controller.domain.ClusterContext;
-import org.apache.stratos.cloud.controller.domain.MemberContext;
-import org.apache.stratos.cloud.controller.domain.PortMapping;
-import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
-import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
-import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Port;
-import org.apache.stratos.messaging.domain.topology.ServiceType;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
-import org.apache.stratos.messaging.event.topology.*;
-import org.apache.stratos.messaging.util.Util;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * this is to send the relevant events from cloud controller to topology topic
- */
-public class TopologyEventPublisher {
-    private static final Log log = LogFactory.getLog(TopologyEventPublisher.class);
-
-    public static void sendServiceCreateEvent(List<Cartridge> cartridgeList) {
-        ServiceCreatedEvent serviceCreatedEvent;
-        for (Cartridge cartridge : cartridgeList) {
-            serviceCreatedEvent = new ServiceCreatedEvent(cartridge.getType(),
-                    (cartridge.isMultiTenant() ? ServiceType.MultiTenant
-                            : ServiceType.SingleTenant));
-
-            // Add ports to the event
-            Port port;
-            List<PortMapping> portMappings = cartridge.getPortMappings();
-            for (PortMapping portMapping : portMappings) {
-                port = new Port(portMapping.getProtocol(),
-                        Integer.parseInt(portMapping.getPort()),
-                        Integer.parseInt(portMapping.getProxyPort()));
-                serviceCreatedEvent.addPort(port);
-            }
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format(
-                        "Publishing service created event: [service] %s",
-                        cartridge.getType()));
-            }
-            publishEvent(serviceCreatedEvent);
-        }
-    }
-
-    public static void sendServiceRemovedEvent(List<Cartridge> cartridgeList) {
-        ServiceRemovedEvent serviceRemovedEvent;
-        for (Cartridge cartridge : cartridgeList) {
-            serviceRemovedEvent = new ServiceRemovedEvent(cartridge.getType());
-            if (log.isInfoEnabled()) {
-                log.info(String.format(
-                        "Publishing service removed event: [service] %s",
-                        serviceRemovedEvent.getServiceName()));
-            }
-            publishEvent(serviceRemovedEvent);
-        }
-    }
-
-    public static void sendClusterResetEvent(String appId, String serviceName, String clusterId,
-                                             String instanceId) {
-        ClusterResetEvent clusterResetEvent = new ClusterResetEvent(appId, serviceName,
-                                                                    clusterId, instanceId);
-
-        if (log.isInfoEnabled()) {
-            log.info("Publishing cluster reset event: " + clusterId);
-        }
-        publishEvent(clusterResetEvent);
-    }
-
-    public static void sendClusterCreatedEvent(Cluster cluster) {
-        ClusterCreatedEvent clusterCreatedEvent = new ClusterCreatedEvent(cluster);
-
-        if (log.isInfoEnabled()) {
-            log.info("Publishing cluster created event: " + cluster.getClusterId());
-        }
-        publishEvent(clusterCreatedEvent);
-    }
-
-    public static void sendApplicationClustersCreated(String appId, List<Cluster> clusters) {
-
-        if (log.isInfoEnabled()) {
-            log.info("Publishing Application Clusters Created event for Application: " + appId);
-        }
-
-        publishEvent(new ApplicationClustersCreatedEvent(clusters, appId));
-    }
-
-    public static void sendApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusters) {
-
-        if (log.isInfoEnabled()) {
-            log.info("Publishing Application Clusters removed event for Application: " + appId);
-        }
-
-        publishEvent(new ApplicationClustersRemovedEvent(clusters, appId));
-    }
-
-    public static void sendClusterRemovedEvent(ClusterContext ctxt, String deploymentPolicy) {
-        ClusterRemovedEvent clusterRemovedEvent = new ClusterRemovedEvent(
-                ctxt.getCartridgeType(), ctxt.getClusterId(), deploymentPolicy, ctxt.isLbCluster());
-        if (log.isInfoEnabled()) {
-            log.info(String
-                    .format("Publishing cluster removed event: [service] %s [cluster] %s",
-                            ctxt.getCartridgeType(), ctxt.getClusterId()));
-        }
-        publishEvent(clusterRemovedEvent);
-
-    }
-
-    public static void sendInstanceSpawnedEvent(String serviceName,
-                                                String clusterId, String networkPartitionId, String partitionId,
-                                                String memberId, String lbClusterId, String publicIp,
-                                                String privateIp, MemberContext context) {
-
-        long initTime = context.getInitTime();
-        InstanceSpawnedEvent instanceSpawnedEvent = new InstanceSpawnedEvent(
-                serviceName, clusterId, networkPartitionId, partitionId,
-                memberId, initTime, context.getInstanceId());
-        instanceSpawnedEvent.setLbClusterId(lbClusterId);
-        instanceSpawnedEvent.setMemberIp(privateIp);
-        instanceSpawnedEvent.setMemberPublicIp(publicIp);
-        instanceSpawnedEvent.setProperties(CloudControllerUtil
-                .toJavaUtilProperties(context.getProperties()));
-        log.info(String.format("Publishing instance spawned event: [service] %s [cluster] %s " +
-                " [instance-id] %s [network-partition] %s  [partition] %s " +
-                "[member]%s [lb-cluster-id] %s",
-                serviceName, clusterId, context.getInstanceId(), networkPartitionId, partitionId,
-                memberId, lbClusterId));
-        publishEvent(instanceSpawnedEvent);
-    }
-
-    public static void sendMemberStartedEvent(InstanceStartedEvent instanceStartedEvent) {
-        MemberStartedEvent memberStartedEventTopology = new MemberStartedEvent(instanceStartedEvent.getServiceName(),
-                instanceStartedEvent.getClusterId(), instanceStartedEvent.getNetworkPartitionId(),
-                instanceStartedEvent.getPartitionId(), instanceStartedEvent.getMemberId(),
-                instanceStartedEvent.getInstanceId());
-        if (log.isInfoEnabled()) {
-            log.info(String
-                    .format("Publishing member started event: [service] %s [cluster] %s [instance-id] %s " +
-                                    "[network-partition] %s [partition] %s [member] %s",
-                            instanceStartedEvent.getServiceName(),
-                            instanceStartedEvent.getClusterId(),
-                            instanceStartedEvent.getInstanceId(),
-                            instanceStartedEvent.getNetworkPartitionId(),
-                            instanceStartedEvent.getPartitionId(),
-                            instanceStartedEvent.getMemberId()));
-        }
-        publishEvent(memberStartedEventTopology);
-    }
-
-    public static void sendMemberActivatedEvent(
-            MemberActivatedEvent memberActivatedEvent) {
-        if (log.isInfoEnabled()) {
-            log.info(String
-                    .format("Publishing member activated event: [service] %s [cluster] %s " +
-                                    "[instance-id] %s [network-partition] %s [partition] %s [member] %s",
-                            memberActivatedEvent.getServiceName(),
-                            memberActivatedEvent.getClusterId(),
-                            memberActivatedEvent.getInstanceId(),
-                            memberActivatedEvent.getNetworkPartitionId(),
-                            memberActivatedEvent.getPartitionId(),
-                            memberActivatedEvent.getMemberId()));
-        }
-        publishEvent(memberActivatedEvent);
-    }
-
-    public static void sendMemberReadyToShutdownEvent(MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Publishing member Ready to shut down event: [service] %s " +
-                            " [instance-id] %s [cluster] %s [network-partition] %s [partition] %s " +
-                            "[member] %s [groupId] %s",
-                                        memberReadyToShutdownEvent.getServiceName(),
-                                        memberReadyToShutdownEvent.getClusterId(),
-                                        memberReadyToShutdownEvent.getInstanceId(),
-                                        memberReadyToShutdownEvent.getNetworkPartitionId(),
-                                        memberReadyToShutdownEvent.getPartitionId(),
-                                        memberReadyToShutdownEvent.getMemberId(),
-                                        memberReadyToShutdownEvent.getGroupId()));
-        }
-        // grouping
-        memberReadyToShutdownEvent.setGroupId(memberReadyToShutdownEvent.getGroupId());
-        publishEvent(memberReadyToShutdownEvent);
-    }
-
-    public static void sendMemberMaintenanceModeEvent(MemberMaintenanceModeEvent memberMaintenanceModeEvent) {
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Publishing Maintenance mode event: [service] %s [cluster] %s " +
-                            " [instance-id] %s [network-partition] %s [partition] %s [member] %s " +
-                            "[groupId] %s", memberMaintenanceModeEvent.getServiceName(),
-                                            memberMaintenanceModeEvent.getClusterId(),
-                                            memberMaintenanceModeEvent.getInstanceId(),
-                                            memberMaintenanceModeEvent.getNetworkPartitionId(),
-                                            memberMaintenanceModeEvent.getPartitionId(),
-                                            memberMaintenanceModeEvent.getMemberId(),
-                                            memberMaintenanceModeEvent.getGroupId()));
-        }
-
-        publishEvent(memberMaintenanceModeEvent);
-    }
-
-    public static void sendClusterActivatedEvent(ClusterInstanceActivatedEvent clusterActivatedEvent) {
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Publishing cluster activated event: [service] %s [cluster] %s " +
-                            " [instance-id] %s [appId] %s",
-                    clusterActivatedEvent.getServiceName(),
-                    clusterActivatedEvent.getClusterId(),
-                    clusterActivatedEvent.getInstanceId(),
-                    clusterActivatedEvent.getAppId()));
-        }
-        publishEvent(clusterActivatedEvent);
-    }
-
-    public static void sendClusterInactivateEvent(ClusterInstanceInactivateEvent clusterInactiveEvent) {
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Publishing cluster inactive event: [service] %s [cluster] %s " +
-                            "[instance-id] %s [appId] %s",
-                    clusterInactiveEvent.getServiceName(), clusterInactiveEvent.getClusterId(),
-                    clusterInactiveEvent.getInstanceId(), clusterInactiveEvent.getAppId()));
-        }
-        publishEvent(clusterInactiveEvent);
-    }
-
-    public static void sendClusterInstanceCreatedEvent(ClusterInstanceCreatedEvent clusterInstanceCreatedEvent) {
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Publishing cluster Instance Created event: [service] %s [cluster] %s " +
-                            " in [network partition] %s [instance-id] %s",
-                    clusterInstanceCreatedEvent.getServiceName(), clusterInstanceCreatedEvent.getClusterId(),
-                    clusterInstanceCreatedEvent.getNetworkPartitionId(),
-                    clusterInstanceCreatedEvent.getClusterInstance().getInstanceId()));
-        }
-        publishEvent(clusterInstanceCreatedEvent);
-    }
-
-
-    public static void sendMemberTerminatedEvent(String serviceName, String clusterId, String networkPartitionId,
-                                                 String partitionId, String memberId,
-                                                 Properties properties, String groupId, String instanceId) {
-        MemberTerminatedEvent memberTerminatedEvent = new MemberTerminatedEvent(serviceName, clusterId,
-                networkPartitionId, partitionId, memberId, instanceId);
-        memberTerminatedEvent.setProperties(properties);
-        memberTerminatedEvent.setGroupId(groupId);
-
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Publishing member terminated event: [service] %s [cluster] %s " +
-                            " [instance-id] %s [network-partition] %s [partition] %s [member] %s " +
-                            "[groupId] %s", serviceName, clusterId, instanceId, networkPartitionId,
-                    partitionId, memberId, groupId));
-        }
-
-        publishEvent(memberTerminatedEvent);
-    }
-
-    public static void sendCompleteTopologyEvent(Topology topology) {
-        CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Publishing complete topology event"));
-        }
-        publishEvent(completeTopologyEvent);
-    }
-
-    public static void sendClusterTerminatingEvent(ClusterInstanceTerminatingEvent clusterTerminatingEvent) {
-
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Publishing Cluster terminating event: [appId] %s [cluster id] %s" +
-                            " [instance-id] %s ",
-                    clusterTerminatingEvent.getAppId(), clusterTerminatingEvent.getClusterId(),
-                    clusterTerminatingEvent.getInstanceId()));
-        }
-
-        publishEvent(clusterTerminatingEvent);
-    }
-
-    public static void sendClusterTerminatedEvent(ClusterInstanceTerminatedEvent clusterTerminatedEvent) {
-
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Publishing Cluster terminated event: [appId] %s [cluster id] %s" +
-                            " [instance-id] %s ",
-                    clusterTerminatedEvent.getAppId(), clusterTerminatedEvent.getClusterId(),
-                    clusterTerminatedEvent.getInstanceId()));
-        }
-
-        publishEvent(clusterTerminatedEvent);
-    }
-
-    public static void publishEvent(Event event) {
-        String topic = Util.getMessageTopicName(event);
-        EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic);
-        eventPublisher.publish(event);
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b575c7d6/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologySynchronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologySynchronizerTask.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologySynchronizerTask.java
deleted file mode 100644
index 94843bc..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologySynchronizerTask.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.messaging.topology;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
-import org.apache.stratos.cloud.controller.context.CloudControllerContext;
-import org.wso2.carbon.ntask.core.Task;
-
-public class TopologySynchronizerTask implements Task{
-    private static final Log log = LogFactory.getLog(TopologySynchronizerTask.class);
-
-    @Override
-    public void execute() {
-        if (log.isDebugEnabled()) {
-            log.debug("Executing topology synchronization task");
-        }
-        
-        if(CloudControllerContext.getInstance().isTopologySyncRunning() ||
-        		// this is a temporary fix to avoid task execution - limitation with ntask
-                (!CloudControllerConfig.getInstance().isTopologySyncEnabled())){
-            if(log.isWarnEnabled()) {
-                log.warn("Topology synchronization is disabled.");
-            }
-            return;
-        }
-        
-    	// publish to the topic 
-        if (TopologyManager.getTopology() != null) {
-            TopologyEventPublisher.sendCompleteTopologyEvent(TopologyManager.getTopology());
-        }
-    }
-    
-    @Override
-    public void init() {
-
-    	// this is a temporary fix to avoid task execution - limitation with ntask
-		if(!CloudControllerConfig.getInstance().isTopologySyncEnabled()){
-            if(log.isWarnEnabled()) {
-                log.warn("Topology synchronization is disabled.");
-            }
-			return;
-		}
-    }
-
-    @Override
-    public void setProperties(Map<String, String> arg0) {}
-    
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b575c7d6/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/CloudControllerService.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/CloudControllerService.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/CloudControllerService.java
index ce4887d..5f5124e 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/CloudControllerService.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/CloudControllerService.java
@@ -69,7 +69,7 @@ public interface CloudControllerService {
      * @return
      * @throws InvalidServiceGroupException
      */
-    public ServiceGroup getServiceGroup (String groupName) throws InvalidServiceGroupException;
+    public ServiceGroup getServiceGroup(String groupName) throws InvalidServiceGroupException;
 
     /**
      * Get service group sub group
@@ -77,7 +77,7 @@ public interface CloudControllerService {
      * @return
      * @throws InvalidServiceGroupException
      */
-    public String[] getServiceGroupSubGroups (String groupName) throws InvalidServiceGroupException;
+    public String[] getServiceGroupSubGroups(String groupName) throws InvalidServiceGroupException;
 
     /**
      * Get cartridges of a service group
@@ -85,7 +85,7 @@ public interface CloudControllerService {
      * @return
      * @throws InvalidServiceGroupException
      */
-    public String[] getServiceGroupCartridges (String groupName) throws InvalidServiceGroupException;
+    public String[] getServiceGroupCartridges(String groupName) throws InvalidServiceGroupException;
 
     /**
      * Get service group dependencies
@@ -93,7 +93,7 @@ public interface CloudControllerService {
      * @return
      * @throws InvalidServiceGroupException
      */
-    public Dependencies getServiceGroupDependencies (String groupName) throws InvalidServiceGroupException;
+    public Dependencies getServiceGroupDependencies(String groupName) throws InvalidServiceGroupException;
 
     /**
      * Validate a given {@link Partition} for basic property existence.
@@ -123,31 +123,24 @@ public interface CloudControllerService {
      * @param registrant information about the new subscription.
      * @return whether the registration is successful or not.
      *
-     * @throws UnregisteredCartridgeException
+     * @throws org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException
      *             when the cartridge type requested by this service is
      *             not a registered one.
      */
-    boolean registerService(Registrant registrant) throws UnregisteredCartridgeException;
+    boolean registerService(Registrant registrant) throws CartridgeNotFoundException;
 
     /**
-     * Calling this method will result in an instance startup, which is belong
-     * to the provided Cluster ID. Also note that the instance that is starting up
-     * belongs to the group whose name is derived from its Cluster ID, replacing <i>.</i>
-     * by a hyphen (<i>-</i>).
-     * @param member Context with cluster id, partition etc.
-     * @return updated {@link MemberContext}
-     * @throws UnregisteredCartridgeException if the requested Cartridge type is not a registered one.
-     * @throws InvalidIaasProviderException if the iaas requested is not valid.
-     */
-    MemberContext startInstance(MemberContext member) throws UnregisteredCartridgeException, InvalidIaasProviderException;
-    
-    /**
-     * Create a container cluster.
-     * @param {@link ContainerClusterContext} Context with cluster id, and host cluster details. 
-     * @return a list of {@link MemberContext}s correspond to each Pod created.
-     * @throws UnregisteredCartridgeException if the requested Cartridge type is not a registered one.
-     */
-    MemberContext[] startContainers(ContainerClusterContext clusterContext) throws UnregisteredCartridgeException;
+     * Start instances with the given instance contexts. Instances startup process will run in background and
+     * this method will return with the relevant member contexts.
+     * @param instanceContexts An array of instance contexts
+     * @return member contexts
+     * @throws org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException
+     * if the requested Cartridge type is not a registered one.
+     * @throws org.apache.stratos.cloud.controller.exception.InvalidIaasProviderException
+     * if the iaas requested is not valid.
+     */
+    MemberContext[] startInstances(InstanceContext[] instanceContexts) throws CartridgeNotFoundException,
+            InvalidIaasProviderException;
     
     /**
      * Calling this method will result in termination of the instance with given member id in the given Partition.
@@ -167,31 +160,6 @@ public interface CloudControllerService {
      * @return whether an instance terminated successfully or not.
      */
     void terminateInstances(String clusterId) throws InvalidClusterException;
-    
-    /**
-     * Terminate all containers of the given cluster.
-     * @param clusterId id of the subjected cluster.
-     * @return terminated {@link MemberContext}s
-     * @throws InvalidClusterException
-     */
-    MemberContext[] terminateContainers(String clusterId) throws InvalidClusterException;
-    
-    /**
-     * Terminate a given member/Kubernetes Pod.
-     * @param memberId member/Pod id to be terminated.
-     * @return terminated {@link MemberContext}
-     * @throws MemberTerminationFailedException
-     */
-    MemberContext terminateContainer(String memberId) throws MemberTerminationFailedException;
-    
-    /**
-     * Update the Kubernetes controller created for the given cluster with the specified number of replicas.
-     * @param clusterId id of the subjected cluster.
-     * @param replicas total number of replicas to be set to the controller.
-     * @return newly created Members if any / terminated {@link MemberContext} in scale down scenario.
-     * @throws InvalidClusterException
-     */
-    MemberContext[] updateContainers(String clusterId, int replicas) throws UnregisteredCartridgeException;
 
     /**
      * Update the topology with current cluster status.
@@ -203,13 +171,6 @@ public interface CloudControllerService {
     void updateClusterStatus(String serviceName, String clusterId, String instanceId, ClusterStatus status);
     
     /**
-     * Unregister a docker service identified by the given cluster id.
-     * @param clusterId service cluster id.
-     * @throws UnregisteredClusterException if the service cluster requested is not a registered one.
-     */
-    void unregisterDockerService(String clusterId) throws UnregisteredClusterException;
-    
-    /**
      * Unregister the service cluster identified by the given cluster id.
      * @param clusterId service cluster id.
      * @throws UnregisteredClusterException if the service cluster requested is not a registered one.
@@ -223,9 +184,9 @@ public interface CloudControllerService {
      * @param cartridgeType
      *            type of the cartridge.
      * @return {@link org.apache.stratos.cloud.controller.domain.CartridgeInfo} of the given cartridge type or <code>null</code>.
-     * @throws UnregisteredCartridgeException if there is no registered cartridge with this type.
+     * @throws org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException if there is no registered cartridge with this type.
      */
-    CartridgeInfo getCartridgeInfo(String cartridgeType) throws UnregisteredCartridgeException;
+    CartridgeInfo getCartridgeInfo(String cartridgeType) throws CartridgeNotFoundException;
 
     /**
      * Calling this method will result in returning the types of {@link org.apache.stratos.cloud.controller.domain.Cartridge}s
@@ -262,7 +223,7 @@ public interface CloudControllerService {
      * @param instanceId instance id
      * @throws ClusterInstanceCreationException if an y error occurs in cluster instance creation
      */
-    public void createClusterInstance (String serviceType, String clusterId, String alias,
+    public void createClusterInstance(String serviceType, String clusterId, String alias,
                                        String instanceId, String partitionId,
                                        String networkPartitionId) throws
             ClusterInstanceCreationException;