You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2014/10/07 15:40:30 UTC

[1/4] git commit: hierarchical locking contd.

Repository: stratos
Updated Branches:
  refs/heads/4.0.0-grouping 2536b30b3 -> 622ce9fb6


hierarchical locking contd.


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/622ce9fb
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/622ce9fb
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/622ce9fb

Branch: refs/heads/4.0.0-grouping
Commit: 622ce9fb6aa9b6ab4613626b32c805ff74d868cd
Parents: 4ace39c
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Tue Oct 7 19:08:25 2014 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Tue Oct 7 19:08:54 2014 +0530

----------------------------------------------------------------------
 .../impl/CloudControllerServiceImpl.java        |   6 -
 .../controller/topology/TopologyBuilder.java    |   7 +-
 .../topology/TopologyEventPublisher.java        |   6 +-
 .../event/topology/ApplicationRemovedEvent.java |  15 +-
 .../ApplicationActivatedMessageProcessor.java   |   2 -
 .../ApplicationCreatedMessageProcessor.java     |   9 +
 .../ApplicationRemovedMessageProcessor.java     |  23 ++
 .../topology/ClusterActivatedProcessor.java     |   2 -
 .../ClusterCreatedMessageProcessor.java         |   2 -
 .../ClusterMaintenanceModeMessageProcessor.java |   2 -
 .../ClusterRemovedMessageProcessor.java         |   2 -
 .../topology/GroupActivatedProcessor.java       |   2 -
 .../receiver/topology/TopologyManager.java      |  80 -------
 .../topology/locking/TopologyLockingTest.java   | 209 +++++++++++++++++++
 14 files changed, 263 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
index a6f74df..df2c217 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
@@ -35,7 +35,6 @@ import org.apache.stratos.cloud.controller.interfaces.Iaas;
 import org.apache.stratos.cloud.controller.persist.Deserializer;
 import org.apache.stratos.cloud.controller.pojo.*;
 import org.apache.stratos.cloud.controller.pojo.application.ApplicationContext;
-import org.apache.stratos.cloud.controller.pojo.payload.MetaDataHolder;
 import org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublisher;
 import org.apache.stratos.cloud.controller.registry.RegistryManager;
 import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
@@ -49,11 +48,6 @@ import org.apache.stratos.messaging.domain.topology.ConfigCompositeApplication;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.util.Constants;
-import org.apache.stratos.metadata.client.DefaultMetaDataServiceClient;
-import org.apache.stratos.metadata.client.MetaDataServiceClient;
-import org.apache.stratos.metadata.client.config.MetaDataClientConfig;
-import org.apache.stratos.metadata.client.exception.MetaDataServiceClientExeption;
-import org.apache.stratos.metadata.client.exception.RestClientException;
 import org.jclouds.compute.ComputeService;
 import org.jclouds.compute.domain.NodeMetadata;
 import org.jclouds.compute.domain.NodeMetadataBuilder;

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
index 30b1b00..4aa7023 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
@@ -664,12 +664,13 @@ public class TopologyBuilder {
 
             if (!topology.applicationExists(applicationId)) {
                 log.warn("Application with id [ " + applicationId + " ] doesn't exist in Topology");
-                TopologyEventPublisher.sendApplicationRemovedEvent(applicationId, tenantId, tenantDomain);
+                //TopologyEventPublisher.sendApplicationRemovedEvent(applicationId, tenantId, tenantDomain);
 
             } else {
                 Application application = topology.getApplication(applicationId);
+                Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
                 // remove clusters
-                for (ClusterDataHolder clusterDataHolder : application.getClusterDataRecursively()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
                     Service service = topology.getService(clusterDataHolder.getServiceType());
                     if (service != null) {
                         // remove Cluster
@@ -696,7 +697,7 @@ public class TopologyBuilder {
 
                 log.info("Removed application [ " + applicationId + " ] from Topology");
 
-                TopologyEventPublisher.sendApplicationRemovedEvent(applicationId, tenantId, tenantDomain);
+                TopologyEventPublisher.sendApplicationRemovedEvent(applicationId, clusterData, tenantId, tenantDomain);
             }
 
         } finally {

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
index 53efeaa..a6d8350 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
@@ -39,6 +39,7 @@ import org.apache.stratos.messaging.util.Constants;
 
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * this is to send the relevant events from cloud controller to topology topic
@@ -99,13 +100,14 @@ public class TopologyEventPublisher {
         publishEvent(new ApplicationCreatedEvent(application, clusters));
     }
 
-    public static void sendApplicationRemovedEvent(String applicationId, int tenantId, String tenantDomain) {
+    public static void sendApplicationRemovedEvent(String applicationId, Set<ClusterDataHolder> clusterData,
+                                                   int tenantId, String tenantDomain) {
 
         if(log.isInfoEnabled() || log.isDebugEnabled()) {
             log.info("Publishing Application removed event: " + applicationId + " tenantId: " + tenantId);
         }
         
-        publishEvent(new ApplicationRemovedEvent(applicationId, tenantId, tenantDomain));
+        publishEvent(new ApplicationRemovedEvent(applicationId, clusterData, tenantId, tenantDomain));
     }
 
     public static void sendClusterRemovedEvent(ClusterContext ctxt, String deploymentPolicy) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationRemovedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationRemovedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationRemovedEvent.java
index 1e51321..8b0f2cb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationRemovedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ApplicationRemovedEvent.java
@@ -19,6 +19,11 @@
 
 package org.apache.stratos.messaging.event.topology;
 
+
+import org.apache.stratos.messaging.domain.topology.ClusterDataHolder;
+
+import java.util.Set;
+
 public class ApplicationRemovedEvent extends TopologyEvent {
 
     /**
@@ -26,11 +31,15 @@ public class ApplicationRemovedEvent extends TopologyEvent {
 	 */
 	private static final long serialVersionUID = -5499420725533165623L;
 	private String applicationId;
+    private Set<ClusterDataHolder> clusterData;
+
     private String tenantDomain;
     private int tenantId;
 
-    public ApplicationRemovedEvent (String applicationId, int tenantId, String tenantDomain) {
+    public ApplicationRemovedEvent (String applicationId, Set<ClusterDataHolder> clusterData,
+                                    int tenantId, String tenantDomain) {
         this.applicationId = applicationId;
+        this.clusterData = clusterData;
         this.tenantId = tenantId;
         this.tenantDomain = tenantDomain;
     }
@@ -46,4 +55,8 @@ public class ApplicationRemovedEvent extends TopologyEvent {
     public String getTenantDomain() {
         return tenantDomain;
     }
+
+    public Set<ClusterDataHolder> getClusterData() {
+        return clusterData;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
index 803a871..4211830 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
@@ -57,7 +57,6 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor {
             ApplicationActivatedEvent event = (ApplicationActivatedEvent) Util.
                     jsonToObject(message, ApplicationActivatedEvent.class);
 
-            TopologyManager.acquireReadLockForApplications();
             TopologyManager.acquireWriteLockForApplication(event.getAppId());
 
             try {
@@ -65,7 +64,6 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor {
 
             } finally {
                 TopologyManager.releaseWriteLockForApplication(event.getAppId());
-                TopologyManager.releaseReadLockForApplications();
             }
 
         } else {

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
index 4368bd7..c47867e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
@@ -22,12 +22,15 @@ package org.apache.stratos.messaging.message.processor.topology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterDataHolder;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
+import java.util.Set;
+
 public class ApplicationCreatedMessageProcessor extends MessageProcessor {
 
     private static final Log log = LogFactory.getLog(ApplicationCreatedMessageProcessor.class);
@@ -55,11 +58,17 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
             }
 
             TopologyManager.acquireWriteLockForApplications();
+            for (ClusterDataHolder clusterData : event.getApplication().getClusterDataRecursively()) {
+                TopologyManager.acquireWriteLockForService(clusterData.getServiceType());
+            }
 
             try {
                 return doProcess(event, topology);
 
             } finally {
+                for (ClusterDataHolder clusterData : event.getApplication().getClusterDataRecursively()) {
+                    TopologyManager.releaseWriteLockForService(clusterData.getServiceType());
+                }
                 TopologyManager.releaseWriteLockForApplications();
             }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
index ed6c2d4..629f21b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
@@ -21,6 +21,8 @@ package org.apache.stratos.messaging.message.processor.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.ClusterDataHolder;
+import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ApplicationRemovedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -62,12 +64,18 @@ public class ApplicationRemovedMessageProcessor extends MessageProcessor {
 	        }
 
             TopologyManager.acquireWriteLockForApplications();
+            for (ClusterDataHolder clusterData : event.getClusterData()) {
+                TopologyManager.acquireWriteLockForService(clusterData.getServiceType());
+            }
 
             try {
                 return doProcess(event, topology);
 
             } finally {
                 TopologyManager.releaseWriteLockForApplications();
+                for (ClusterDataHolder clusterData : event.getClusterData()) {
+                    TopologyManager.releaseWriteLockForService(clusterData.getServiceType());
+                }
             }
         
 	    } else {
@@ -102,6 +110,21 @@ public class ApplicationRemovedMessageProcessor extends MessageProcessor {
             topology.removeApplication(appId);
         }
 
+        if (event.getClusterData() != null) {
+            // remove the Clusters from the Topology
+            for (ClusterDataHolder clusterData : event.getClusterData()) {
+                Service service = topology.getService(clusterData.getServiceType());
+                if (service != null) {
+                    service.removeCluster(clusterData.getClusterId());
+                    if (log.isDebugEnabled()) {
+                        log.debug("Removed the Cluster " + clusterData.getClusterId() + " from Topology");
+                    }
+                }  else {
+                    log.warn("Service " + clusterData.getServiceType() + " not found in Topology!");
+                }
+            }
+        }
+
         if (log.isDebugEnabled()) {
             log.debug("ApplicationRemovedMessageProcessor notifying listener ");
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
index 78f772b..601cfb2 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
@@ -58,14 +58,12 @@ public class ClusterActivatedProcessor extends MessageProcessor {
             ClusterActivatedEvent event = (ClusterActivatedEvent) Util.
                     jsonToObject(message, ClusterActivatedEvent.class);
 
-            TopologyManager.acquireReadLockForServices();
             TopologyManager.acquireWriteLockForService(event.getServiceName());
             try {
                 return doProcess(event, topology);
 
             } finally {
                 TopologyManager.releaseWriteLockForService(event.getServiceName());
-                TopologyManager.releaseReadLockForServices();
             }
 
         }  else {

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
index 94b9650..6eddfd3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -53,14 +53,12 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
 
-            TopologyManager.acquireReadLockForServices();
             TopologyManager.acquireWriteLockForService(event.getServiceName());
             try {
                 return doProcess(event, topology);
 
             } finally {
                 TopologyManager.releaseWriteLockForService(event.getServiceName());
-                TopologyManager.releaseReadLockForServices();
             }
 
         } else {

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
index 8629363..0b10504 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
@@ -50,14 +50,12 @@ public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor {
             ClusterMaintenanceModeEvent event = (ClusterMaintenanceModeEvent) Util.
                                 jsonToObject(message, ClusterMaintenanceModeEvent.class);
 
-            TopologyManager.acquireReadLockForServices();
             TopologyManager.acquireWriteLockForService(event.getServiceName());
             try {
                 return doProcess(event, topology);
 
             } finally {
                 TopologyManager.releaseWriteLockForService(event.getServiceName());
-                TopologyManager.releaseReadLockForServices();
             }
 
         } else {

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
index 1dfb929..fb45cd3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -51,14 +51,12 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
 
-            TopologyManager.acquireReadLockForServices();
             TopologyManager.acquireWriteLockForService(event.getServiceName());
             try {
                 return doProcess(event, topology);
 
             } finally {
                 TopologyManager.releaseWriteLockForService(event.getServiceName());
-                TopologyManager.releaseReadLockForServices();
             }
 
         } else {

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
index 7200431..3bf5fad 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
@@ -51,7 +51,6 @@ public class GroupActivatedProcessor extends MessageProcessor {
             GroupActivatedEvent event = (GroupActivatedEvent) Util.
                     jsonToObject(message, GroupActivatedEvent.class);
 
-            TopologyManager.acquireReadLockForApplications();
             TopologyManager.acquireWriteLockForApplication(event.getAppId());
 
             try {
@@ -59,7 +58,6 @@ public class GroupActivatedProcessor extends MessageProcessor {
 
             } finally {
                 TopologyManager.releaseWriteLockForApplication(event.getAppId());
-                TopologyManager.releaseReadLockForApplications();
             }
 
         } else {

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
index 2ffd7f6..ed3b16a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
@@ -21,13 +21,10 @@ package org.apache.stratos.messaging.message.receiver.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Application;
-import org.apache.stratos.messaging.domain.topology.ClusterDataHolder;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
 import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
 
-import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
@@ -335,27 +332,6 @@ public class TopologyManager {
         // acquire read lock for all Applications
         acquireReadLockForApplications();
 
-        // get the Application's cluster's and acquire read locks
-        Application application = topology.getApplication(appId);
-        if (application == null) {
-            log.warn("Application " + appId + " is not found in the Topology");
-
-        } else {
-
-            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
-            if (clusterData != null && !clusterData.isEmpty()) {
-                for (ClusterDataHolder clusterDataHolder : clusterData) {
-                    // acquire read locks for services and clusters
-                    acquireReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
-                }
-
-            } else {
-               if (log.isDebugEnabled()) {
-                   log.debug("No Cluster Data found in Application " + appId);
-               }
-            }
-        }
-
         TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
         if (topologyAppLock == null)  {
             handleLockNotFound("Topology lock not found for Application " + appId);
@@ -385,25 +361,6 @@ public class TopologyManager {
 
         // release read lock for all Applications
         releaseReadLockForApplications();
-
-        // get the Application's cluster information
-        Application application = topology.getApplication(appId);
-        if (application == null) {
-            log.warn("Application " + appId + " is not found in the Topology");
-
-        } else {
-            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
-            if (clusterData != null && !clusterData.isEmpty()) {
-                for (ClusterDataHolder clusterDataHolder : clusterData) {
-                    // release read locks for clusters and services
-                    releaseReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
-                }
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("No Cluster Data found in Application " + appId);
-                }
-            }
-        }
     }
 
     public static synchronized void acquireWriteLockForApplication (String appId) {
@@ -411,24 +368,6 @@ public class TopologyManager {
         // acquire read lock for all Applications
         acquireReadLockForApplications();
 
-        // get the Application's cluster's and acquire read locks
-        Application application = topology.getApplication(appId);
-        if (application == null) {
-            log.warn("Application " + appId + " is not found in the Topology");
-
-        } else {
-            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
-            if (clusterData != null && !clusterData.isEmpty()) {
-                for (ClusterDataHolder clusterDataHolder : clusterData) {
-                    acquireWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
-                }
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("No Cluster Data found in Application " + appId);
-                }
-            }
-        }
-
         TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
         if (topologyAppLock == null)  {
             handleLockNotFound("Topology lock not found for Application " + appId);
@@ -458,25 +397,6 @@ public class TopologyManager {
 
         // release read lock for all Applications
         releaseReadLockForApplications();
-
-        // get the Application's cluster's and acquire read
-        Application application = topology.getApplication(appId);
-        if (application == null) {
-            log.warn("Application " + appId + " is not found in the Topology");
-
-        } else {
-            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
-            if (clusterData != null && !clusterData.isEmpty()) {
-                for (ClusterDataHolder clusterDataHolder : clusterData) {
-                    // release read locks for clusters and services
-                    releaseWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
-                }
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("No Cluster Data found in Application " + appId);
-                }
-            }
-        }
     }
 
     private static void handleLockNotFound (String errorMsg) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/622ce9fb/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/topology/locking/TopologyLockingTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/topology/locking/TopologyLockingTest.java b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/topology/locking/TopologyLockingTest.java
new file mode 100644
index 0000000..5d33904
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/topology/locking/TopologyLockingTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.topology.locking;
+
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.junit.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+//@RunWith(ConcurrentRunner.class)
+//@Concurrent(threads = 12)
+public class TopologyLockingTest {
+
+    private static Topology topology;
+
+    @BeforeClass
+    public static void setUpBeforeClass() {
+        System.out.println("Setting up TopologyLockingTest");
+        topology = TopologyManager.getTopology();
+
+        //add Services
+        topology.addService(new Service("service1", ServiceType.SingleTenant));
+        topology.addService(new Service("service2", ServiceType.SingleTenant));
+        topology.addService(new Service("service3", ServiceType.SingleTenant));
+        topology.addService(new Service("service4", ServiceType.SingleTenant));
+
+        // add Clusters
+        topology.getService("service1").addCluster(new Cluster("service1", "service1.cluster1.domain",
+                "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+        topology.getService("service1").addCluster(new Cluster("service1", "service1.cluster2.domain",
+                "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+
+        topology.getService("service2").addCluster(new Cluster("service2", "service2.cluster1.domain",
+                "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+        topology.getService("service2").addCluster(new Cluster("service2", "service2.cluster2.domain",
+                "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+
+        topology.getService("service3").addCluster(new Cluster("service3", "service3.cluster1.domain",
+                "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+        topology.getService("service3").addCluster(new Cluster("service3", "service3.cluster2.domain",
+                "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+
+        topology.getService("service4").addCluster(new Cluster("service4", "service4.cluster1.domain",
+                "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+        topology.getService("service4").addCluster(new Cluster("service4", "service4.cluster2.domain",
+                "dummyDeploymentPolicy", "dummyAutoscalePolicy", null));
+
+        // Create Application 1
+        Application app1 = new Application("app1");
+        Map<String, ClusterDataHolder> clusterDataMap1 = new HashMap<String, ClusterDataHolder>();
+        clusterDataMap1.put("alias1", new ClusterDataHolder("service1", "service1.cluster1.domain"));
+        clusterDataMap1.put("alias2", new ClusterDataHolder("service1", "service1.cluster2.domain"));
+        clusterDataMap1.put("alias3", new ClusterDataHolder("service2", "service2.cluster1.domain"));
+        clusterDataMap1.put("alias4", new ClusterDataHolder("service2", "service2.cluster2.domain"));
+
+        // add cluster data to Application
+        app1.setClusterData(clusterDataMap1);
+
+        // add Applicaiton to Topology
+        topology.addApplication(app1);
+
+        // Create Application 2
+        Application app2 = new Application("app2");
+        Map<String, ClusterDataHolder> clusterDataMap2 = new HashMap<String, ClusterDataHolder>();
+        clusterDataMap2.put("alias5", new ClusterDataHolder("service3", "service3.cluster1.domain"));
+        clusterDataMap2.put("alias6", new ClusterDataHolder("service3", "service3.cluster2.domain"));
+        clusterDataMap2.put("alias7", new ClusterDataHolder("service4", "service4.cluster1.domain"));
+        clusterDataMap2.put("alias8", new ClusterDataHolder("service4", "service4.cluster2.domain"));
+
+        // add cluster data to Application
+        app2.setClusterData(clusterDataMap2);
+
+        // add Applicaiton to Topology
+        topology.addApplication(app2);
+    }
+
+    @Test
+    public void testAqcuireAndReleaseReadLocksForServices1To2 () {
+
+        TopologyManager.acquireReadLockForService("service1");
+        TopologyManager.acquireReadLockForService("service2");
+
+        TopologyManager.releaseReadLockForService("service1");
+        TopologyManager.releaseReadLockForService("service2");
+    }
+
+    @Test
+    public void testAqcuireAndReleaseReadLocksForServices3To4 () {
+
+        TopologyManager.acquireReadLockForService("service3");
+        TopologyManager.acquireReadLockForService("service4");
+
+        TopologyManager.releaseReadLockForService("service3");
+        TopologyManager.releaseReadLockForService("service4");
+    }
+
+    @Test
+    public void testAqcuireAndReleaseWriteLocksForServices1To2 () {
+
+        TopologyManager.acquireWriteLockForService("service1");
+        TopologyManager.acquireWriteLockForService("service2");
+
+        TopologyManager.releaseWriteLockForService("service1");
+        TopologyManager.releaseWriteLockForService("service2");
+    }
+
+    @Test
+    public void testAqcuireAndReleaseWriteLocksForServices3To4 () {
+
+        TopologyManager.acquireWriteLockForService("service3");
+        TopologyManager.acquireWriteLockForService("service4");
+
+        TopologyManager.releaseWriteLockForService("service3");
+        TopologyManager.releaseWriteLockForService("service4");
+    }
+
+    @Test
+    public void testAqcuireAndReleaseReadLocksForClustersOfService1 () {
+
+        TopologyManager.acquireReadLockForCluster("service1", "service1.cluster1.domain");
+        TopologyManager.acquireReadLockForCluster("service1", "service1.cluster2.domain");
+
+        TopologyManager.releaseReadLockForCluster("service1", "service1.cluster1.domain");
+        TopologyManager.releaseReadLockForCluster("service1", "service1.cluster2.domain");
+    }
+
+    @Test
+    public void testAqcuireAndReleaseReadLocksForClustersOfService2 () {
+
+        TopologyManager.acquireReadLockForCluster("service2", "service2.cluster1.domain");
+        TopologyManager.acquireReadLockForCluster("service2", "service2.cluster2.domain");
+
+        TopologyManager.releaseReadLockForCluster("service2", "service2.cluster1.domain");
+        TopologyManager.releaseReadLockForCluster("service2", "service2.cluster2.domain");
+    }
+
+    @Test
+    public void testAqcuireAndReleaseWriteLocksForClustersOfService1 () {
+
+        TopologyManager.acquireWriteLockForCluster("service1", "service1.cluster1.domain");
+        TopologyManager.acquireWriteLockForCluster("service1", "service1.cluster2.domain");
+
+        TopologyManager.acquireWriteLockForCluster("service1", "service1.cluster1.domain");
+        TopologyManager.acquireWriteLockForCluster("service1", "service1.cluster2.domain");
+    }
+
+    @Test
+    public void testAqcuireAndReleaseWriteLocksForClustersOfService2 () {
+
+        TopologyManager.acquireWriteLockForCluster("service2", "service2.cluster1.domain");
+        TopologyManager.acquireWriteLockForCluster("service2", "service2.cluster2.domain");
+
+        TopologyManager.acquireWriteLockForCluster("service2", "service2.cluster1.domain");
+        TopologyManager.acquireWriteLockForCluster("service2", "service2.cluster2.domain");
+    }
+
+    @Test
+    public void testAcquireAndReleaseReadLockForApp1 () {
+
+        TopologyManager.acquireReadLockForApplication("app1");
+        TopologyManager.releaseReadLockForApplication("app1");
+    }
+
+    @Test
+    public void testAcquireAndReleaseWriteLockForApp1 () {
+
+        TopologyManager.acquireWriteLockForApplication("app1");
+        TopologyManager.releaseWriteLockForApplication("app1");
+    }
+
+    @Test
+    public void testAcquireAndReleaseReadLockForApp2 () {
+
+        TopologyManager.acquireReadLockForApplication("app2");
+        TopologyManager.releaseReadLockForApplication("app2");
+    }
+
+    @Test
+    public void testAcquireAndReleaseWriteLockForApp2 () {
+
+        TopologyManager.acquireWriteLockForApplication("app2");
+        TopologyManager.releaseWriteLockForApplication("app2");
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() {
+        System.out.println("Cleaning up TopologyLockingTest");
+        topology = null;
+    }
+}


[2/4] initial changes for hierarchical topology locking

Posted by is...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
index 67c9b67..4473add 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberSuspendedMessageProcessor extends MessageProcessor {
@@ -54,96 +55,108 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberSuspendedEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-                return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
             }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            return false;
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
                 }
+                return false;
             }
+        }
 
-            if (member.getStatus() == MemberStatus.Suspended) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already suspended: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	member.setStatus(MemberStatus.Suspended);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
+        if (member.getStatus() == MemberStatus.Suspended) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already suspended: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.Suspended);
 
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
index 5b5cbc9..3619b53 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
@@ -29,6 +29,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberTerminatedMessageProcessor extends MessageProcessor {
@@ -53,87 +54,99 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberTerminatedEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if(member != null) {
-                // Apply member filter
-                if(TopologyMemberFilter.getInstance().isActive()) {
-                    if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
-                        }
-                        return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+            }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if(member != null) {
+            // Apply member filter
+            if(TopologyMemberFilter.getInstance().isActive()) {
+                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
                     }
+                    return false;
                 }
             }
+        }
 
-            // Notify event listeners before removing member object
-            notifyEventListeners(event);
+        // Notify event listeners before removing member object
+        notifyEventListeners(event);
 
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	// Remove member from the cluster
-            	cluster.removeMember(member);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
-
-            return true;
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            // Remove member from the cluster
+            cluster.removeMember(member);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
index 2c216f0..1c4be8f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ServiceCreatedMessageProcessor extends MessageProcessor {
@@ -43,44 +44,21 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor {
 
         if (ServiceCreatedEvent.class.getName().equals(type)) {
             // Return if topology has not been initialized
-            if (!topology.isInitialized())
+            if (!topology.isInitialized()) {
                 return false;
+            }
 
             // Parse complete message and build event
             ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
-            }
+            TopologyManager.acquireWriteLockForServices();
+            try {
+                return doProcess(event, topology);
 
-            // Validate event against the existing topology
-            if (topology.serviceExists(event.getServiceName())) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service already created: [service] %s", event.getServiceName()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	Service service = new Service(event.getServiceName(), event.getServiceType());
-            	service.addPorts(event.getPorts());
-            	topology.addService(service);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Service created: [service] %s", event.getServiceName()));
-            	}
+            } finally {
+                TopologyManager.releaseWriteLockForServices();
             }
 
-
-            notifyEventListeners(event);
-            return true;
-
         } else {
             if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
@@ -90,4 +68,40 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor {
             }
         }
     }
+
+    private boolean doProcess (ServiceCreatedEvent event, Topology topology) {
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                }
+                return false;
+            }
+        }
+
+        // Validate event against the existing topology
+        if (topology.serviceExists(event.getServiceName())) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service already created: [service] %s", event.getServiceName()));
+            }
+        } else {
+
+            // Apply changes to the topology
+            Service service = new Service(event.getServiceName(), event.getServiceType());
+            service.addPorts(event.getPorts());
+            topology.addService(service);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Service created: [service] %s", event.getServiceName()));
+            }
+        }
+
+
+        notifyEventListeners(event);
+        return true;
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
index 2c0bc70..a38cbdc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ServiceRemovedMessageProcessor extends MessageProcessor {
@@ -49,38 +50,14 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
-            }
-
-            // Notify event listeners before removing service object
-            notifyEventListeners(event);
+            TopologyManager.acquireWriteLockForServices();
+            try {
+                return doProcess(event, topology);
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	topology.removeService(service);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Service removed: [service] %s", event.getServiceName()));
-            	}
+            } finally {
+                TopologyManager.releaseWriteLockForServices();
             }
 
-            return true;
         } else {
             if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
@@ -90,4 +67,40 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor {
             }
         }
     }
+
+    private boolean doProcess (ServiceRemovedEvent event, Topology topology) {
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                }
+                return false;
+            }
+        }
+
+        // Notify event listeners before removing service object
+        notifyEventListeners(event);
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+        } else {
+
+            // Apply changes to the topology
+            topology.removeService(service);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Service removed: [service] %s", event.getServiceName()));
+            }
+        }
+
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index e3ddfa3..db9e8b1 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -47,7 +47,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
     private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
     private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
     private GroupActivatedProcessor groupActivatedProcessor;
-    private CompositeApplicationRemovedMessageProcessor compositeApplicationRemovedMessageProcessor;
+    //private CompositeApplicationRemovedMessageProcessor compositeApplicationRemovedMessageProcessor;
     private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor;
     private ApplicationRemovedMessageProcessor applicationRemovedMessageProcessor;
     private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor;
@@ -109,11 +109,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
         add(applicationActivatedMessageProcessor);
 
         if (log.isDebugEnabled()) {
-            log.debug("Grouping: added applicationCreatedMessageProcessor, applicationRemovedMessageProcessor: " +
-            		applicationCreatedMessageProcessor + " / " + applicationRemovedMessageProcessor);
-        }
-
-        if (log.isDebugEnabled()) {
             log.debug("Topology message processor chain initialized X1");
         }
     }
@@ -153,9 +148,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
             applicationCreatedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ApplicationRemovedEventListener) {
             applicationRemovedMessageProcessor.addEventListener(eventListener);
-        	if (log.isDebugEnabled()) {
-                log.debug("Grouping: added eventlistener to applicationCreatedMessageProcessor: " + eventListener);
-            }
         } else if (eventListener instanceof ApplicationActivatedEventListener) {
             applicationActivatedMessageProcessor.addEventListener(eventListener);
         } else {

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 9cc8f78..218c441 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -69,15 +69,15 @@ class TopologyEventMessageDelegator implements Runnable {
                         log.debug(String.format("Topology event message [%s] received from queue: %s", type, messageQueue.getClass()));
                     }
 
-                    try {
-                        TopologyManager.acquireWriteLock();
+//                    try {
+//                        TopologyManager.acquireWriteLock();
                         if (log.isDebugEnabled()) {
                             log.debug(String.format("Delegating topology event message: %s", type));
                         }
                         processorChain.process(type, json, TopologyManager.getTopology());
-                    } finally {
-                        TopologyManager.releaseWriteLock();
-                    }
+//                    } finally {
+//                        TopologyManager.releaseWriteLock();
+//                    }
 
                 } catch (Exception e) {
                     log.error("Failed to retrieve topology event message", e);

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
index 5df66bd..2ffd7f6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
@@ -21,7 +21,13 @@ package org.apache.stratos.messaging.message.receiver.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.ClusterDataHolder;
 import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
+
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
@@ -30,43 +36,454 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  *  Usage:
  *  Acquire a relevant lock and invoke the getTopology() method inside a try block.
  *  Once processing is done release the lock using a finally block.
+ *
+ *  Acquiring Locks:
+ *
+ *  Stratos supports hierarchical locking. As per the practice, we need to lock the
+ *  hierarchy from root level till the relevant sub tree.
+ *
+ *  Acquire a write lock:
+ *
+ *  From root level, acquire read lock, and acquire a write lock only for the
+ *  relevant sub tree.
+ *
+ *  Acquire a read lock:
+ *
+ *  From root level, acquire read locks till the relevant sub tree
+ *
+ *  Examples -
+ *
+ *  Example 1: Acquiring write lock for a Cluster to modify the Cluster object -
+ *           acquiring:
+ *           1. acquire read lock for all Services
+ *           2. acquire read lock for the particular Service, to which the cluster belongs to
+ *           3. acquire write lock for the Cluster
+ *
+ *           releasing:
+ *           1. release write lock for the Cluster
+ *           2. release read lock for the particular Service
+ *           3. release read lock for all Services
+ *
+ *  Example 2: Acquiring write lock to add a new Cluster object -
+ *           acquiring:
+ *           1. acquire read lock for all Services
+ *           2. acquire write lock for the particular Service, to which the cluster belongs to
+ *
+ *           releasing:
+ *           1. release write lock for the particular Service
+ *           2. release read lock for all Services
+ *
+ *  Example 3: Acquiring read lock to read Cluster information
+ *           acquiring:
+ *           1. acquire read lock for all Services
+ *           2. acquire read lock for the particular Service, to which the cluster belongs to
+ *           3. acquire read lock for the relevant Cluster
+ *
+ *           releasing:
+ *           1. release read lock for the relevant Cluster
+ *           2. release read lock for the particular Service
+ *           3. release read lock for all Services
+ *
+ *  Example 4: Acquiring the write lock to add a deploy a Cartridge (add a new Service)
+ *           acquire:
+ *           1. acquire write lock for all Services
+ *
+ *           release:
+ *           1. release write lock for all Services
  */
 public class TopologyManager {
     private static final Log log = LogFactory.getLog(TopologyManager.class);
 
     private static volatile Topology topology;
+    private static final TopologyLockHierarchy topologyLockHierarchy =
+            TopologyLockHierarchy.getInstance();
     private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
     private static volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
     private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
 
+
+    // Top level locks - should be used to lock the entire Topology
+
     public static void acquireReadLock() {
         if(log.isDebugEnabled()) {
-            log.debug("Read lock acquired");
+            log.debug("Read lock acquired for Topology");
         }
         readLock.lock();
     }
 
     public static void releaseReadLock() {
         if(log.isDebugEnabled()) {
-            log.debug("Read lock released");
+            log.debug("Read lock released for Topology");
         }
         readLock.unlock();
     }
 
     public static void acquireWriteLock() {
         if(log.isDebugEnabled()) {
-            log.debug("Write lock acquired");
+            log.debug("Write lock acquired for Topology");
         }
         writeLock.lock();
     }
 
     public static void releaseWriteLock() {
         if(log.isDebugEnabled()) {
-            log.debug("Write lock released");
+            log.debug("Write lock released for Topology");
         }
         writeLock.unlock();
     }
 
+    // Application, Service and Cluster read locks
+
+    public static void acquireReadLockForApplications() {
+        if(log.isDebugEnabled()) {
+            log.debug("Read lock acquired for Applications");
+        }
+        topologyLockHierarchy.getApplicatioLock().acquireReadLock();
+    }
+
+    public static void releaseReadLockForApplications() {
+        if(log.isDebugEnabled()) {
+            log.debug("Read lock released for Applications");
+        }
+        topologyLockHierarchy.getApplicatioLock().releaseReadLock();
+    }
+
+    public static void acquireReadLockForServices() {
+        if(log.isDebugEnabled()) {
+            log.debug("Read lock acquired for Services");
+        }
+        topologyLockHierarchy.getServiceLock().acquireReadLock();
+    }
+
+    public static void releaseReadLockForServices() {
+        if(log.isDebugEnabled()) {
+            log.debug("Read lock released for Services");
+        }
+        topologyLockHierarchy.getServiceLock().releaseReadLock();
+    }
+
+    // Application, Service and Cluster write locks
+
+    public static void acquireWriteLockForApplications() {
+        if(log.isDebugEnabled()) {
+            log.debug("Write lock acquired for Applications");
+        }
+        topologyLockHierarchy.getApplicatioLock().acquireWriteLock();
+    }
+
+    public static void releaseWriteLockForApplications() {
+        if(log.isDebugEnabled()) {
+            log.debug("Write lock released for Applications");
+        }
+        topologyLockHierarchy.getApplicatioLock().releaseWritelock();
+    }
+
+    public static void acquireWriteLockForServices() {
+        if(log.isDebugEnabled()) {
+            log.debug("Write lock acquired for Services");
+        }
+        topologyLockHierarchy.getServiceLock().acquireWriteLock();
+    }
+
+    public static void releaseWriteLockForServices() {
+        if(log.isDebugEnabled()) {
+            log.debug("Write lock released for Services");
+        }
+        topologyLockHierarchy.getServiceLock().releaseWritelock();
+    }
+
+    public static void acquireReadLockForService (String serviceName) {
+
+        // acquire read lock for all Services
+        acquireReadLockForServices();
+
+        TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+        if (topologyServiceLock == null) {
+            handleLockNotFound("Topology lock not found for Service " + serviceName);
+
+        } else {
+            topologyServiceLock.acquireReadLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Read lock acquired for Service " + serviceName);
+            }
+        }
+    }
+
+    public static void releaseReadLockForService (String serviceName) {
+
+        TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+        if (topologyServiceLock == null) {
+            handleLockNotFound("Topology lock not found for Service " + serviceName);
+
+        } else {
+            topologyServiceLock.releaseReadLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Read lock released for Service " + serviceName);
+            }
+        }
+
+        // release read lock for all Services
+        releaseReadLockForServices();
+    }
+
+    public static void acquireWriteLockForService (String serviceName) {
+
+        // acquire read lock for all Applications
+        acquireReadLockForServices();
+
+        TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+        if (topologyServiceLock == null) {
+            handleLockNotFound("Topology lock not found for Service " + serviceName);
+
+        } else {
+            topologyServiceLock.acquireWriteLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock acquired for Service " + serviceName);
+            }
+        }
+    }
+
+    public static void releaseWriteLockForService (String serviceName) {
+
+        TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+        if (topologyServiceLock == null) {
+            handleLockNotFound("Topology lock not found for Service " + serviceName);
+
+        } else {
+            topologyServiceLock.releaseWritelock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock released for Service " + serviceName);
+            }
+        }
+
+        // release read lock for all Services
+        releaseReadLockForServices();
+    }
+
+    public static void acquireReadLockForCluster (String serviceName, String clusterId) {
+
+        // acquire read lock for the relevant Services
+        acquireReadLockForService(serviceName);
+
+        TopologyLock topologyClusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+        if (topologyClusterLock == null) {
+            handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+
+        }  else {
+            // acquire read lock for the relevant Cluster
+            topologyClusterLock.acquireReadLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Read lock acquired for Cluster " + clusterId);
+            }
+        }
+    }
+
+    public static void releaseReadLockForCluster (String serviceName, String clusterId) {
+
+        TopologyLock topologyClusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+        if (topologyClusterLock == null) {
+            handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+
+        } else {
+            // release read lock for the relevant Cluster
+            topologyClusterLock.releaseReadLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Read lock released for Cluster " + clusterId);
+            }
+        }
+
+        // release read lock for relevant Service
+        releaseReadLockForService(serviceName);
+    }
+
+    public static void acquireWriteLockForCluster (String serviceName, String clusterId) {
+
+        // acquire read lock for the relevant Services
+        acquireReadLockForService(serviceName);
+
+        TopologyLock topologyClusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+        if (topologyClusterLock == null) {
+            handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+
+        } else {
+            topologyClusterLock.acquireWriteLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock acquired for Cluster " + clusterId);
+            }
+        }
+    }
+
+    public static void releaseWriteLockForCluster (String serviceName, String clusterId) {
+
+        TopologyLock topologyClusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+        if (topologyClusterLock == null) {
+            handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+
+        } else {
+            topologyClusterLock.releaseWritelock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock released for Cluster " + clusterId);
+            }
+        }
+
+        // release read lock for relevant Service
+        releaseReadLockForService(serviceName);
+    }
+
+    public static void acquireReadLockForApplication (String appId) {
+
+        // acquire read lock for all Applications
+        acquireReadLockForApplications();
+
+        // get the Application's cluster's and acquire read locks
+        Application application = topology.getApplication(appId);
+        if (application == null) {
+            log.warn("Application " + appId + " is not found in the Topology");
+
+        } else {
+
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+            if (clusterData != null && !clusterData.isEmpty()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
+                    // acquire read locks for services and clusters
+                    acquireReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+                }
+
+            } else {
+               if (log.isDebugEnabled()) {
+                   log.debug("No Cluster Data found in Application " + appId);
+               }
+            }
+        }
+
+        TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+        if (topologyAppLock == null)  {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // now, lock Application
+            topologyAppLock.acquireReadLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Read lock acquired for Application " + appId);
+            }
+        }
+    }
+
+    public static void releaseReadLockForApplication (String appId) {
+
+        TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+        if (topologyAppLock == null)  {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // release App lock
+            topologyAppLock.releaseReadLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Read lock released for Application " + appId);
+            }
+        }
+
+        // release read lock for all Applications
+        releaseReadLockForApplications();
+
+        // get the Application's cluster information
+        Application application = topology.getApplication(appId);
+        if (application == null) {
+            log.warn("Application " + appId + " is not found in the Topology");
+
+        } else {
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+            if (clusterData != null && !clusterData.isEmpty()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
+                    // release read locks for clusters and services
+                    releaseReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("No Cluster Data found in Application " + appId);
+                }
+            }
+        }
+    }
+
+    public static synchronized void acquireWriteLockForApplication (String appId) {
+
+        // acquire read lock for all Applications
+        acquireReadLockForApplications();
+
+        // get the Application's cluster's and acquire read locks
+        Application application = topology.getApplication(appId);
+        if (application == null) {
+            log.warn("Application " + appId + " is not found in the Topology");
+
+        } else {
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+            if (clusterData != null && !clusterData.isEmpty()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
+                    acquireWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("No Cluster Data found in Application " + appId);
+                }
+            }
+        }
+
+        TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+        if (topologyAppLock == null)  {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // now, lock Application
+            topologyAppLock.acquireWriteLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock acquired for Application " + appId);
+            }
+        }
+    }
+
+    public static synchronized void releaseWriteLockForApplication (String appId) {
+
+        TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+        if (topologyAppLock == null)  {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // release App lock
+            topologyAppLock.releaseWritelock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock released for Application " + appId);
+            }
+        }
+
+        // release read lock for all Applications
+        releaseReadLockForApplications();
+
+        // get the Application's cluster's and acquire read
+        Application application = topology.getApplication(appId);
+        if (application == null) {
+            log.warn("Application " + appId + " is not found in the Topology");
+
+        } else {
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+            if (clusterData != null && !clusterData.isEmpty()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
+                    // release read locks for clusters and services
+                    releaseWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("No Cluster Data found in Application " + appId);
+                }
+            }
+        }
+    }
+
+    private static void handleLockNotFound (String errorMsg) {
+        log.warn(errorMsg);
+        //throw new RuntimeException(errorMsg);
+    }
+
     public static Topology getTopology() {
         if (topology == null) {
             synchronized (TopologyManager.class){


[3/4] initial changes for hierarchical topology locking

Posted by is...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
index c007343..94b9650 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -27,6 +27,7 @@ import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterCreatedMessageProcessor extends MessageProcessor {
@@ -41,83 +42,97 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
 
     @Override
     public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
 
+        Topology topology = (Topology) object;
         if (ClusterCreatedEvent.class.getName().equals(type)) {
             // Return if topology has not been initialized
-            if (!topology.isInitialized())
+            if (!topology.isInitialized()) {
                 return false;
+            }
 
             // Parse complete message and build event
             ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
-            }
+            TopologyManager.acquireReadLockForServices();
+            TopologyManager.acquireWriteLockForService(event.getServiceName());
+            try {
+                return doProcess(event, topology);
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+            } finally {
+                TopologyManager.releaseWriteLockForService(event.getServiceName());
+                TopologyManager.releaseReadLockForServices();
             }
 
-            // Validate event properties
-            Cluster cluster = event.getCluster();
-            if(cluster == null) {
-                String msg = "Cluster object of cluster created event is null.";
-                log.error(msg);
-                throw new RuntimeException(msg);
-            }
-            if (cluster.getHostNames().isEmpty()) {
-                throw new RuntimeException("Host name/s not found in cluster created event");
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        }
+    }
+
+    private boolean doProcess (ClusterCreatedEvent event,Topology topology) {
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            if (service.clusterExists(event.getClusterId())) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
-                            event.getClusterId()));
-                }
-			} else {
-
-				// Apply changes to the topology
-				service.addCluster(cluster);
-				if (log.isInfoEnabled()) {
-					log.info(String.format("Cluster created: %s",
-							cluster.toString()));
-				}
-			}
+        }
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                }
+                return false;
+            }
+        }
 
+        // Validate event properties
+        Cluster cluster = event.getCluster();
+        if(cluster == null) {
+            String msg = "Cluster object of cluster created event is null.";
+            log.error(msg);
+            throw new RuntimeException(msg);
+        }
+        if (cluster.getHostNames().isEmpty()) {
+            throw new RuntimeException("Host name/s not found in cluster created event");
+        }
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        if (service.clusterExists(event.getClusterId())) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
+                        event.getClusterId()));
+            }
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+
+            // Apply changes to the topology
+            service.addCluster(cluster);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster created: %s",
+                        cluster.toString()));
             }
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
index f125c54..8629363 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor {
@@ -49,64 +50,77 @@ public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor {
             ClusterMaintenanceModeEvent event = (ClusterMaintenanceModeEvent) Util.
                                 jsonToObject(message, ClusterMaintenanceModeEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireReadLockForServices();
+            TopologyManager.acquireWriteLockForService(event.getServiceName());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForService(event.getServiceName());
+                TopologyManager.releaseReadLockForServices();
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (ClusterMaintenanceModeEvent event,Topology topology)  {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
+        }
 
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
-                            event.getClusterId()));
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
-			} else {
-			    // Apply changes to the topology
-                cluster.setStatus(Status.In_Maintenance);
-				if (log.isInfoEnabled()) {
-					log.info(String.format("Cluster updated as maintenance mode: %s",
-							cluster.toString()));
-				}
-			}
+                return false;
+            }
+        }
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
 
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
+                        event.getClusterId()));
+            }
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            // Apply changes to the topology
+            cluster.setStatus(Status.In_Maintenance);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster updated as maintenance mode: %s",
+                        cluster.toString()));
             }
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
index 69ef5b0..1dfb929 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterRemovedMessageProcessor extends MessageProcessor {
@@ -50,65 +51,79 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireReadLockForServices();
+            TopologyManager.acquireWriteLockForService(event.getServiceName());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForService(event.getServiceName());
+                TopologyManager.releaseReadLockForServices();
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (ClusterRemovedEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
+        }
 
-            // Notify event listeners before removing the cluster object
-            notifyEventListeners(event);
-
-            if (!service.clusterExists(event.getClusterId())) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(),
-                            event.getClusterId()));
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
-            } else {
-            	
-            	// Apply changes to the topology
-            	service.removeCluster(event.getClusterId());
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
-            				event.getServiceName(), event.getClusterId()));
-            	}
+                return false;
+            }
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
             }
+            return false;
+        }
+
+        // Notify event listeners before removing the cluster object
+        notifyEventListeners(event);
 
-            return true;
+        if (!service.clusterExists(event.getClusterId())) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(),
+                        event.getClusterId()));
+            }
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+
+            // Apply changes to the topology
+            service.removeCluster(event.getClusterId());
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
             }
         }
+
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
index 135bdae..6d5cb8f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 import java.util.ArrayList;
@@ -49,102 +50,20 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
         if (CompleteTopologyEvent.class.getName().equals(type)) {
         	// Parse complete message and build event
         	CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
-        	
-            // if topology has not already initialized
-			if (!topology.isInitialized()) {
-
-				// Apply service filter
-				if (TopologyServiceFilter.getInstance().isActive()) {
-					// Add services included in service filter
-					for (Service service : event.getTopology().getServices()) {
-						if (TopologyServiceFilter.getInstance()
-								.serviceNameIncluded(service.getServiceName())) {
-							topology.addService(service);
-						} else {
-							if (log.isDebugEnabled()) {
-								log.debug(String.format(
-										"Service is excluded: [service] %s",
-										service.getServiceName()));
-							}
-						}
-					}
-				} else {
-					// Add all services
-					topology.addServices(event.getTopology().getServices());
-				}
-
-				// Apply cluster filter
-				if (TopologyClusterFilter.getInstance().isActive()) {
-					for (Service service : topology.getServices()) {
-						List<Cluster> clustersToRemove = new ArrayList<Cluster>();
-						for (Cluster cluster : service.getClusters()) {
-							if (TopologyClusterFilter.getInstance()
-									.clusterIdExcluded(cluster.getClusterId())) {
-								clustersToRemove.add(cluster);
-							}
-						}
-						for (Cluster cluster : clustersToRemove) {
-							service.removeCluster(cluster);
-							if (log.isDebugEnabled()) {
-								log.debug(String.format(
-										"Cluster is excluded: [cluster] %s",
-										cluster.getClusterId()));
-							}
-						}
-					}
-				}
-
-				// Apply member filter
-				if (TopologyMemberFilter.getInstance().isActive()) {
-					for (Service service : topology.getServices()) {
-						for (Cluster cluster : service.getClusters()) {
-							List<Member> membersToRemove = new ArrayList<Member>();
-							for (Member member : cluster.getMembers()) {
-								if (TopologyMemberFilter.getInstance()
-										.lbClusterIdExcluded(
-												member.getLbClusterId())) {
-									membersToRemove.add(member);
-								}
-							}
-							for (Member member : membersToRemove) {
-								cluster.removeMember(member);
-								if (log.isDebugEnabled()) {
-									log.debug(String
-											.format("Member is excluded: [member] %s [lb-cluster-id] %s",
-													member.getMemberId(),
-													member.getLbClusterId()));
-								}
-							}
-						}
-					}
-				}
-
-                // add existing Applications to Topology
-                Collection<Application> applications = event.getTopology().getApplications();
-                if (applications != null && !applications.isEmpty()) {
-                    for (Application application : applications) {
-                        topology.addApplication(application);
-                        if (log.isDebugEnabled()) {
-                            log.debug("Application with id [ " +  application.getId() + " ] added to Topology");
-                        }
-                    }
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("No Application information found in Complete Topology event");
-                    }
-                }
 
-				if (log.isInfoEnabled()) {
-					log.info("Topology initialized");
-				}
+            if (!topology.isInitialized()) {
+                TopologyManager.acquireWriteLock();
+
+                try {
+                    return doProcess(event, topology);
 
-				// Set topology initialized
-				topology.setInitialized(true);
-			}
+                } finally {
+                    TopologyManager.releaseWriteLock();
+                }
+            } else {
+                return true;
+            }
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
         } else {
             if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
@@ -153,4 +72,99 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
             return false;
         }
     }
+
+    private boolean doProcess (CompleteTopologyEvent event, Topology topology) {
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            // Add services included in service filter
+            for (Service service : event.getTopology().getServices()) {
+                if (TopologyServiceFilter.getInstance()
+                        .serviceNameIncluded(service.getServiceName())) {
+                    topology.addService(service);
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format(
+                                "Service is excluded: [service] %s",
+                                service.getServiceName()));
+                    }
+                }
+            }
+        } else {
+            // Add all services
+            topology.addServices(event.getTopology().getServices());
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            for (Service service : topology.getServices()) {
+                List<Cluster> clustersToRemove = new ArrayList<Cluster>();
+                for (Cluster cluster : service.getClusters()) {
+                    if (TopologyClusterFilter.getInstance()
+                            .clusterIdExcluded(cluster.getClusterId())) {
+                        clustersToRemove.add(cluster);
+                    }
+                }
+                for (Cluster cluster : clustersToRemove) {
+                    service.removeCluster(cluster);
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format(
+                                "Cluster is excluded: [cluster] %s",
+                                cluster.getClusterId()));
+                    }
+                }
+            }
+        }
+
+        // Apply member filter
+        if (TopologyMemberFilter.getInstance().isActive()) {
+            for (Service service : topology.getServices()) {
+                for (Cluster cluster : service.getClusters()) {
+                    List<Member> membersToRemove = new ArrayList<Member>();
+                    for (Member member : cluster.getMembers()) {
+                        if (TopologyMemberFilter.getInstance()
+                                .lbClusterIdExcluded(
+                                        member.getLbClusterId())) {
+                            membersToRemove.add(member);
+                        }
+                    }
+                    for (Member member : membersToRemove) {
+                        cluster.removeMember(member);
+                        if (log.isDebugEnabled()) {
+                            log.debug(String
+                                    .format("Member is excluded: [member] %s [lb-cluster-id] %s",
+                                            member.getMemberId(),
+                                            member.getLbClusterId()));
+                        }
+                    }
+                }
+            }
+        }
+
+        // add existing Applications to Topology
+        Collection<Application> applications = event.getTopology().getApplications();
+        if (applications != null && !applications.isEmpty()) {
+            for (Application application : applications) {
+                topology.addApplication(application);
+                if (log.isDebugEnabled()) {
+                    log.debug("Application with id [ " +  application.getId() + " ] added to Topology");
+                }
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("No Application information found in Complete Topology event");
+            }
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Topology initialized");
+        }
+
+        // Set topology initialized
+        topology.setInitialized(true);
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
index 627d9a9..7200431 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
@@ -21,11 +21,9 @@ package org.apache.stratos.messaging.message.processor.topology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent;
 import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
-import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 /**
@@ -53,34 +51,16 @@ public class GroupActivatedProcessor extends MessageProcessor {
             GroupActivatedEvent event = (GroupActivatedEvent) Util.
                     jsonToObject(message, GroupActivatedEvent.class);
 
-            // Validate event against the existing topology
-            Application application = topology.getApplication(event.getAppId());
-            if (application == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Application does not exist: [service] %s",
-                            event.getAppId()));
-                }
-                return false;
-            }
-            Group group = application.getGroup(event.getGroupId());
+            TopologyManager.acquireReadLockForApplications();
+            TopologyManager.acquireWriteLockForApplication(event.getAppId());
 
-            if (group == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                            event.getGroupId()));
-                }
-            } else {
-                // Apply changes to the topology
-                group.setStatus(Status.Activated);
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Group updated as activated : %s",
-                            group.toString()));
-                }
-            }
+            try {
+                return doProcess(event, topology);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+            } finally {
+                TopologyManager.releaseWriteLockForApplication(event.getAppId());
+                TopologyManager.releaseReadLockForApplications();
+            }
 
         } else {
             if (nextProcessor != null) {
@@ -91,4 +71,36 @@ public class GroupActivatedProcessor extends MessageProcessor {
             }
         }
     }
+
+    private boolean doProcess (GroupActivatedEvent event,Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroup(event.getGroupId());
+
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+            }
+        } else {
+            // Apply changes to the topology
+            group.setStatus(Status.Activated);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Group updated as activated : %s",
+                        group.toString()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
index 8e4e1b1..2d3f8b3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class InstanceSpawnedMessageProcessor extends MessageProcessor {
@@ -50,92 +51,103 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (InstanceSpawnedEvent event,Topology topology){
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
-                    }
-                    return false;
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
+                return false;
             }
+        }
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
                 }
                 return false;
             }
-            if (cluster.memberExists(event.getMemberId())) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	Member member = new Member(event.getServiceName(), event.getClusterId(), event.getNetworkPartitionId(), event.getPartitionId(), event.getMemberId());
-            	member.setStatus(MemberStatus.Created);
-            	member.setMemberPublicIp(event.getMemberPublicIp());
-            	member.setMemberIp(event.getMemberIp());
-            	member.setLbClusterId(event.getLbClusterId());
-                member.setProperties(event.getProperties());
-            	cluster.addMember(member);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        if (cluster.memberExists(event.getMemberId())) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            Member member = new Member(event.getServiceName(), event.getClusterId(), event.getNetworkPartitionId(), event.getPartitionId(), event.getMemberId());
+            member.setStatus(MemberStatus.Created);
+            member.setMemberPublicIp(event.getMemberPublicIp());
+            member.setMemberIp(event.getMemberIp());
+            member.setLbClusterId(event.getLbClusterId());
+            member.setProperties(event.getProperties());
+            cluster.addMember(member);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
index a5d701d..ec1b5ec 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberActivatedMessageProcessor extends MessageProcessor {
@@ -54,111 +55,123 @@ public class MemberActivatedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess (MemberActivatedEvent event,Topology topology) {
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
+                return false;
             }
+        }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
+                return false;
             }
+        }
 
-            // Validate event properties
-            if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
-                throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
+        // Validate event properties
+        if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
+            throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
+                    event.getServiceName(),
+                    event.getClusterId(),
+                    event.getMemberId()));
+        }
+        if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
+            throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
+                    event.getServiceName(),
+                    event.getClusterId(),
+                    event.getMemberId()));
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
             }
-            if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
-                throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
                         event.getServiceName(),
                         event.getClusterId(),
                         event.getMemberId()));
             }
+            return false;
+        }
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
-                }
-                return false;
-            }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
-                }
-                return false;
-            }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
                 }
                 return false;
             }
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
-                }
+        if (member.getStatus() == MemberStatus.Activated) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
-            if (member.getStatus() == MemberStatus.Activated) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	member.addPorts(event.getPorts());
-            	member.setMemberIp(event.getMemberIp());
-            	member.setStatus(MemberStatus.Activated);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
-            }
+            // Apply changes to the topology
+            member.addPorts(event.getPorts());
+            member.setMemberIp(event.getMemberIp());
+            member.setStatus(MemberStatus.Activated);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
index b6dc489..b252a61 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
@@ -27,6 +27,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberMaintenanceModeProcessor extends MessageProcessor {
@@ -51,98 +52,110 @@ public class MemberMaintenanceModeProcessor extends MessageProcessor {
             MemberMaintenanceModeEvent event = (MemberMaintenanceModeEvent) Util.
                                             jsonToObject(message, MemberMaintenanceModeEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberMaintenanceModeEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-                return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
             }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            return false;
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
                 }
+                return false;
             }
+        }
 
-            if (member.getStatus() == MemberStatus.In_Maintenance) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already updated as In_Maintenance: " +
-                            "[service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	member.setStatus(MemberStatus.In_Maintenance);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member updated as In_Maintenance: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
+        if (member.getStatus() == MemberStatus.In_Maintenance) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already updated as In_Maintenance: " +
+                        "[service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.In_Maintenance);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member updated as In_Maintenance: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
index 92115aa..f0c3580 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{
@@ -50,98 +51,111 @@ public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{
             MemberReadyToShutdownEvent event = (MemberReadyToShutdownEvent) Util.
                                             jsonToObject(message, MemberReadyToShutdownEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberReadyToShutdownEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-                return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
             }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            return false;
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
                 }
+                return false;
             }
+        }
 
-            if (member.getStatus() == MemberStatus.ReadyToShutDown) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already updated as Ready to Shutdown: " +
-                            "[service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	member.setStatus(MemberStatus.ReadyToShutDown);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
+        if (member.getStatus() == MemberStatus.ReadyToShutDown) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already updated as Ready to Shutdown: " +
+                        "[service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.ReadyToShutDown);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
index 4d93957..508ec39 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberStartedMessageProcessor extends MessageProcessor {
@@ -54,97 +55,109 @@ public class MemberStartedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberStartedEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-                return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
             }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            return false;
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
                 }
+                return false;
             }
+        }
 
-            if (member.getStatus() == MemberStatus.Starting) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already started: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	member.setStatus(MemberStatus.Starting);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
+        if (member.getStatus() == MemberStatus.Starting) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already started: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.Starting);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }


[4/4] git commit: initial changes for hierarchical topology locking

Posted by is...@apache.org.
initial changes for hierarchical topology locking


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4ace39c8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4ace39c8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4ace39c8

Branch: refs/heads/4.0.0-grouping
Commit: 4ace39c85ab1f989e888dcc48afd2c1092ff245f
Parents: 2536b30
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Tue Oct 7 10:21:48 2014 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Tue Oct 7 19:08:54 2014 +0530

----------------------------------------------------------------------
 .../AutoscalerHealthStatEventReceiver.java      |  18 +-
 .../AutoscalerTopologyEventReceiver.java        | 151 ++++---
 .../controller/topology/TopologyBuilder.java    |   7 +-
 .../LoadBalancerTopologyEventReceiver.java      |  77 +++-
 .../StratosManagerTopologyEventReceiver.java    |  78 +++-
 .../messaging/domain/topology/Service.java      |   9 +-
 .../messaging/domain/topology/Topology.java     |  11 +-
 .../domain/topology/locking/TopologyLock.java   |  49 +++
 .../topology/locking/TopologyLockHierarchy.java | 147 +++++++
 .../ApplicationActivatedMessageProcessor.java   |  54 ++-
 .../ApplicationCreatedMessageProcessor.java     |  65 +--
 .../ApplicationRemovedMessageProcessor.java     |  73 ++--
 .../topology/ClusterActivatedProcessor.java     | 108 +++--
 .../ClusterCreatedMessageProcessor.java         | 131 +++---
 .../ClusterMaintenanceModeMessageProcessor.java | 104 +++--
 .../ClusterRemovedMessageProcessor.java         | 107 +++--
 .../CompleteTopologyMessageProcessor.java       | 200 +++++----
 .../topology/GroupActivatedProcessor.java       |  70 +--
 .../InstanceSpawnedMessageProcessor.java        | 150 ++++---
 .../MemberActivatedMessageProcessor.java        | 183 ++++----
 .../MemberMaintenanceModeProcessor.java         | 159 +++----
 .../MemberReadyToShutdownMessageProcessor.java  | 160 +++----
 .../topology/MemberStartedMessageProcessor.java | 157 +++----
 .../MemberSuspendedMessageProcessor.java        | 155 +++----
 .../MemberTerminatedMessageProcessor.java       | 139 +++---
 .../ServiceCreatedMessageProcessor.java         |  74 ++--
 .../ServiceRemovedMessageProcessor.java         |  71 ++--
 .../topology/TopologyMessageProcessorChain.java |  10 +-
 .../topology/TopologyEventMessageDelegator.java |  10 +-
 .../receiver/topology/TopologyManager.java      | 425 ++++++++++++++++++-
 30 files changed, 2070 insertions(+), 1082 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
index 26d3179..a1213f6 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -593,8 +593,11 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
     }
 
     private Member findMember(String memberId) {
+
+        TopologyManager.acquireReadLockForServices();
+
         try {
-            TopologyManager.acquireReadLock();
+            //TopologyManager.acquireReadLock();
             for(Service service : TopologyManager.getTopology().getServices()) {
                 for(Cluster cluster : service.getClusters()) {
                     if(cluster.memberExists(memberId)) {
@@ -605,7 +608,8 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
             return null;
         }
         finally {
-            TopologyManager.releaseReadLock();
+            //TopologyManager.releaseReadLock();
+            TopologyManager.releaseReadLockForServices();
         }
     }
 
@@ -613,8 +617,13 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
         try {
             AbstractClusterMonitor monitor = getMonitor(clusterId);
             NetworkPartitionContext nwPartitionCtxt;
+
+            // TODO: the optimal way would be to add Service Name to member fault event and use to get a
+            // hierarchical lock using TopologyManager.acquireReadLockForCluster(serviceName, clusterid)
+            TopologyManager.acquireReadLockForServices();
+
             try{
-            	TopologyManager.acquireReadLock();
+            	//TopologyManager.acquireReadLock();
             	Member member = findMember(memberId);
             	
             	if(null == member){
@@ -634,7 +643,8 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
                 }
 
             }finally{
-            	TopologyManager.releaseReadLock();
+            	//TopologyManager.releaseReadLock();
+                TopologyManager.releaseReadLockForServices();
             }
             // start a new member in the same Partition
             String partitionId = monitor.getPartitionOfMember(memberId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index 3e1bfe2..4064510 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -90,22 +90,23 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
             @Override
             protected void onEvent(Event event) {
-                try {
+
+                if (!topologyInitialized) {
                     TopologyManager.acquireReadLock();
-                    if(!topologyInitialized) {
-                          topologyInitialized = true;
+
+                    try {
                         for (Application application : TopologyManager.getTopology().getApplications()) {
                             startApplicationMonitor(application);
                         }
+
+                        topologyInitialized = true;
+                    } catch (Exception e) {
+                        log.error("Error processing event", e);
+                    } finally {
+                        TopologyManager.releaseReadLock();
                     }
-                } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
                 }
             }
-
-
         });
 
 
@@ -118,12 +119,19 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
                 ApplicationCreatedEvent applicationCreatedEvent = (ApplicationCreatedEvent) event;
 
                 //acquire read lock
-                TopologyManager.acquireReadLock();
-                //start the application monitor
-                //TODO catch exception by ApplicationMonitor
-                startApplicationMonitor(applicationCreatedEvent.getApplication());
-                //release read lock
-                TopologyManager.releaseReadLock();
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireReadLockForApplication(applicationCreatedEvent.getApplication().getId());
+
+                try {
+                    //start the application monitor
+                    //TODO catch exception by ApplicationMonitor
+                    startApplicationMonitor(applicationCreatedEvent.getApplication());
+
+                } finally {
+                    //release read lock
+                    TopologyManager.releaseReadLockForApplication(applicationCreatedEvent.getApplication().getId());
+                    //TopologyManager.releaseReadLock();
+                }
 
             }
         });
@@ -197,7 +205,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
                 ApplicationRemovedEvent applicationRemovedEvent = (ApplicationRemovedEvent) event;
 
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireReadLockForApplication(applicationRemovedEvent.getApplicationId());
 
                 try {
                     //TODO remove monitors as well as any starting or pending threads
@@ -222,7 +231,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
 
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForApplication(applicationRemovedEvent.getApplicationId());
+                    //TopologyManager.releaseReadLock();
                 }
 
             }
@@ -283,26 +293,34 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new ClusterMaintenanceModeEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                ClusterMaintenanceModeEvent clusterMaitenanceEvent = null;
+
                 try {
                     log.info("Event received: " + event);
-                    ClusterMaintenanceModeEvent e = (ClusterMaintenanceModeEvent) event;
-                    TopologyManager.acquireReadLock();
-                    Service service = TopologyManager.getTopology().getService(e.getServiceName());
-                    Cluster cluster = service.getCluster(e.getClusterId());
+                    clusterMaitenanceEvent = (ClusterMaintenanceModeEvent) event;
+                    //TopologyManager.acquireReadLock();
+                    TopologyManager.acquireWriteLockForCluster(clusterMaitenanceEvent.getServiceName(),
+                            clusterMaitenanceEvent.getClusterId());
+
+                    Service service = TopologyManager.getTopology().getService(clusterMaitenanceEvent.getServiceName());
+                    Cluster cluster = service.getCluster(clusterMaitenanceEvent.getClusterId());
                     if (AutoscalerContext.getInstance().monitorExist((cluster.getClusterId()))) {
-                        AutoscalerContext.getInstance().getMonitor(e.getClusterId()).
-                                                        setStatus(e.getStatus());
+                        AutoscalerContext.getInstance().getMonitor(clusterMaitenanceEvent.getClusterId()).
+                                                        setStatus(clusterMaitenanceEvent.getStatus());
                     } else if (AutoscalerContext.getInstance().
                                         lbMonitorExist((cluster.getClusterId()))) {
-                        AutoscalerContext.getInstance().getLBMonitor(e.getClusterId()).
-                                                        setStatus(e.getStatus());
+                        AutoscalerContext.getInstance().getLBMonitor(clusterMaitenanceEvent.getClusterId()).
+                                                        setStatus(clusterMaitenanceEvent.getStatus());
                     } else {
                         log.error("cluster monitor not exists for the cluster: " + cluster.toString());
                     }
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseWriteLockForCluster(clusterMaitenanceEvent.getServiceName(),
+                            clusterMaitenanceEvent.getClusterId());
                 }
             }
 
@@ -312,16 +330,20 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                ClusterRemovedEvent clusterRemovedEvent = null;
                 try {
-                    ClusterRemovedEvent e = (ClusterRemovedEvent) event;
-                    TopologyManager.acquireReadLock();
+                    clusterRemovedEvent = (ClusterRemovedEvent) event;
+                    //TopologyManager.acquireReadLock();
+                    TopologyManager.acquireReadLockForCluster(clusterRemovedEvent.getServiceName(),
+                            clusterRemovedEvent.getClusterId());
 
-                    String clusterId = e.getClusterId();
-                    String deploymentPolicy = e.getDeploymentPolicy();
+                    String clusterId = clusterRemovedEvent.getClusterId();
+                    String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy();
 
                     AbstractClusterMonitor monitor;
 
-                    if (e.isLbCluster()) {
+                    if (clusterRemovedEvent.isLbCluster()) {
                         DeploymentPolicy depPolicy = PolicyManager.getInstance().
                                                         getDeploymentPolicy(deploymentPolicy);
                         if (depPolicy != null) {
@@ -362,7 +384,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(clusterRemovedEvent.getServiceName(),
+                            clusterRemovedEvent.getClusterId());
                 }
             }
 
@@ -380,14 +404,19 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
+                MemberTerminatedEvent memberTerminatedEvent =  null;
                 try {
-                    TopologyManager.acquireReadLock();
-                    MemberTerminatedEvent e = (MemberTerminatedEvent) event;
-                    String networkPartitionId = e.getNetworkPartitionId();
-                    String clusterId = e.getClusterId();
-                    String partitionId = e.getPartitionId();
+                    //TopologyManager.acquireReadLock();
+
+                    memberTerminatedEvent = (MemberTerminatedEvent) event;
+                    String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
+                    String clusterId = memberTerminatedEvent.getClusterId();
+                    String partitionId = memberTerminatedEvent.getPartitionId();
                     AbstractClusterMonitor monitor;
 
+                    TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                            memberTerminatedEvent.getClusterId());
+
                     if (AutoscalerContext.getInstance().monitorExist(clusterId)) {
                         monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
                     } else {
@@ -400,7 +429,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
 
                     PartitionContext partitionContext = networkPartitionContext.
                                                                     getPartitionCtxt(partitionId);
-                    String memberId = e.getMemberId();
+                    String memberId = memberTerminatedEvent.getMemberId();
                     partitionContext.removeMemberStatsContext(memberId);
 
                     if (partitionContext.removeTerminationPendingMember(memberId)) {
@@ -431,7 +460,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                            memberTerminatedEvent.getClusterId());
                 }
             }
 
@@ -441,14 +472,18 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
+                MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(),
+                        memberActivatedEvent.getClusterId());
+
                 try {
-                    TopologyManager.acquireReadLock();
 
-                    MemberActivatedEvent e = (MemberActivatedEvent) event;
-                    String networkPartitionId = e.getNetworkPartitionId();
-                    String clusterId = e.getClusterId();
-                    String partitionId = e.getPartitionId();
-                    String memberId = e.getMemberId();
+                    String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
+                    String clusterId = memberActivatedEvent.getClusterId();
+                    String partitionId = memberActivatedEvent.getPartitionId();
+                    String memberId = memberActivatedEvent.getMemberId();
 
                     AbstractClusterMonitor monitor;
 
@@ -476,12 +511,14 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
 //                partitionContext.incrementCurrentActiveMemberCount(1);
                     partitionContext.movePendingMemberToActiveMembers(memberId);
                     //triggering the status checker
-                    StatusChecker.getInstance().onMemberStatusChange(e.getClusterId());
+                    StatusChecker.getInstance().onMemberStatusChange(memberActivatedEvent.getClusterId());
 
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(),
+                            memberActivatedEvent.getClusterId());
                 }
             }
         });
@@ -490,16 +527,20 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
+                MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireReadLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+                        memberMaintenanceModeEvent.getClusterId());
+
                 try {
-                    TopologyManager.acquireReadLock();
 
-                    MemberMaintenanceModeEvent e = (MemberMaintenanceModeEvent) event;
-                    String memberId = e.getMemberId();
-                    String partitionId = e.getPartitionId();
-                    String networkPartitionId = e.getNetworkPartitionId();
+                    String memberId = memberMaintenanceModeEvent.getMemberId();
+                    String partitionId = memberMaintenanceModeEvent.getPartitionId();
+                    String networkPartitionId = memberMaintenanceModeEvent.getNetworkPartitionId();
 
                     PartitionContext partitionContext;
-                    String clusterId = e.getClusterId();
+                    String clusterId = memberMaintenanceModeEvent.getClusterId();
                     AbstractClusterMonitor monitor;
 
                     if (AutoscalerContext.getInstance().monitorExist(clusterId)) {
@@ -521,7 +562,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+                            memberMaintenanceModeEvent.getClusterId());
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
index 6d50f6c..30b1b00 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
@@ -608,7 +608,7 @@ public class TopologyBuilder {
         }
     }
 
-    public static void handleApplicationDeployed(Application application,
+    public static synchronized void handleApplicationDeployed(Application application,
                                                  Set<ApplicationClusterContext> applicationClusterContexts,
                                                  Set<MetaDataHolder> metaDataHolders) {
 
@@ -644,6 +644,7 @@ public class TopologyBuilder {
             // add to Topology and update
             topology.addApplication(application);
             TopologyManager.updateTopology(topology);
+
             log.info("Application with id [ " + application.getId() + " ] added to Topology successfully");
 
             TopologyEventPublisher.sendApplicationCreatedEvent(application ,clusters);
@@ -653,7 +654,8 @@ public class TopologyBuilder {
         }
     }
 
-    public static void handleApplicationUndeployed(FasterLookUpDataHolder dataHolder, String applicationId, int tenantId, String tenantDomain) {
+    public static synchronized void handleApplicationUndeployed(FasterLookUpDataHolder dataHolder,
+                                                                String applicationId, int tenantId, String tenantDomain) {
 
         Topology topology = TopologyManager.getTopology();
 
@@ -672,6 +674,7 @@ public class TopologyBuilder {
                     if (service != null) {
                         // remove Cluster
                         service.removeCluster(clusterDataHolder.getClusterId());
+
                         if (log.isDebugEnabled()) {
                             log.debug("Removed cluster with id " + clusterDataHolder.getClusterId());
                         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
index a5b5e42..4222075 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
@@ -118,10 +118,15 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireWriteLockForCluster(memberActivatedEvent.getServiceName(),
+                        memberActivatedEvent.getClusterId());
+
                 try {
-                    TopologyManager.acquireReadLock();
 
-                    MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
                     Service service = TopologyManager.getTopology().getService(memberActivatedEvent.getServiceName());
                     if (service == null) {
                         if (log.isWarnEnabled()) {
@@ -167,16 +172,24 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseWriteLockForCluster(memberActivatedEvent.getServiceName(),
+                            memberActivatedEvent.getClusterId());
                 }
             }
         });
         topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
             @Override
             protected void onEvent(Event event) {
+
+                MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+
+                TopologyManager.acquireWriteLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+                        memberMaintenanceModeEvent.getClusterId());
+
                 try {
-                    TopologyManager.acquireReadLock();
-                    MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+                    //TopologyManager.acquireReadLock();
+
                     Member member = findMember(memberMaintenanceModeEvent.getServiceName(),
                             memberMaintenanceModeEvent.getClusterId(), memberMaintenanceModeEvent.getMemberId());
 
@@ -186,16 +199,22 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseWriteLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+                            memberMaintenanceModeEvent.getClusterId());
                 }
             }
         });
         topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event;
+                TopologyManager.acquireWriteLockForCluster(memberSuspendedEvent.getServiceName(),
+                        memberSuspendedEvent.getClusterId());
+
                 try {
-                    TopologyManager.acquireReadLock();
-                    MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event;
+                    //TopologyManager.acquireReadLock();
                     Member member = findMember(memberSuspendedEvent.getServiceName(),
                             memberSuspendedEvent.getClusterId(), memberSuspendedEvent.getMemberId());
 
@@ -205,16 +224,23 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseWriteLockForCluster(memberSuspendedEvent.getServiceName(),
+                            memberSuspendedEvent.getClusterId());
                 }
             }
         });
         topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                //TopologyManager.acquireReadLock();
+                MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+
+                TopologyManager.acquireWriteLockForCluster(memberTerminatedEvent.getServiceName(),
+                        memberTerminatedEvent.getClusterId());
+
                 try {
-                    TopologyManager.acquireReadLock();
-                    MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
                     Member member = findMember(memberTerminatedEvent.getServiceName(),
                             memberTerminatedEvent.getClusterId(), memberTerminatedEvent.getMemberId());
 
@@ -224,18 +250,23 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseWriteLockForCluster(memberTerminatedEvent.getServiceName(),
+                            memberTerminatedEvent.getClusterId());
                 }
             }
         });
         topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
-                try {
-                    TopologyManager.acquireReadLock();
 
-                    // Remove cluster from context
-                    ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+                // Remove cluster from context
+                ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+                TopologyManager.acquireWriteLockForCluster(clusterRemovedEvent.getServiceName(),
+                        clusterRemovedEvent.getClusterId());
+
+                try {
+                    //TopologyManager.acquireReadLock();
                     Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterRemovedEvent.getClusterId());
                     if (cluster != null) {
                         for (Member member : cluster.getMembers()) {
@@ -251,18 +282,23 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseWriteLockForCluster(clusterRemovedEvent.getServiceName(),
+                            clusterRemovedEvent.getClusterId());
                 }
             }
         });
         topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event;
+                TopologyManager.acquireWriteLockForServices();
+
                 try {
-                    TopologyManager.acquireReadLock();
+                    //TopologyManager.acquireReadLock();
 
                     // Remove all clusters of given service from context
-                    ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event;
                     Service service = TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName());
                     if (service != null) {
                         for (Cluster cluster : service.getClusters()) {
@@ -280,7 +316,8 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
                 } catch (Exception e) {
                     log.error("Error processing event", e);
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseWriteLockForServices();
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
index c3a5719..d61f474 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
@@ -23,8 +23,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.manager.exception.ADCException;
 import org.apache.stratos.manager.exception.ApplicationSubscriptionException;
-import org.apache.stratos.manager.exception.CompositeApplicationDefinitionException;
-import org.apache.stratos.manager.exception.CompositeApplicationException;
 import org.apache.stratos.manager.manager.CartridgeSubscriptionManager;
 import org.apache.stratos.manager.subscription.ApplicationSubscription;
 import org.apache.stratos.manager.topology.model.TopologyClusterInformationModel;
@@ -95,7 +93,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
 
                 String serviceType = clustercreatedEvent.getServiceName();
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireReadLockForCluster(clustercreatedEvent.getServiceName(),
+                        clustercreatedEvent.getClusterId());
 
                 try {
                     Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clustercreatedEvent.getClusterId());
@@ -103,7 +103,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
 
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(clustercreatedEvent.getServiceName(),
+                            clustercreatedEvent.getClusterId());
                 }
 
             }
@@ -137,14 +139,18 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
 
                 String serviceType = instanceSpawnedEvent.getServiceName();
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireReadLockForCluster(instanceSpawnedEvent.getServiceName(),
+                        instanceSpawnedEvent.getClusterId());
 
                 try {
                     Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
                     TopologyClusterInformationModel.getInstance().addCluster(cluster);
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(instanceSpawnedEvent.getServiceName(),
+                            instanceSpawnedEvent.getClusterId());
                 }
             }
         });
@@ -162,14 +168,18 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
 
                 String serviceType = memberStartedEvent.getServiceName();
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireReadLockForCluster(memberStartedEvent.getServiceName(),
+                        memberStartedEvent.getClusterId());
 
                 try {
                     Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
                     TopologyClusterInformationModel.getInstance().addCluster(cluster);
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(memberStartedEvent.getServiceName(),
+                            memberStartedEvent.getClusterId());
                 }
 
             }
@@ -188,14 +198,18 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
 
                 String serviceType = memberActivatedEvent.getServiceName();
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(),
+                        memberActivatedEvent.getClusterId());
 
                 try {
                     Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
                     TopologyClusterInformationModel.getInstance().addCluster(cluster);
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(),
+                            memberActivatedEvent.getClusterId());
                 }
             }
         });
@@ -213,7 +227,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
 
                 String serviceType = memberSuspendedEvent.getServiceName();
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireReadLockForCluster(memberSuspendedEvent.getServiceName(),
+                        memberSuspendedEvent.getClusterId());
 
                 try {
                     Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
@@ -221,7 +237,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
 
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(memberSuspendedEvent.getServiceName(),
+                            memberSuspendedEvent.getClusterId());
                 }
             }
         });
@@ -239,7 +257,9 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
 
                 String serviceType = memberTerminatedEvent.getServiceName();
                 //acquire read lock
-                TopologyManager.acquireReadLock();
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                        memberTerminatedEvent.getClusterId());
 
                 try {
                     Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
@@ -247,8 +267,12 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
                     // check and remove terminated member
                     if (cluster.memberExists(memberTerminatedEvent.getMemberId())) {
                         // release the read lock and acquire the write lock
-                        TopologyManager.releaseReadLock();
-                        TopologyManager.acquireWriteLock();
+//                        TopologyManager.releaseReadLock();
+//                        TopologyManager.acquireWriteLock();
+                        TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                                memberTerminatedEvent.getClusterId());
+                        TopologyManager.acquireWriteLockForCluster(memberTerminatedEvent.getServiceName(),
+                                memberTerminatedEvent.getClusterId());
 
                         try {
                             // re-check the state; another thread might have acquired the write lock and modified
@@ -263,17 +287,23 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
 
                             // downgrade to read lock - 1. acquire read lock, 2. release write lock
                             // acquire read lock
-                            TopologyManager.acquireReadLock();
+                            //TopologyManager.acquireReadLock();
+                            TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                                    memberTerminatedEvent.getClusterId());
 
                         } finally {
                             // release the write lock
-                            TopologyManager.releaseWriteLock();
+                           // TopologyManager.releaseWriteLock();
+                            TopologyManager.releaseWriteLockForCluster(memberTerminatedEvent.getServiceName(),
+                                    memberTerminatedEvent.getClusterId());
                         }
                     }
                     TopologyClusterInformationModel.getInstance().addCluster(cluster);
                 } finally {
                     //release read lock
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                            memberTerminatedEvent.getClusterId());
                 }
             }
         });
@@ -288,7 +318,8 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
                 log.info("[ApplicationCreatedEventListener] Received: " + event.getClass());
 
                 try {
-                    TopologyManager.acquireReadLock();
+                    //TopologyManager.acquireReadLock();
+                    TopologyManager.acquireReadLockForApplication(appCreateEvent.getApplication().getId());
                     
                     // create and persist Application subscritpion
                     CartridgeSubscriptionManager cartridgeSubscriptionManager = new CartridgeSubscriptionManager();
@@ -318,7 +349,8 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
                     	PrivilegedCarbonContext.endTenantFlow();
                     }
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForApplication(appCreateEvent.getApplication().getId());
                 }
             }
         });
@@ -333,7 +365,8 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
                 log.info("[ApplicationRemovedEventListener] Received: " + event.getClass());
 
                 try {
-                    TopologyManager.acquireReadLock();
+                    //TopologyManager.acquireReadLock();
+                    TopologyManager.acquireReadLockForApplication(appRemovedEvent.getApplicationId());
                     
                     // create and persist Application subscritpion
                     CartridgeSubscriptionManager cartridgeSubscriptionManager = new CartridgeSubscriptionManager();
@@ -360,7 +393,8 @@ public class StratosManagerTopologyEventReceiver implements Runnable {
                     	PrivilegedCarbonContext.endTenantFlow();
                     }
                 } finally {
-                    TopologyManager.releaseReadLock();
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForApplication(appRemovedEvent.getApplicationId());
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
index 46d46d4..d03cfda 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
@@ -19,6 +19,9 @@
 
 package org.apache.stratos.messaging.domain.topology;
 
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
+
 import java.io.Serializable;
 import java.util.*;
 
@@ -59,14 +62,18 @@ public class Service implements Serializable{
 
     public void addCluster(Cluster cluster) {
         this.clusterIdClusterMap.put(cluster.getClusterId(), cluster);
+        TopologyLockHierarchy.getInstance().addClusterLock(cluster.getClusterId(), new TopologyLock());
     }
 
     public void removeCluster(Cluster cluster) {
         this.clusterIdClusterMap.remove(cluster.getClusterId());
+        TopologyLockHierarchy.getInstance().removeTopologyLockForCluster(cluster.getClusterId());
     }
 
     public Cluster removeCluster(String clusterId) {
-        return this.clusterIdClusterMap.remove(clusterId);
+        Cluster removedCluster = this.clusterIdClusterMap.remove(clusterId);
+        TopologyLockHierarchy.getInstance().removeTopologyLockForCluster(clusterId);
+        return removedCluster;
     }
 
     public boolean clusterExists(String clusterId) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
index f8b535f..dabf611 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Topology.java
@@ -26,8 +26,8 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.util.CompositeApplicationBuilder;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
 
 /**
  * Defines a topology of serviceMap in Stratos.
@@ -54,6 +54,7 @@ public class Topology implements Serializable {
 
     public void addApplication (Application application) {
         this.applicationMap.put(application.getId(), application);
+        TopologyLockHierarchy.getInstance().addApplicationLock(application.getId(), new TopologyLock());
     }
 
     public Application getApplication (String applicationId) {
@@ -62,6 +63,7 @@ public class Topology implements Serializable {
 
     public void removeApplication (String applicationId) {
         applicationMap.remove(applicationId);
+        TopologyLockHierarchy.getInstance().removeTopologyLockForApplication(applicationId);
     }
 
     public Collection<Application> getApplications () {
@@ -78,9 +80,10 @@ public class Topology implements Serializable {
 
     public void addService(Service service) {
         this.serviceMap.put(service.getServiceName(), service);
+        TopologyLockHierarchy.getInstance().addServiceLock(service.getServiceName(), new TopologyLock());
     }
 
-    public void addServices(Collection<Service> services) {
+    public synchronized void addServices(Collection<Service> services) {
         for (Service service : services) {
             addService(service);
         }
@@ -88,10 +91,12 @@ public class Topology implements Serializable {
 
     public void removeService(Service service) {
         this.serviceMap.remove(service.getServiceName());
+        TopologyLockHierarchy.getInstance().removeTopologyLockForService(service.getServiceName());
     }
 
     public void removeService(String serviceName) {
         this.serviceMap.remove(serviceName);
+        TopologyLockHierarchy.getInstance().removeTopologyLockForService(serviceName);
     }
 
     public Service getService(String serviceName) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java
new file mode 100644
index 0000000..e1b90ad
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLock.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.domain.topology.locking;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class TopologyLock {
+
+    private final ReentrantReadWriteLock lock;
+
+    public TopologyLock () {
+        lock = new ReentrantReadWriteLock(true);
+    }
+
+    public void acquireWriteLock() {
+        lock.writeLock().lock();
+    }
+
+    public void releaseWritelock() {
+        if (lock.isWriteLockedByCurrentThread()) {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void acquireReadLock() {
+        lock.readLock().lock();
+    }
+
+    public void releaseReadLock() {
+        lock.readLock().unlock();
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
new file mode 100644
index 0000000..aa0f7a1
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/locking/TopologyLockHierarchy.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.domain.topology.locking;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TopologyLockHierarchy {
+
+    private static final Log log = LogFactory.getLog(TopologyLockHierarchy.class);
+
+    // lock for Services
+    private TopologyLock serviceLock;
+
+    // lock for Applications
+    private TopologyLock applicatioLock;
+
+    // key = Service.name
+    private Map<String, TopologyLock> serviceNameToTopologyLockMap;
+
+    // key = Application.id
+    private Map<String, TopologyLock> applicationIdToTopologyLockMap;
+
+    // key = Cluster.id
+    private Map<String, TopologyLock> clusterIdToTopologyLockMap;
+
+    private static volatile TopologyLockHierarchy topologyLockHierarchy;
+
+    private TopologyLockHierarchy () {
+
+        this.serviceLock = new TopologyLock();
+        this.applicatioLock = new TopologyLock();
+        this.serviceNameToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>();
+        this.applicationIdToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>();
+        this.clusterIdToTopologyLockMap = new ConcurrentHashMap<String, TopologyLock>();
+    }
+
+    public static TopologyLockHierarchy getInstance () {
+
+        if (topologyLockHierarchy == null) {
+            synchronized (TopologyLockHierarchy.class) {
+                if (topologyLockHierarchy == null) {
+                    topologyLockHierarchy = new TopologyLockHierarchy();
+                }
+            }
+        }
+
+        return topologyLockHierarchy;
+    }
+
+    public void addApplicationLock (String appId, final TopologyLock topologyLock) {
+
+        if (!applicationIdToTopologyLockMap.containsKey(appId)) {
+            synchronized (applicationIdToTopologyLockMap) {
+                if (!applicationIdToTopologyLockMap.containsKey(appId)) {
+                    applicationIdToTopologyLockMap.put(appId, topologyLock);
+                    log.info("Added lock for Application " + appId);
+                }
+            }
+        } else {
+            log.warn("Topology Lock for Application " + appId + " already exists");
+        }
+    }
+
+    public TopologyLock getTopologyLockForApplication (String appId) {
+        return applicationIdToTopologyLockMap.get(appId);
+    }
+
+    public void addServiceLock (String serviceName, final TopologyLock topologyLock) {
+
+        if (!serviceNameToTopologyLockMap.containsKey(serviceName)) {
+            synchronized (serviceNameToTopologyLockMap) {
+                if (!serviceNameToTopologyLockMap.containsKey(serviceName)) {
+                    serviceNameToTopologyLockMap.put(serviceName, topologyLock);
+                    log.info("Added lock for Service " + serviceName);
+                }
+            }
+        } else {
+            log.warn("Topology Lock for Service " + serviceName + " already exists");
+        }
+    }
+
+    public TopologyLock getTopologyLockForService (String serviceName) {
+        return serviceNameToTopologyLockMap.get(serviceName);
+    }
+
+    public void addClusterLock (String clusterId, final TopologyLock topologyLock) {
+
+        if (!clusterIdToTopologyLockMap.containsKey(clusterId)) {
+            synchronized (clusterIdToTopologyLockMap) {
+                if (!clusterIdToTopologyLockMap.containsKey(clusterId)) {
+                    clusterIdToTopologyLockMap.put(clusterId, topologyLock);
+                    log.info("Added lock for Cluster " + clusterId);
+                }
+            }
+        } else {
+            log.warn("Topology Lock for Cluster " + clusterId + " already exists");
+        }
+    }
+
+    public TopologyLock getTopologyLockForCluster (String clusterId) {
+        return clusterIdToTopologyLockMap.get(clusterId);
+    }
+
+    public void removeTopologyLockForApplication (String appId) {
+        applicationIdToTopologyLockMap.remove(appId);
+        log.info("Removed lock for Application " + appId);
+    }
+
+    public void removeTopologyLockForService (String serviceName) {
+        serviceNameToTopologyLockMap.remove(serviceName);
+        log.info("Removed lock for Service " + serviceName);
+    }
+
+    public void removeTopologyLockForCluster (String clusterId) {
+        clusterIdToTopologyLockMap.remove(clusterId);
+        log.info("Removed lock for Cluster " + clusterId);
+    }
+
+    public TopologyLock getServiceLock() {
+        return serviceLock;
+    }
+
+    public TopologyLock getApplicatioLock() {
+        return applicatioLock;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
index 8c1e66b..803a871 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Status;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ApplicationActivatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 /**
@@ -56,26 +57,16 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor {
             ApplicationActivatedEvent event = (ApplicationActivatedEvent) Util.
                     jsonToObject(message, ApplicationActivatedEvent.class);
 
-            // Validate event against the existing topology
-            Application application = topology.getApplication(event.getAppId());
-            if (application == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Application does not exist: [service] %s",
-                            event.getAppId()));
-                }
-                return false;
-            } else {
-                // Apply changes to the topology
-                application.setStatus(Status.Activated);
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Application updated as activated : %s",
-                            application.toString()));
-                }
-            }
+            TopologyManager.acquireReadLockForApplications();
+            TopologyManager.acquireWriteLockForApplication(event.getAppId());
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForApplication(event.getAppId());
+                TopologyManager.releaseReadLockForApplications();
+            }
 
         } else {
             if (nextProcessor != null) {
@@ -86,4 +77,29 @@ public class ApplicationActivatedMessageProcessor extends MessageProcessor {
             }
         }
     }
+
+    private boolean doProcess (ApplicationActivatedEvent event, Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        } else {
+            // Apply changes to the topology
+            application.setStatus(Status.Activated);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Application updated as activated : %s",
+                        application.toString()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
index f02e4be..4368bd7 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ApplicationCreatedMessageProcessor extends MessageProcessor {
@@ -47,40 +48,21 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
                 return false;
             }
 
-            ApplicationCreatedEvent appCreatedEvent = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
-            if (appCreatedEvent == null) {
+            ApplicationCreatedEvent event = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
+            if (event == null) {
                 log.error("Unable to convert the JSON message to ApplicationCreatedEvent");
                 return false;
             }
 
-            // check if required properties are available
-            if (appCreatedEvent.getApplication() == null) {
-                String errorMsg = "Application object of application created event is invalid";
-                log.error(errorMsg);
-                throw new RuntimeException(errorMsg);
-            }
-
-            if (appCreatedEvent.getApplication().getId() == null || appCreatedEvent.getApplication().getId().isEmpty()) {
-                String errorMsg = "App id of application created event is invalid: [ " + appCreatedEvent.getApplication().getId() + " ]";
-                log.error(errorMsg);
-                throw new RuntimeException(errorMsg);
-            }
+            TopologyManager.acquireWriteLockForApplications();
 
-            // check if an Application with same name exists in topology
-            if (topology.applicationExists(appCreatedEvent.getApplication().getId())) {
-                log.warn("Application with id [ " + appCreatedEvent.getApplication().getId() + " ] already exists in Topology");
+            try {
+                return doProcess(event, topology);
 
-            } else {
-                // add application and the clusters to Topology
-                for(Cluster cluster: appCreatedEvent.getClusterList()) {
-                    topology.getService(cluster.getServiceName()).addCluster(cluster);
-                }
-                topology.addApplication(appCreatedEvent.getApplication());
+            } finally {
+                TopologyManager.releaseWriteLockForApplications();
             }
 
-            notifyEventListeners(appCreatedEvent);
-            return true;
-
         } else {
             if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
@@ -90,4 +72,35 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
             }
         }
     }
+
+    private boolean doProcess (ApplicationCreatedEvent event,Topology topology) {
+
+        // check if required properties are available
+        if (event.getApplication() == null) {
+            String errorMsg = "Application object of application created event is invalid";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+
+        if (event.getApplication().getId() == null || event.getApplication().getId().isEmpty()) {
+            String errorMsg = "App id of application created event is invalid: [ " + event.getApplication().getId() + " ]";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+
+        // check if an Application with same name exists in topology
+        if (topology.applicationExists(event.getApplication().getId())) {
+            log.warn("Application with id [ " + event.getApplication().getId() + " ] already exists in Topology");
+
+        } else {
+            // add application and the clusters to Topology
+            for(Cluster cluster: event.getClusterList()) {
+                topology.getService(cluster.getServiceName()).addCluster(cluster);
+            }
+            topology.addApplication(event.getApplication());
+        }
+
+        notifyEventListeners(event);
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
index c9dbb07..ed6c2d4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationRemovedMessageProcessor.java
@@ -22,9 +22,9 @@ package org.apache.stratos.messaging.message.processor.topology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
 import org.apache.stratos.messaging.event.topology.ApplicationRemovedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ApplicationRemovedMessageProcessor extends MessageProcessor {
@@ -55,38 +55,20 @@ public class ApplicationRemovedMessageProcessor extends MessageProcessor {
 	            return false;
 	        }
 	
-	        ApplicationRemovedEvent appRemovedEvent = (ApplicationRemovedEvent) Util.jsonToObject(message, ApplicationRemovedEvent.class);
-	        if (appRemovedEvent == null) {
+	        ApplicationRemovedEvent event = (ApplicationRemovedEvent) Util.jsonToObject(message, ApplicationRemovedEvent.class);
+	        if (event == null) {
 	            log.error("Unable to convert the JSON message to ApplicationCreatedEvent");
 	            return false;
 	        }
-	        
-	     // check if required properties are available
-	        if (appRemovedEvent.getApplicationId() == null) {
-	            String errorMsg = "Application Id of application removed event is invalid";
-	            log.error(errorMsg);
-	            throw new RuntimeException(errorMsg);
-	        }
-	        
-	        if (appRemovedEvent.getTenantDomain()== null) {
-	            String errorMsg = "Application tenant domain of application removed event is invalid";
-	            log.error(errorMsg);
-	            throw new RuntimeException(errorMsg);
-	        }
-	        
-	        // check if an Application with same name exists in topology
-	        String appId = appRemovedEvent.getApplicationId();
-            if (topology.applicationExists(appId)) {
-                log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it");
-                topology.removeApplication(appId);
-            } 
-	        
-            if (log.isDebugEnabled()) {
-        		log.debug("ApplicationRemovedMessageProcessor notifying listener " + object);
-        	}
-            
-	        notifyEventListeners(appRemovedEvent);
-	        return true;
+
+            TopologyManager.acquireWriteLockForApplications();
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForApplications();
+            }
         
 	    } else {
 	        if (nextProcessor != null) {
@@ -96,6 +78,35 @@ public class ApplicationRemovedMessageProcessor extends MessageProcessor {
 	            throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
 	        }
 	    }
-        
+    }
+
+    private boolean doProcess (ApplicationRemovedEvent event, Topology topology) {
+
+        // check if required properties are available
+        if (event.getApplicationId() == null) {
+            String errorMsg = "Application Id of application removed event is invalid";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+
+        if (event.getTenantDomain()== null) {
+            String errorMsg = "Application tenant domain of application removed event is invalid";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+
+        // check if an Application with same name exists in topology
+        String appId = event.getApplicationId();
+        if (topology.applicationExists(appId)) {
+            log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it");
+            topology.removeApplication(appId);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("ApplicationRemovedMessageProcessor notifying listener ");
+        }
+
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
index 52de45b..78f772b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterActivatedProcessor.java
@@ -25,10 +25,10 @@ import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Status;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent;
-import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 /**
@@ -45,75 +45,91 @@ public class ClusterActivatedProcessor extends MessageProcessor {
 
     @Override
     public boolean process(String type, String message, Object object) {
+
         Topology topology = (Topology) object;
 
         if (ClusterActivatedEvent.class.getName().equals(type)) {
             // Return if topology has not been initialized
-            if (!topology.isInitialized())
+            if (!topology.isInitialized()) {
                 return false;
+            }
 
             // Parse complete message and build event
             ClusterActivatedEvent event = (ClusterActivatedEvent) Util.
                     jsonToObject(message, ClusterActivatedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireReadLockForServices();
+            TopologyManager.acquireWriteLockForService(event.getServiceName());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForService(event.getServiceName());
+                TopologyManager.releaseReadLockForServices();
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        }  else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (ClusterActivatedEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
+        }
 
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
-                            event.getClusterId()));
-                }
-            } else {
-                // Apply changes to the topology
-                cluster.setStatus(Status.Activated);
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Cluster updated as activated : %s",
-                            cluster.toString()));
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
+                return false;
             }
+        }
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
 
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
+                        event.getClusterId()));
+            }
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            // Apply changes to the topology
+            cluster.setStatus(Status.Activated);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster updated as activated : %s",
+                        cluster.toString()));
             }
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
+
 }