You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2014/11/10 12:56:38 UTC

[2/2] stratos git commit: implementing application cluster removal after undeployment

implementing application cluster removal after undeployment


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/bfa253cf
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/bfa253cf
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/bfa253cf

Branch: refs/heads/master
Commit: bfa253cf9e45a0f4bac99e214120e84ca4aebe55
Parents: a8146cb
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Mon Nov 10 15:34:46 2014 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Mon Nov 10 17:24:02 2014 +0530

----------------------------------------------------------------------
 .../applications/topic/ApplicationBuilder.java  |   7 +-
 .../topic/ApplicationsEventPublisher.java       |   9 +-
 .../application/ApplicationTopicReceiver.java   |   2 +-
 .../controller/topology/TopologyBuilder.java    |  49 ++++++---
 .../topology/TopologyEventPublisher.java        |   6 +-
 .../ApplicationClustersRemovedEvent.java        |  44 ++++++++
 ...ApplicationClustersRemovedEventListener.java |  25 +++++
 ...licationClustersRemovedMessageProcessor.java | 100 +++++++++++++++++++
 .../topology/TopologyMessageProcessorChain.java |   6 ++
 9 files changed, 227 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/bfa253cf/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
index d0ff90b..559a66a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
@@ -130,8 +130,7 @@ public class ApplicationBuilder {
                 application.setStatus(status);
                 updateApplicationMonitor(appId, status);
                 ApplicationHolder.persistApplication(application);
-                //publishing data
-                //TODO ApplicationsEventPublisher.sendApplicationActivatedEvent(appId);
+                ApplicationsEventPublisher.sendApplicationTerminatingEvent(appId);
             } else {
                 log.warn(String.format("Application state transition is not valid: [application-id] %s " +
                         " [current-status] %s [status-requested] %s", appId, application.getStatus(), status));
@@ -155,6 +154,7 @@ public class ApplicationBuilder {
             log.warn("Application does not exist: [application-id] " + appId);
         } else {
             Application application = applications.getApplication(appId);
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
 
             ApplicationStatus status = ApplicationStatus.Terminated;
             if (application.isStateTransitionValid(status)) {
@@ -168,7 +168,8 @@ public class ApplicationBuilder {
                 //Removing the application from memory and registry
                 ApplicationHolder.removeApplication(appId);
                 log.info("Application is removed: [application-id] " + appId);
-                //TODO ApplicationsEventPublisher.sendApplicationTerminatedEvent(appId);
+
+                ApplicationsEventPublisher.sendApplicationTerminatedEvent(appId, clusterData);
             } else {
                 log.warn(String.format("Application state transition is not valid: [application-id] %s " +
                         " [current-status] %s [status-requested] %s", appId, application.getStatus(), status));

http://git-wip-us.apache.org/repos/asf/stratos/blob/bfa253cf/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
index cc6f27e..d4969fa 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java
@@ -6,10 +6,13 @@ import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.applications.Application;
 import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.event.applications.*;
 import org.apache.stratos.messaging.util.Util;
 
+import java.util.Set;
+
 /**
  * This will publish application related events to application status topic.
  */
@@ -101,19 +104,19 @@ public class ApplicationsEventPublisher {
 
     public static void sendApplicationTerminatingEvent(String appId) {
         if (log.isInfoEnabled()) {
-            log.info("Publishing Application terminated event for [application]: " + appId);
+            log.info("Publishing Application terminating event for [application]: " + appId);
         }
         ApplicationTerminatingEvent applicationTerminatingEvent =
                 new ApplicationTerminatingEvent(appId);
         publishEvent(applicationTerminatingEvent);
     }
 
-    public static void sendApplicationTerminatedEvent(String appId) {
+    public static void sendApplicationTerminatedEvent(String appId, Set<ClusterDataHolder> clusterData) {
         if (log.isInfoEnabled()) {
             log.info("Publishing Application terminated event for [application]: " + appId);
         }
         ApplicationTerminatedEvent applicationTerminatedEvent =
-                new ApplicationTerminatedEvent(appId, null);
+                new ApplicationTerminatedEvent(appId, clusterData);
         publishEvent(applicationTerminatedEvent);
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/bfa253cf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java
index f331964..58176d3 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java
@@ -70,7 +70,7 @@ public class ApplicationTopicReceiver implements Runnable{
                 log.info("ApplicationTerminatedEvent received for [application]");
                 ApplicationTerminatedEvent terminatedEvent = (ApplicationTerminatedEvent)event;
                 String appId = terminatedEvent.getAppId();
-                TopologyBuilder.handleApplicationClustersRemoved(appId);
+                TopologyBuilder.handleApplicationClustersRemoved(appId, terminatedEvent.getClusterData());
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/bfa253cf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
index 6935dea..2f94d0d 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
@@ -21,14 +21,17 @@ package org.apache.stratos.cloud.controller.topology;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.exception.CloudControllerException;
 import org.apache.stratos.cloud.controller.exception.InvalidCartridgeTypeException;
 import org.apache.stratos.cloud.controller.exception.InvalidMemberException;
 import org.apache.stratos.cloud.controller.pojo.Cartridge;
 import org.apache.stratos.cloud.controller.pojo.*;
 import org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublisher;
+import org.apache.stratos.cloud.controller.registry.RegistryManager;
 import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent;
 import org.apache.stratos.messaging.event.cluster.status.*;
@@ -39,11 +42,9 @@ import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
 import org.apache.stratos.messaging.event.topology.*;
 import org.apache.stratos.metadata.client.defaults.DefaultMetaDataServiceClient;
 import org.apache.stratos.metadata.client.defaults.MetaDataServiceClient;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 /**
  * this is to manipulate the received events by cloud controller
@@ -178,21 +179,32 @@ public class TopologyBuilder {
 
     }
 
-    public static void handleApplicationClustersRemoved(String appId) {
+    public static void handleApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusterData) {
         TopologyManager.acquireWriteLock();
 
         List<Cluster> removedClusters = new ArrayList<Cluster>();
+        FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance();
         try {
             Topology topology = TopologyManager.getTopology();
-            for(Service service : topology.getServices()) {
-                for(Cluster cluster : service.getClusters()) {
-                    if(cluster.getAppId().equals(appId)) {
-                        removedClusters.add(service.removeCluster(cluster.getClusterId()));
-                        FasterLookUpDataHolder.getInstance().
-                                removeClusterContext(cluster.getClusterId());
+
+            if (clusterData != null) {
+                // remove clusters from CC topology model and remove runtime information
+                for (ClusterDataHolder aClusterData : clusterData) {
+                    Service aService = topology.getService(aClusterData.getServiceType());
+                    if (aService != null) {
+                        removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
+                    } else {
+                        log.warn("Service " + aClusterData.getServiceType() + " not found, unable to remove Cluster " + aClusterData.getClusterId());
                     }
+                    // remove runtime data
+                    dataHolder.removeClusterContext(aClusterData.getClusterId());
                 }
+                // persist runtime data changes
+                persist(dataHolder);
+            } else {
+                log.info("No cluster data found for application " + appId + " to remove");
             }
+
             log.info("Application Cluster " + appId + " are removed from the topology");
 
             TopologyManager.updateTopology(topology);
@@ -201,11 +213,24 @@ public class TopologyBuilder {
             TopologyManager.releaseWriteLock();
         }
 
-        TopologyEventPublisher.sendApplicationClustersRemoved(appId, removedClusters);
+        TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData);
 
     }
 
+    /**
+     * Persists the given runtime data holder in the registry; a RegistryException is logged with its cause and rethrown as CloudControllerException.
+     */
+    private static void persist(FasterLookUpDataHolder dataHolder) {
+        try {
+            RegistryManager.getInstance().persist(
+                    dataHolder);
+        } catch (RegistryException e) {
 
+            String msg = "Failed to persist the Cloud Controller data in registry. Further, transaction roll back also failed.";
+            log.fatal(msg, e);
+            throw new CloudControllerException(msg, e);
+        }
+    }
 
     public static void handleClusterReset(ClusterStatusClusterResetEvent event) {
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/bfa253cf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
index 385813a..3a7e813 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
@@ -28,6 +28,7 @@ import org.apache.stratos.cloud.controller.pojo.PortMapping;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Port;
 import org.apache.stratos.messaging.domain.topology.ServiceType;
@@ -40,6 +41,7 @@ import org.apache.stratos.messaging.util.Util;
 
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * this is to send the relevant events from cloud controller to topology topic
@@ -113,13 +115,13 @@ public class TopologyEventPublisher {
         publishEvent(new ApplicationClustersCreatedEvent(clusters, appId));
     }
 
-    public static void sendApplicationClustersRemoved(String appId, List<Cluster> clusters) {
+    public static void sendApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusters) {
 
         if (log.isInfoEnabled()) {
             log.info("Publishing Application Clusters removed event for Application: " + appId);
         }
 
-        //TODO publishEvent(new ApplicationClustersCreatedEvent(clusters, appId));
+        publishEvent(new ApplicationClustersRemovedEvent(clusters, appId));
     }
 
 //    public static void sendApplicationRemovedEvent(String applicationId, Set<ClusterDataHolder> clusterData,

http://git-wip-us.apache.org/repos/asf/stratos/blob/bfa253cf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationClustersRemovedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationClustersRemovedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationClustersRemovedEvent.java
new file mode 100644
index 0000000..3f469cf
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationClustersRemovedEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Topology event carrying the id of an application together with the set of
+ * cluster data holders that were removed for it.
+ */
+public class ApplicationClustersRemovedEvent extends TopologyEvent implements Serializable {
+
+    // Cluster data of the removed clusters; stored as given (no defensive copy).
+    private Set<ClusterDataHolder> clusterData;
+    // Id of the application the clusters belonged to.
+    private String appId;
+
+    /**
+     * Creates the event.
+     *
+     * @param clusterData cluster data holders of the removed clusters; kept by reference
+     * @param appId       id of the application
+     */
+    public ApplicationClustersRemovedEvent (Set<ClusterDataHolder> clusterData, String appId) {
+        this.clusterData = clusterData;
+        this.appId = appId;
+    }
+
+    /**
+     * @return the cluster data set passed to the constructor (the same instance, not a copy)
+     */
+    public Set<ClusterDataHolder> getClusterData() {
+        return clusterData;
+    }
+
+    /**
+     * @return the application id passed to the constructor
+     */
+    public String getAppId() {
+        return appId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bfa253cf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ApplicationClustersRemovedEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ApplicationClustersRemovedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ApplicationClustersRemovedEventListener.java
new file mode 100644
index 0000000..60f4126
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/ApplicationClustersRemovedEventListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.listener.topology;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Listener base type for application-clusters-removed topology events.
+ * Declares no members of its own; it only narrows {@link EventListener} to a distinct type.
+ */
+public abstract class ApplicationClustersRemovedEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bfa253cf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersRemovedMessageProcessor.java
new file mode 100644
index 0000000..02ff40c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersRemovedMessageProcessor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ApplicationClustersRemovedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+import java.util.Set;
+
+/**
+ * Message processor for {@link ApplicationClustersRemovedEvent}: removes the clusters
+ * named in the event from the supplied {@link Topology} and then notifies event listeners.
+ * Messages of any other type are handed to the next processor, if one is set.
+ */
+public class ApplicationClustersRemovedMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(ApplicationClustersRemovedMessageProcessor.class);
+    // Next processor to delegate to; null when none has been set.
+    private MessageProcessor nextProcessor;
+
+    /**
+     * Sets the processor that receives messages this processor does not handle.
+     *
+     * @param nextProcessor the processor to delegate to
+     */
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Handles the message if its type is the class name of {@link ApplicationClustersRemovedEvent};
+     * otherwise delegates to the next processor.
+     *
+     * @param type    event type name, compared against the event class name
+     * @param message JSON body of the event
+     * @param object  the topology to update; cast to {@link Topology} unconditionally
+     * @return false if the type matches but the topology is not initialized; otherwise the
+     *         result of processing the event here or in the next processor
+     * @throws RuntimeException if the type does not match and no next processor is set
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+
+        Topology topology = (Topology) object;
+
+        if (ApplicationClustersRemovedEvent.class.getName().equals(type)) {
+            // Events arriving before the topology is initialized are not applied.
+            if (!topology.isInitialized()) {
+                return false;
+            }
+
+            // Parse complete message and build event
+            ApplicationClustersRemovedEvent event = (ApplicationClustersRemovedEvent) Util.
+                    jsonToObject(message, ApplicationClustersRemovedEvent.class);
+
+            return doProcess(event, topology);
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    /**
+     * Removes every cluster listed in the event from its service in the topology, then
+     * notifies the event listeners. Listeners are notified even when the event carries
+     * no cluster data or a service is missing.
+     *
+     * @return always true
+     */
+    private boolean doProcess (ApplicationClustersRemovedEvent event,Topology topology) {
+
+        Set<ClusterDataHolder> clusterData = event.getClusterData();
+        if (clusterData != null) {
+            for (ClusterDataHolder aClusterData : clusterData) {
+                // The write lock is taken per service type, once for each cluster entry.
+                TopologyUpdater.acquireWriteLockForService(aClusterData.getServiceType());
+
+                try {
+                    Service aService = topology.getService(aClusterData.getServiceType());
+                    if (aService != null) {
+                        aService.removeCluster(aClusterData.getClusterId());
+                    } else {
+                        log.warn("Service " + aClusterData.getServiceType() + " not found, unable to remove Cluster " + aClusterData.getClusterId());
+                    }
+
+                } finally {
+                    // Release in finally so the lock is freed even if the removal throws.
+                    TopologyUpdater.releaseWriteLockForService(aClusterData.getServiceType());
+                }
+            }
+
+        } else {
+            if(log.isDebugEnabled()) {
+                log.debug("No cluster data found in application " + event.getAppId() + " to remove from Topology");
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/bfa253cf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index 1a84c8c..f51a521 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -36,6 +36,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
     private ServiceCreatedMessageProcessor serviceCreatedMessageProcessor;
     private ServiceRemovedMessageProcessor serviceRemovedMessageProcessor;
     private ApplicationClustersCreatedMessageProcessor clustersCreatedMessageProcessor;
+    private ApplicationClustersRemovedMessageProcessor clustersRemovedMessageProcessor;
     private ClusterCreatedMessageProcessor clusterCreatedMessageProcessor;
     private ClusterResetMessageProcessor clusterResetMessageProcessor;
     private ClusterActivatedProcessor clusterActivatedProcessor;
@@ -65,6 +66,9 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
         clustersCreatedMessageProcessor = new ApplicationClustersCreatedMessageProcessor();
         add(clustersCreatedMessageProcessor);
 
+        clustersRemovedMessageProcessor = new ApplicationClustersRemovedMessageProcessor();
+        add(clustersRemovedMessageProcessor);
+
         clusterCreatedMessageProcessor = new ClusterCreatedMessageProcessor();
         add(clusterCreatedMessageProcessor);
 
@@ -119,6 +123,8 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
             clusterCreatedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ApplicationClustersCreatedEventListener) {
             clustersCreatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ApplicationClustersRemovedEventListener) {
+            clustersRemovedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ClusterActivatedEventListener) {
             clusterActivatedProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ClusterInActivateEventListener) {