You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/10/31 08:51:41 UTC

[08/10] removing application status and adding applications, cluster status topic

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
index db5d777..92d61ba 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
@@ -16,48 +16,106 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.stratos.messaging.message.processor.applications;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.ApplicationCreatedEvent;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
 import org.apache.stratos.messaging.util.Util;
 
-public class ApplicationCreatedMessageProcessor extends MessageProcessor {
-    private static final Log log =
-            LogFactory.getLog(ApplicationCreatedMessageProcessor.class);
+import java.util.Set;
 
+public class ApplicationCreatedMessageProcessor extends MessageProcessor {
 
+    private static final Log log = LogFactory.getLog(ApplicationCreatedMessageProcessor.class);
     private MessageProcessor nextProcessor;
 
     @Override
     public void setNext(MessageProcessor nextProcessor) {
         this.nextProcessor = nextProcessor;
-
     }
 
     @Override
     public boolean process(String type, String message, Object object) {
+
+        Topology topology = (Topology) object;
+
         if (ApplicationCreatedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            ApplicationCreatedEvent event =
-                    (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
+            if (!topology.isInitialized()) {
+                return false;
+            }
 
-            if (log.isDebugEnabled()) {
-                log.debug("Received ApplicationCreated Event in application status topic: " + event.toString());
+            ApplicationCreatedEvent event = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
+            if (event == null) {
+                log.error("Unable to convert the JSON message to ApplicationCreatedEvent");
+                return false;
             }
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+
+            TopologyUpdater.acquireWriteLockForApplications();
+            // since the Clusters will also get modified, acquire write locks for each Service Type
+            Set<ClusterDataHolder> clusterDataHolders = event.getApplication().getClusterDataRecursively();
+            if (clusterDataHolders != null) {
+                for (ClusterDataHolder clusterData : clusterDataHolders) {
+                    TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType());
+                }
+            }
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                if (clusterDataHolders != null) {
+                    for (ClusterDataHolder clusterData : clusterDataHolders) {
+                        TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType());
+                    }
+                }
+                TopologyUpdater.releaseWriteLockForApplications();
+            }
+
         } else {
             if (nextProcessor != null) {
-                return nextProcessor.process(type, message, object);
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
             } else {
-                throw new RuntimeException(
-                        String.format("Failed to process group activated message " +
-                                "using available message processors: [type] %s [body] %s", type, message));
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
         }
     }
+
+    /** Validates the event, adds its application and clusters to the topology and notifies listeners. */
+    private boolean doProcess (ApplicationCreatedEvent event,Topology topology) {
+        if (event.getApplication() == null) {
+            String errorMsg = "Application object of application created event is invalid";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+        String appId = event.getApplication().getUniqueIdentifier();
+        if (appId == null || appId.isEmpty()) {
+            String errorMsg = "App id of application created event is invalid: [ " + appId + " ]";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+        // check if an Application with same name exists in topology
+        if (topology.applicationExists(appId)) {
+            log.warn("Application with id [ " + appId + " ] already exists in Topology");
+        } else {
+            // add the clusters; a cluster whose Service is unknown is skipped rather than raising an NPE
+            for (Cluster cluster : event.getClusterList()) {
+                if (topology.getService(cluster.getServiceName()) != null) {
+                    topology.getService(cluster.getServiceName()).addCluster(cluster);
+                } else {
+                    log.warn("Service " + cluster.getServiceName() + " not found in Topology!");
+                }
+            }
+            topology.addApplication(event.getApplication());
+        }
+        notifyEventListeners(event);
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
index d8f2aac..91eae8c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationInactivatedMessageProcessor.java
@@ -20,10 +20,17 @@ package org.apache.stratos.messaging.message.processor.applications;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.ApplicationInactivatedEvent;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
 import org.apache.stratos.messaging.util.Util;
 
+/**
+ * This processor is responsible for processing the application inactivated event and updating the Topology.
+ */
 public class ApplicationInactivatedMessageProcessor extends MessageProcessor {
     private static final Log log =
             LogFactory.getLog(ApplicationInactivatedMessageProcessor.class);
@@ -34,30 +41,64 @@ public class ApplicationInactivatedMessageProcessor extends MessageProcessor {
     @Override
     public void setNext(MessageProcessor nextProcessor) {
         this.nextProcessor = nextProcessor;
-
     }
 
+
     @Override
     public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
         if (ApplicationInactivatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
             // Parse complete message and build event
-            ApplicationInactivatedEvent event =
-                    (ApplicationInactivatedEvent) Util.jsonToObject(message, ApplicationInactivatedEvent.class);
+            ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util.
+                    jsonToObject(message, ApplicationInactivatedEvent.class);
 
-            if (log.isDebugEnabled()) {
-                log.debug("Received ApplicationInActivatedEvent in application status topic: " + event.toString());
+            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
             }
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+
         } else {
             if (nextProcessor != null) {
-                return nextProcessor.process(type, message, object);
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
             } else {
-                throw new RuntimeException(
-                        String.format("Failed to process group activated message " +
-                                "using available message processors: [type] %s [body] %s", type, message));
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
             }
+            return false;
+        } else {
+            // Apply changes to the topology
+            if (!application.isStateTransitionValid(ApplicationStatus.Inactive)) {
+                log.error("Invalid State transfer from [ " + application.getStatus() +
+                        " ] to [ " + ApplicationStatus.Inactive + " ]");
+            }
+            application.setStatus(ApplicationStatus.Inactive);
+
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
index a121ffb..8cd2182 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatedMessageProcessor.java
@@ -20,10 +20,18 @@ package org.apache.stratos.messaging.message.processor.applications;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent;
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.topology.ApplicationTerminatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
 import org.apache.stratos.messaging.util.Util;
 
+import java.util.Set;
+
+/**
+ * This processor is responsible for processing the application terminated event and updating the Topology.
+ */
 public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
     private static final Log log =
             LogFactory.getLog(ApplicationTerminatedMessageProcessor.class);
@@ -34,30 +42,95 @@ public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
     @Override
     public void setNext(MessageProcessor nextProcessor) {
         this.nextProcessor = nextProcessor;
-
     }
 
+
     @Override
     public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
         if (ApplicationTerminatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
             // Parse complete message and build event
-            ApplicationTerminatedEvent event =
-                    (ApplicationTerminatedEvent) Util.jsonToObject(message, ApplicationTerminatedEvent.class);
+            ApplicationTerminatedEvent event = (ApplicationTerminatedEvent) Util.
+                    jsonToObject(message, ApplicationTerminatedEvent.class);
 
-            if (log.isDebugEnabled()) {
-                log.debug("Received ApplicationTerminatedEvent in application status topic: " + event.toString());
+            TopologyUpdater.acquireWriteLockForApplications();
+                        Set<ClusterDataHolder> clusterDataHolders = event.getClusterData();
+            if (clusterDataHolders != null) {
+                for (ClusterDataHolder clusterData : clusterDataHolders) {
+                    TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType());
+                }
             }
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyUpdater.releaseWriteLockForApplications();
+                if (clusterDataHolders != null) {
+                    for (ClusterDataHolder clusterData : clusterDataHolders) {
+                        TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType());
+                    }
+                }
+            }
+
         } else {
             if (nextProcessor != null) {
-                return nextProcessor.process(type, message, object);
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
             } else {
-                throw new RuntimeException(
-                        String.format("Failed to process group activated message " +
-                                "using available message processors: [type] %s [body] %s", type, message));
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
         }
     }
+
+    /** Removes the terminated application and its clusters from the topology, then notifies listeners. */
+    private boolean doProcess (ApplicationTerminatedEvent event, Topology topology) {
+        // check if required properties are available
+        if (event.getAppId() == null) {
+            String errorMsg = "Application Id of application terminated event is invalid";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+
+        if (event.getTenantDomain() == null) {
+            String errorMsg = "Application tenant domain of application terminated event is invalid";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+
+        // remove the Application from the topology if it is still registered there
+        String appId = event.getAppId();
+        if (topology.applicationExists(appId)) {
+            log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it");
+            topology.removeApplication(appId);
+        }
+
+        if (event.getClusterData() != null) {
+            // remove the Clusters from the Topology
+            for (ClusterDataHolder clusterData : event.getClusterData()) {
+                Service service = topology.getService(clusterData.getServiceType());
+                if (service != null) {
+                    service.removeCluster(clusterData.getClusterId());
+                    if (log.isDebugEnabled()) {
+                        log.debug("Removed the Cluster " + clusterData.getClusterId() + " from Topology");
+                    }
+                }  else {
+                    log.warn("Service " + clusterData.getServiceType() + " not found in Topology!");
+                }
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("ApplicationTerminatedMessageProcessor notifying listener ");
+        }
+
+        notifyEventListeners(event);
+        return true;
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
index 280de2c..057d013 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationTerminatingMessageProcessor.java
@@ -20,10 +20,17 @@ package org.apache.stratos.messaging.message.processor.applications;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.ApplicationTerminatingEvent;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ApplicationTerminatingEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
 import org.apache.stratos.messaging.util.Util;
 
+/**
+ * This processor is responsible for processing the application terminating event and updating the Topology.
+ */
 public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
     private static final Log log =
             LogFactory.getLog(ApplicationTerminatingMessageProcessor.class);
@@ -34,30 +41,64 @@ public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
     @Override
     public void setNext(MessageProcessor nextProcessor) {
         this.nextProcessor = nextProcessor;
-
     }
 
+
     @Override
     public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
         if (ApplicationTerminatingEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
             // Parse complete message and build event
-            ApplicationTerminatingEvent event =
-                    (ApplicationTerminatingEvent) Util.jsonToObject(message, ApplicationTerminatingEvent.class);
+            ApplicationTerminatingEvent event = (ApplicationTerminatingEvent) Util.
+                    jsonToObject(message, ApplicationTerminatingEvent.class);
 
-            if (log.isDebugEnabled()) {
-                log.debug("Received ApplicationTerminatingEvent in application status topic: " + event.toString());
+            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
             }
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+
         } else {
             if (nextProcessor != null) {
-                return nextProcessor.process(type, message, object);
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
             } else {
-                throw new RuntimeException(
-                        String.format("Failed to process group activated message " +
-                                "using available message processors: [type] %s [body] %s", type, message));
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess (ApplicationTerminatingEvent event, Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
             }
+            return false;
+        } else {
+            // Apply changes to the topology
+            if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) {
+                log.error("Invalid State transfer from [ " + application.getStatus() +
+                        " ] to [ " + ApplicationStatus.Terminating + " ]");
+            }
+            application.setStatus(ApplicationStatus.Terminating);
+
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java
new file mode 100644
index 0000000..7e91ab8
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationUndeployedMessageProcessor.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.topology.ApplicationUndeployedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+import java.util.Set;
+
+public class ApplicationUndeployedMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(ApplicationUndeployedMessageProcessor.class);
+
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /** Handles ApplicationUndeployedEvent messages under write locks; other types go to the next processor. */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (ApplicationUndeployedEvent.class.getName().equals(type)) {
+            if (!topology.isInitialized()) {
+                return false;
+            }
+
+            ApplicationUndeployedEvent event = (ApplicationUndeployedEvent)
+                    Util.jsonToObject(message, ApplicationUndeployedEvent.class);
+            if (event == null) {
+                log.error("Unable to convert the JSON message to ApplicationUndeployedEvent");
+                return false;
+            }
+
+            // get write lock for the application and relevant Clusters
+            TopologyUpdater.acquireWriteLockForApplication(event.getApplicationId());
+            Set<ClusterDataHolder> clusterDataHolders = event.getClusterData();
+            if (clusterDataHolders != null) {
+                for (ClusterDataHolder clusterData : clusterDataHolders) {
+                    TopologyUpdater.acquireWriteLockForCluster(clusterData.getServiceType(),
+                            clusterData.getClusterId());
+                }
+            }
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                // release the cluster locks first, then the application lock taken before them
+                if (clusterDataHolders != null) {
+                    for (ClusterDataHolder clusterData : clusterDataHolders) {
+                        TopologyUpdater.releaseWriteLockForCluster(clusterData.getServiceType(),
+                                clusterData.getClusterId());
+                    }
+                }
+                TopologyUpdater.releaseWriteLockForApplication(event.getApplicationId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format
+                    ("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    /** Marks the application and its clusters as Terminating; returns false if the application is unknown. */
+    private boolean doProcess (ApplicationUndeployedEvent event, Topology topology) {
+        Application application = topology.getApplication(event.getApplicationId());
+        if (application == null) {
+            // nothing to update when the application is not in the topology; report the event as unprocessed
+            log.warn(String.format("Application does not exist: [application-id] %s", event.getApplicationId()));
+            return false;
+        }
+        // check and update application status to 'Terminating'
+        if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) {
+            log.error("Invalid state transfer from " + application.getStatus() + " to " + ApplicationStatus.Terminating);
+        }
+        // for now anyway update the status forcefully
+        application.setStatus(ApplicationStatus.Terminating);
+
+        // update all the Clusters' statuses to 'Terminating'; the application may hold no cluster data
+        Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+        if (clusterData != null) {
+            for (ClusterDataHolder clusterDataHolder : clusterData) {
+                Service service = topology.getService(clusterDataHolder.getServiceType());
+                if (service == null) {
+                    log.warn("Unable to update cluster with cluster id: " + clusterDataHolder.getClusterId() + " in Topology, " +
+                            " associated Service [ " + clusterDataHolder.getServiceType() + " ] not found");
+                    continue;
+                }
+                Cluster aCluster = service.getCluster(clusterDataHolder.getClusterId());
+                if (aCluster == null) {
+                    log.warn("Unable to find Cluster with cluster id " + clusterDataHolder.getClusterId() + " in Topology");
+                    continue;
+                }
+                // validate state transition, but for now anyway update the status forcefully
+                if (!aCluster.isStateTransitionValid(ClusterStatus.Terminating)) {
+                    log.error("Invalid state transfer from " + aCluster.getStatus() + " to " + ClusterStatus.Terminating);
+                }
+                aCluster.setStatus(ClusterStatus.Terminating);
+            }
+        }
+
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
new file mode 100644
index 0000000..e75b604
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationsMessageProcessorChain.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.applications.*;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Message processor chain for group and application lifecycle events. {@link #initialize()}
+ * creates one processor per event type and passes it to {@code add(...)};
+ * {@link #addEventListener(EventListener)} hands a listener to the processor for its type.
+ */
+public class ApplicationsMessageProcessorChain extends MessageProcessorChain {
+    private static final Log log = LogFactory.getLog(ApplicationsMessageProcessorChain.class);
+
+    // One processor per event type; all of them are created in initialize()
+    private GroupCreatedProcessor groupCreatedMessageProcessor;
+    private GroupActivatedProcessor groupActivatedMessageProcessor;
+    private GroupInActivateProcessor groupInActivateMessageProcessor;
+    private GroupTerminatedProcessor groupTerminatedProcessor;
+    private GroupTerminatingProcessor groupTerminatingProcessor;
+    private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor;
+    private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor;
+    private ApplicationInactivatedMessageProcessor applicationInactivatedMessageProcessor;
+    private ApplicationTerminatedMessageProcessor applicationTerminatedMessageProcessor;
+    private ApplicationTerminatingMessageProcessor applicationTerminatingMessageProcessor;
+    private ApplicationUndeployedMessageProcessor applicationUndeployedMessageProcessor;
+
+    /** Creates every group and application processor and passes each to add(...), in this order. */
+    public void initialize() {
+        // Group lifecycle processors
+        groupCreatedMessageProcessor = new GroupCreatedProcessor();
+        add(groupCreatedMessageProcessor);
+        groupActivatedMessageProcessor = new GroupActivatedProcessor();
+        add(groupActivatedMessageProcessor);
+        groupInActivateMessageProcessor = new GroupInActivateProcessor();
+        add(groupInActivateMessageProcessor);
+        groupTerminatedProcessor = new GroupTerminatedProcessor();
+        add(groupTerminatedProcessor);
+        groupTerminatingProcessor = new GroupTerminatingProcessor();
+        add(groupTerminatingProcessor);
+
+        // Application lifecycle processors
+        applicationActivatedMessageProcessor = new ApplicationActivatedMessageProcessor();
+        add(applicationActivatedMessageProcessor);
+        applicationCreatedMessageProcessor = new ApplicationCreatedMessageProcessor();
+        add(applicationCreatedMessageProcessor);
+        applicationInactivatedMessageProcessor = new ApplicationInactivatedMessageProcessor();
+        add(applicationInactivatedMessageProcessor);
+        applicationTerminatingMessageProcessor = new ApplicationTerminatingMessageProcessor();
+        add(applicationTerminatingMessageProcessor);
+        applicationTerminatedMessageProcessor = new ApplicationTerminatedMessageProcessor();
+        add(applicationTerminatedMessageProcessor);
+        applicationUndeployedMessageProcessor = new ApplicationUndeployedMessageProcessor();
+        add(applicationUndeployedMessageProcessor);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Applications message processor chain initialized");
+        }
+    }
+
+    /**
+     * Registers a listener with the processor that handles its event type.
+     *
+     * @param eventListener one of the group or application event listener types
+     * @throws RuntimeException if no processor in this chain handles the listener's type
+     */
+    public void addEventListener(EventListener eventListener) {
+        if (eventListener instanceof GroupCreatedEventListener) {
+            groupCreatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof GroupInactivateEventListener) {
+            groupInActivateMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof GroupActivatedEventListener) {
+            groupActivatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof GroupTerminatingEventListener) {
+            groupTerminatingProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof GroupTerminatedEventListener) {
+            groupTerminatedProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ApplicationCreatedEventListener) {
+            applicationCreatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ApplicationActivatedEventListener) {
+            applicationActivatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ApplicationInactivatedEventListener) {
+            applicationInactivatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ApplicationTerminatingEventListener) {
+            applicationTerminatingMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ApplicationTerminatedEventListener) {
+            applicationTerminatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ApplicationUndeployedEventListener) {
+            applicationUndeployedMessageProcessor.addEventListener(eventListener);
+        } else {
+            // Plain concatenation, so a null listener is reported here rather than failing in toString()
+            throw new RuntimeException("Unknown event listener " + eventListener);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java
deleted file mode 100644
index 02ddda8..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedMessageProcessor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.GroupActivatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-public class GroupActivatedMessageProcessor extends MessageProcessor {
-    private static final Log log =
-            LogFactory.getLog(GroupActivatedMessageProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        if (GroupActivatedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            GroupActivatedEvent event =
-                    (GroupActivatedEvent) Util.jsonToObject(message, GroupActivatedEvent.class);
-
-            if (log.isDebugEnabled()) {
-                log.debug("Received GroupActivatedEvent: " + event.toString());
-            }
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                return nextProcessor.process(type, message, object);
-            } else {
-                throw new RuntimeException(
-                        String.format("Failed to process group activated message " +
-                                "using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
new file mode 100644
index 0000000..845e933
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupActivatedProcessor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Handles {@link GroupActivatedEvent}s: sets the matching group's status to
+ * {@link GroupStatus#Active} and notifies listeners. Other types go to the next processor.
+ */
+public class GroupActivatedProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupActivatedProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Applies a group activated event; any other message type is delegated to the next
+     * processor and that processor's result is returned.
+     * @param type    class name of the event carried by the message
+     * @param message JSON body of the event
+     * @param object  the {@link Topology} to update
+     * @return true once the group is updated and listeners are notified; false if not applied
+     * @throws RuntimeException if the type is not handled here and there is no next processor
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (!GroupActivatedEvent.class.getName().equals(type)) {
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+            // ask the next processor to take care of the message.
+            return nextProcessor.process(type, message, topology);
+        }
+
+        // Return if topology has not been initialized
+        if (!topology.isInitialized()) {
+            return false;
+        }
+
+        // Parse complete message and build event
+        GroupActivatedEvent event = (GroupActivatedEvent) Util.jsonToObject(message, GroupActivatedEvent.class);
+        TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+        try {
+            return doProcess(event, topology);
+        } finally {
+            TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+        }
+    }
+
+    /** Updates the group status; called by process() while the application write lock is held. */
+    private boolean doProcess(GroupActivatedEvent event, Topology topology) {
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            log.warn(String.format("Application does not exist: [application-id] %s", event.getAppId()));
+            return false;
+        }
+
+        Group group = application.getGroupRecursively(event.getGroupId());
+        if (group == null) {
+            log.warn(String.format("Group does not exist in application: [application-id] %s [group-id] %s",
+                    event.getAppId(), event.getGroupId()));
+            // Nothing was updated, so listeners must not be told that the group was activated
+            return false;
+        }
+
+        // An invalid transition is only logged; the status is still forced to Active
+        if (!group.isStateTransitionValid(GroupStatus.Active)) {
+            log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active);
+        }
+        group.setStatus(GroupStatus.Active);
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java
deleted file mode 100644
index d04d7f9..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedMessageProcessor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.GroupCreatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-public class GroupCreatedMessageProcessor extends MessageProcessor {
-    private static final Log log =
-            LogFactory.getLog(GroupCreatedMessageProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        if (GroupCreatedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            GroupCreatedEvent event =
-                    (GroupCreatedEvent) Util.jsonToObject(message, GroupCreatedEvent.class);
-
-            if (log.isDebugEnabled()) {
-                log.debug("Received AppStatusGroupCreatedEvent: " + event.toString());
-            }
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                return nextProcessor.process(type, message, object);
-            } else {
-                throw new RuntimeException(
-                        String.format("Failed to process group created message " +
-                                "using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java
new file mode 100644
index 0000000..47d4457
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupCreatedProcessor.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.GroupCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Handles {@link GroupCreatedEvent}s: sets the matching group's status to
+ * {@link GroupStatus#Created} and notifies listeners. Other types go to the next processor.
+ */
+public class GroupCreatedProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupCreatedProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Applies a group created event; any other message type is delegated to the next
+     * processor and that processor's result is returned.
+     * @param type    class name of the event carried by the message
+     * @param message JSON body of the event
+     * @param object  the {@link Topology} to update
+     * @return true once the group is updated and listeners are notified; false if not applied
+     * @throws RuntimeException if the type is not handled here and there is no next processor
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (!GroupCreatedEvent.class.getName().equals(type)) {
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+            // ask the next processor to take care of the message.
+            return nextProcessor.process(type, message, topology);
+        }
+
+        // Return if topology has not been initialized
+        if (!topology.isInitialized()) {
+            return false;
+        }
+
+        // Parse complete message and build event
+        GroupCreatedEvent event = (GroupCreatedEvent) Util.jsonToObject(message, GroupCreatedEvent.class);
+        TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+        try {
+            return doProcess(event, topology);
+        } finally {
+            TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+        }
+    }
+
+    /** Updates the group status; called by process() while the application write lock is held. */
+    private boolean doProcess(GroupCreatedEvent event, Topology topology) {
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            log.warn(String.format("Application does not exist: [application-id] %s", event.getAppId()));
+            return false;
+        }
+
+        Group group = application.getGroupRecursively(event.getGroupId());
+        if (group == null) {
+            log.warn(String.format("Group does not exist in application: [application-id] %s [group-id] %s",
+                    event.getAppId(), event.getGroupId()));
+            // Nothing was updated, so listeners must not be told that the group was created
+            return false;
+        }
+
+        // An invalid transition is only logged; the status is still forced to Created
+        if (!group.isStateTransitionValid(GroupStatus.Created)) {
+            log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Created  + " " +
+                    "for Group " + group.getAlias());
+        }
+        group.setStatus(GroupStatus.Created);
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
new file mode 100644
index 0000000..063a3de
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInActivateProcessor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.GroupInactivateEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupInActivateProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupInActivateProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (GroupInactivateEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupInactivateEvent event = (GroupInactivateEvent) Util.
+                    jsonToObject(message, GroupInactivateEvent.class);
+
+            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess(GroupInactivateEvent event, Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+            }
+        } else {
+            group.setStatus(GroupStatus.Inactive);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Group updated as in-activated : %s",
+                        group.getUniqueIdentifier()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java
deleted file mode 100644
index 6cf2587..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupInactivatedMessageProcessor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.AppStatusGroupInactivateEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-public class GroupInactivatedMessageProcessor extends MessageProcessor {
-    private static final Log log =
-            LogFactory.getLog(GroupInactivatedMessageProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        if (AppStatusGroupInactivateEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            AppStatusGroupInactivateEvent event =
-                    (AppStatusGroupInactivateEvent) Util.jsonToObject(message, AppStatusGroupInactivateEvent.class);
-
-            if (log.isDebugEnabled()) {
-                log.debug("Received GroupInActivateEvent: " + event.toString());
-            }
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                return nextProcessor.process(type, message, object);
-            } else {
-                throw new RuntimeException(
-                        String.format("Failed to process group in activated message " +
-                                "using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java
deleted file mode 100644
index a917a14..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedMessageProcessor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.GroupTerminatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-public class GroupTerminatedMessageProcessor extends MessageProcessor {
-    private static final Log log =
-            LogFactory.getLog(GroupTerminatedMessageProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        if (GroupTerminatedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            GroupTerminatedEvent event =
-                    (GroupTerminatedEvent) Util.jsonToObject(message, GroupTerminatedEvent.class);
-
-            if (log.isDebugEnabled()) {
-                log.debug("Received GroupTerminatingEvent: " + event.toString());
-            }
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                return nextProcessor.process(type, message, object);
-            } else {
-                throw new RuntimeException(
-                        String.format("Failed to process group in GroupTerminatingEvent message " +
-                                "using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
new file mode 100644
index 0000000..3de0914
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatedProcessor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Handles {@link GroupTerminatedEvent} messages by marking the referenced group as
+ * {@link GroupStatus#Terminated} in the topology and notifying the event listeners.
+ */
+public class GroupTerminatedProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupTerminatedProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Processes a group terminated message, or passes any other type down the chain.
+     * @param type    class name of the event carried by the message
+     * @param message JSON body of the event
+     * @param object  the {@link Topology} to update
+     * @return false if the topology is not initialized or the application is unknown,
+     *         true once listeners were notified; otherwise the next processor's result
+     * @throws RuntimeException if the type is not handled and there is no next processor
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (!GroupTerminatedEvent.class.getName().equals(type)) {
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+            // Pass the payload on untouched; it is only cast for events handled here.
+            return nextProcessor.process(type, message, object);
+        }
+
+        Topology topology = (Topology) object;
+        if (!topology.isInitialized()) {
+            return false; // nothing to update until the topology has been initialized
+        }
+
+        GroupTerminatedEvent event = (GroupTerminatedEvent) Util.jsonToObject(message, GroupTerminatedEvent.class);
+        // Released in finally so that a failure in doProcess cannot leak the write lock.
+        TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+        try {
+            return doProcess(event, topology);
+        } finally {
+            TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+        }
+    }
+
+    /** Applies the event to the topology; the caller holds the application write lock. */
+    private boolean doProcess(GroupTerminatedEvent event, Topology topology) {
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [appId] %s", event.getAppId()));
+            }
+            return false;
+        }
+
+        Group group = application.getGroupRecursively(event.getGroupId());
+        if (group == null) {
+            // A missing group is only reported; listeners are still notified below.
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group does not exist in application: [appId] %s [groupId] %s",
+                        event.getAppId(), event.getGroupId()));
+            }
+        } else {
+            // An invalid transition is only reported; the status is applied regardless.
+            if (!group.isStateTransitionValid(GroupStatus.Terminated)) {
+                log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Terminated);
+            }
+            group.setStatus(GroupStatus.Terminated);
+        }
+
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java
deleted file mode 100644
index 63c055d..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingMessageProcessor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.applications;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.applications.GroupTerminatingEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-public class GroupTerminatingMessageProcessor extends MessageProcessor {
-    private static final Log log =
-            LogFactory.getLog(GroupTerminatingMessageProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        if (GroupTerminatingEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            GroupTerminatingEvent event =
-                    (GroupTerminatingEvent) Util.jsonToObject(message, GroupTerminatingEvent.class);
-
-            if (log.isDebugEnabled()) {
-                log.debug("Received GroupTerminatingEvent: " + event.toString());
-            }
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                return nextProcessor.process(type, message, object);
-            } else {
-                throw new RuntimeException(
-                        String.format("Failed to process group in GroupTerminatingEvent message " +
-                                "using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
new file mode 100644
index 0000000..e124b7b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/GroupTerminatingProcessor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Group;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Handles {@link GroupTerminatingEvent} messages by marking the referenced group as
+ * {@link GroupStatus#Terminating} in the topology and notifying the event listeners.
+ */
+public class GroupTerminatingProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupTerminatingProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Processes a group terminating message, or passes any other type down the chain.
+     * @param type    class name of the event carried by the message
+     * @param message JSON body of the event
+     * @param object  the {@link Topology} to update
+     * @return false if the topology is not initialized or the application is unknown,
+     *         true once listeners were notified; otherwise the next processor's result
+     * @throws RuntimeException if the type is not handled and there is no next processor
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (!GroupTerminatingEvent.class.getName().equals(type)) {
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+            // Pass the payload on untouched; it is only cast for events handled here.
+            return nextProcessor.process(type, message, object);
+        }
+
+        Topology topology = (Topology) object;
+        if (!topology.isInitialized()) {
+            return false; // nothing to update until the topology has been initialized
+        }
+
+        GroupTerminatingEvent event = (GroupTerminatingEvent) Util.jsonToObject(message, GroupTerminatingEvent.class);
+        // Released in finally so that a failure in doProcess cannot leak the write lock.
+        TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+        try {
+            return doProcess(event, topology);
+        } finally {
+            TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+        }
+    }
+
+    /** Applies the event to the topology; the caller holds the application write lock. */
+    private boolean doProcess(GroupTerminatingEvent event, Topology topology) {
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [appId] %s", event.getAppId()));
+            }
+            return false;
+        }
+
+        Group group = application.getGroupRecursively(event.getGroupId());
+        if (group == null) {
+            // A missing group is only reported; listeners are still notified below.
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group does not exist in application: [appId] %s [groupId] %s",
+                        event.getAppId(), event.getGroupId()));
+            }
+        } else {
+            // An invalid transition is only reported; the status is applied regardless.
+            if (!group.isStateTransitionValid(GroupStatus.Terminating)) {
+                log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Terminating);
+            }
+            group.setStatus(GroupStatus.Terminating);
+        }
+
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterActivatedMessageProcessor.java
new file mode 100644
index 0000000..694c3f3
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterActivatedMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterActivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+/**
+ * Parses messages of type {@link ClusterStatusClusterActivatedEvent} and notifies the
+ * event listeners; any other type goes to the next processor, or fails if there is none.
+ */
+public class ClusterStatusClusterActivatedMessageProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(ClusterStatusClusterActivatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (!ClusterStatusClusterActivatedEvent.class.getName().equals(type)) {
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format("Failed to process cluster activated message using available message processors: [type] %s [body] %s", type, message));
+            }
+            return nextProcessor.process(type, message, object);
+        }
+        // Rebuild the event from the message body, then hand it to the listeners.
+        ClusterStatusClusterActivatedEvent activatedEvent = (ClusterStatusClusterActivatedEvent)
+                Util.jsonToObject(message, ClusterStatusClusterActivatedEvent.class);
+        if (log.isDebugEnabled()) {
+            log.debug("Received ClusterActivatedEvent: " + activatedEvent.toString());
+        }
+        notifyEventListeners(activatedEvent);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java
new file mode 100644
index 0000000..9b4780b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+/** Parses {@link ClusterStatusClusterCreatedEvent} messages and notifies the event listeners. */
+public class ClusterStatusClusterCreatedMessageProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(ClusterStatusClusterCreatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    /**
+     * Handles cluster created status messages; any other type is passed to the next processor.
+     * @return true when the event was handled here, otherwise the next processor's result
+     * @throws RuntimeException if the type is not handled and there is no next processor
+     */
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (!ClusterStatusClusterCreatedEvent.class.getName().equals(type)) {
+            if (nextProcessor == null) {
+                throw new RuntimeException(String.format("Failed to process cluster created message using available message processors: [type] %s [body] %s", type, message));
+            }
+            return nextProcessor.process(type, message, object);
+        }
+        ClusterStatusClusterCreatedEvent event = (ClusterStatusClusterCreatedEvent) Util.jsonToObject(message, ClusterStatusClusterCreatedEvent.class);
+        if (log.isDebugEnabled()) {
+            log.debug("Received ClusterStatusClusterCreatedEvent: " + event.toString());
+        }
+        notifyEventListeners(event);
+        return true;
+    }
+}