You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ra...@apache.org on 2015/10/14 13:15:00 UTC

[2/4] stratos git commit: Fix STRATOS-1585: Threads can override resource paths in metadata api registry

Fix STRATOS-1585: Threads can override resource paths in metadata api registry


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4b8d439b
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4b8d439b
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4b8d439b

Branch: refs/heads/stratos-4.1.x
Commit: 4b8d439b34097715f9cddfab4948f4c14e472653
Parents: ade33aa
Author: Akila Perera <ra...@gmail.com>
Authored: Wed Oct 14 16:23:23 2015 +0530
Committer: Akila Perera <ra...@gmail.com>
Committed: Wed Oct 14 16:43:04 2015 +0530

----------------------------------------------------------------------
 .../service/MetadataTopologyEventReceiver.java  |  95 ++++++++
 .../metadata/service/api/MetadataApi.java       |  56 +++--
 .../service/registry/MetadataApiRegistry.java   | 223 +++++++++++--------
 3 files changed, 255 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/4b8d439b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
new file mode 100644
index 0000000..f483995
--- /dev/null
+++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.metadata.service;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.concurrent.locks.ReadWriteLock;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.application.ApplicationCreatedEvent;
+import org.apache.stratos.messaging.event.application.ApplicationDeletedEvent;
+import org.apache.stratos.messaging.event.topology.ApplicationClustersCreatedEvent;
+import org.apache.stratos.messaging.event.topology.ApplicationClustersRemovedEvent;
+import org.apache.stratos.messaging.listener.application.ApplicationCreatedEventListener;
+import org.apache.stratos.messaging.listener.application.ApplicationDeletedEventListener;
+import org.apache.stratos.messaging.listener.topology.ApplicationClustersCreatedEventListener;
+import org.apache.stratos.messaging.listener.topology.ApplicationClustersRemovedEventListener;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.metadata.service.registry.MetadataApiRegistry;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Topology receiver class for metadata service
+ */
+public class MetadataTopologyEventReceiver {
+    private static final Log log = LogFactory.getLog(MetadataTopologyEventReceiver.class);
+    // The pool id constant below is also used as the name prefix of every per-application ReadWriteLock.
+    private TopologyEventReceiver topologyEventReceiver;
+    private ExecutorService executorService;
+    public static final String METADATA_SERVICE_THREAD_POOL_ID = "metadata.service.thread.pool.";
+    // Builds the receiver, requests the pool (second argument 10, presumably its size) and registers listeners.
+    public MetadataTopologyEventReceiver() {
+        this.topologyEventReceiver = new TopologyEventReceiver();
+        executorService = StratosThreadPool.getExecutorService(METADATA_SERVICE_THREAD_POOL_ID, 10);
+        addEventListeners();
+    }
+    // Keeps MetadataApiRegistry's lock map in step: adds a lock on clusters-created, drops it on clusters-removed.
+    private void addEventListeners() {
+        topologyEventReceiver.addEventListener(new ApplicationClustersCreatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                ApplicationClustersCreatedEvent appClustersCreatedEvent = (ApplicationClustersCreatedEvent) event;
+                String applicationId = appClustersCreatedEvent.getAppId();
+                MetadataApiRegistry.getApplicationIdToReadWriteLockMap()
+                        .put(applicationId, new ReadWriteLock(METADATA_SERVICE_THREAD_POOL_ID.concat(applicationId)));
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new ApplicationClustersRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                ApplicationClustersRemovedEvent appClustersRemovedEvent = (ApplicationClustersRemovedEvent) event;
+                String applicationId = appClustersRemovedEvent.getAppId();
+                MetadataApiRegistry.getApplicationIdToReadWriteLockMap().remove(applicationId);
+            }
+        });
+    }
+    // Hands the thread pool to the receiver and starts it.
+    public void execute() {
+        topologyEventReceiver.setExecutorService(getExecutorService());
+        topologyEventReceiver.execute();
+
+        if (log.isInfoEnabled()) {
+            log.info("Metadata service topology receiver started.");
+        }
+    }
+    // Stops the receiver; no executor shutdown is performed here.
+    public void terminate() {
+        topologyEventReceiver.terminate();
+        if (log.isInfoEnabled()) {
+            log.info("Metadata service topology receiver stopped.");
+        }
+    }
+    // Returns the executor obtained in the constructor.
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/4b8d439b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/api/MetadataApi.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/api/MetadataApi.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/api/MetadataApi.java
index 712bcaa..53174e0 100644
--- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/api/MetadataApi.java
+++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/api/MetadataApi.java
@@ -44,7 +44,13 @@ public class MetadataApi {
      * Meta data admin configuration loading
      */
     public MetadataApi() {
-        registry = new MetadataApiRegistry();
+        try {
+            registry = new MetadataApiRegistry();
+        } catch (Exception e) {
+            String msg = "Could not initialize Metadata API";
+            log.error(msg, e);
+            throw new RuntimeException(msg, e); // chain the cause so callers see why initialization failed
+        }
     }
 
     @GET
@@ -57,8 +63,7 @@ public class MetadataApi {
         List<Property> properties;
         Property[] propertiesArr = null;
         try {
-            properties = registry
-                    .getApplicationProperties(applicationId);
+            properties = registry.getApplicationProperties(applicationId);
             if (properties != null) {
                 propertiesArr = new Property[properties.size()];
                 propertiesArr = properties.toArray(propertiesArr);
@@ -83,13 +88,12 @@ public class MetadataApi {
     @Produces("application/json")
     @Consumes("application/json")
     public Response getClusterProperties(@PathParam("application_id") String applicationId,
-                                         @PathParam("cluster_id") String clusterId) throws RestAPIException {
+            @PathParam("cluster_id") String clusterId) throws RestAPIException {
 
         List<Property> properties;
         Property[] propertiesArr = null;
         try {
-            properties = registry
-                    .getClusterProperties(applicationId, clusterId);
+            properties = registry.getClusterProperties(applicationId, clusterId);
             if (properties != null) {
                 propertiesArr = new Property[properties.size()];
                 propertiesArr = properties.toArray(propertiesArr);
@@ -114,11 +118,9 @@ public class MetadataApi {
     @Produces("application/json")
     @Consumes("application/json")
     public Response getApplicationProperty(@PathParam("application_id") String applicationId,
-                                           @PathParam("property_name") String propertyName)
-            throws RestAPIException {
+            @PathParam("property_name") String propertyName) throws RestAPIException {
         List<Property> properties;
 
-
         Property property = null;
         try {
             properties = registry.getApplicationProperties(applicationId);
@@ -151,8 +153,8 @@ public class MetadataApi {
     @Produces("application/json")
     @Consumes("application/json")
     public Response getClusterProperty(@PathParam("application_id") String applicationId,
-                                       @PathParam("cluster_id") String clusterId,
-                                       @PathParam("property_name") String propertyName) throws RestAPIException {
+            @PathParam("cluster_id") String clusterId, @PathParam("property_name") String propertyName)
+            throws RestAPIException {
         List<Property> properties;
 
         Property property = null;
@@ -186,8 +188,7 @@ public class MetadataApi {
     @Path("applications/{application_id}/properties")
     @Produces("application/json")
     @Consumes("application/json")
-    public Response addPropertyToApplication(@PathParam("application_id") String applicationId,
-                                               Property property)
+    public Response addPropertyToApplication(@PathParam("application_id") String applicationId, Property property)
             throws RestAPIException {
         URI url = uriInfo.getAbsolutePathBuilder().path(applicationId).build();
 
@@ -206,8 +207,7 @@ public class MetadataApi {
     @Produces("application/json")
     @Consumes("application/json")
     public Response addPropertyToCluster(@PathParam("application_id") String applicationId,
-                                         @PathParam("cluster_id") String clusterId, Property property)
-            throws RestAPIException {
+            @PathParam("cluster_id") String clusterId, Property property) throws RestAPIException {
         URI url = uriInfo.getAbsolutePathBuilder().path(applicationId + "/" + clusterId).build();
 
         try {
@@ -230,9 +230,7 @@ public class MetadataApi {
         try {
             boolean deleted = registry.deleteApplicationProperties(applicationId);
             if (!deleted) {
-                log.warn(String.format(
-                        "No metadata is associated with given appId %s",
-                        applicationId));
+                log.warn(String.format("No metadata is associated with given appId %s", applicationId));
             }
         } catch (RegistryException e) {
             String msg = "Resource attached with appId could not be deleted [application-id] " + applicationId;
@@ -247,18 +245,16 @@ public class MetadataApi {
     @Produces("application/json")
     @Consumes("application/json")
     public Response deleteApplicationProperty(@PathParam("application_id") String applicationId,
-                                              @PathParam("property_name") String propertyName)
-            throws RestAPIException {
+            @PathParam("property_name") String propertyName) throws RestAPIException {
 
         try {
             boolean deleted = registry.removePropertyFromApplication(applicationId, propertyName);
             if (!deleted) {
-                log.warn(String.format(
-                        "No metadata is associated with given appId %s",
-                        applicationId));
+                log.warn(String.format("No metadata is associated with given appId %s", applicationId));
             }
         } catch (RegistryException e) {
-            String msg = String.format("[application-id] %s [property-name] deletion failed ", applicationId, propertyName);
+            String msg = String
+                    .format("[application-id] %s [property-name] %s deletion failed ", applicationId, propertyName);
             log.error(msg, e);
             throw new RestAPIException(msg, e);
         }
@@ -270,20 +266,18 @@ public class MetadataApi {
     @Produces("application/json")
     @Consumes("application/json")
     public Response deleteApplicationPropertyValue(@PathParam("application_id") String applicationId,
-                                                   @PathParam("property_name") String propertyName,
-                                                   @PathParam("value") String propertyValue)
+            @PathParam("property_name") String propertyName, @PathParam("value") String propertyValue)
             throws RestAPIException {
 
         try {
             boolean deleted = registry.removePropertyValueFromApplication(applicationId, propertyName, propertyValue);
             if (!deleted) {
-                log.warn(String.format(
-                        "No metadata is associated with given [application-id] %s",
-                        applicationId));
+                log.warn(String.format("No metadata is associated with given [application-id] %s", applicationId));
             }
         } catch (RegistryException e) {
-            String msg = String.format("[application-id] %s [property-name] %s [value] %s deletion failed" +
-                    applicationId, propertyName, propertyValue);
+            String msg = String
+                    .format("[application-id] %s [property-name] %s [value] %s deletion failed", applicationId,
+                            propertyName, propertyValue); // one argument per %s; applicationId was concatenated before
             log.error(msg, e);
             throw new RestAPIException(msg, e);
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4b8d439b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
index abd3f4b..f6b1ee4 100644
--- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
+++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
@@ -21,6 +21,8 @@ package org.apache.stratos.metadata.service.registry;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.concurrent.locks.ReadWriteLock;
+import org.apache.stratos.metadata.service.MetadataTopologyEventReceiver;
 import org.apache.stratos.metadata.service.definition.Property;
 import org.wso2.carbon.context.PrivilegedCarbonContext;
 import org.wso2.carbon.registry.core.Registry;
@@ -33,7 +35,6 @@ import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Context;
 import java.util.*;
 
-
 /**
  * Carbon registry implementation
  */
@@ -44,8 +45,12 @@ public class MetadataApiRegistry implements DataStore {
     private static Log log = LogFactory.getLog(MetadataApiRegistry.class);
     @Context
     HttpServletRequest httpServletRequest;
+    private static final Map<String, ReadWriteLock> applicationIdToReadWriteLockMap = Collections.synchronizedMap(new HashMap<String, ReadWriteLock>()); // shared by listener and API threads
+    private MetadataTopologyEventReceiver metadataTopologyEventReceiver;
 
     public MetadataApiRegistry() {
+        metadataTopologyEventReceiver = new MetadataTopologyEventReceiver();
+        metadataTopologyEventReceiver.execute();
     }
 
     public List<Property> getApplicationProperties(String applicationName) throws RegistryException {
@@ -90,7 +95,6 @@ public class MetadataApiRegistry implements DataStore {
         Registry tempRegistry = getRegistry();
         String resourcePath = mainResource + applicationName + "/" + clusterId;
 
-
         if (!tempRegistry.resourceExists(resourcePath)) {
             return null;
         }
@@ -123,7 +127,7 @@ public class MetadataApiRegistry implements DataStore {
     public void addPropertyToApplication(String applicationId, Property property) throws RegistryException {
         Registry registry = getRegistry();
         String resourcePath = mainResource + applicationId;
-
+        acquireWriteLock(applicationId);
         try {
             // We are using only super tenant registry to persist
             PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
@@ -135,7 +139,7 @@ public class MetadataApiRegistry implements DataStore {
             } else {
                 nodeResource = registry.newCollection();
                 if (log.isDebugEnabled()) {
-                    log.debug("Registry resource is create at path for application: " + nodeResource.getPath());
+                    log.debug("Registry resource created for application: " + applicationId);
                 }
             }
 
@@ -143,34 +147,39 @@ public class MetadataApiRegistry implements DataStore {
             for (String value : property.getValues()) {
                 if (!propertyValueExist(nodeResource, property.getKey(), value)) {
                     updated = true;
-                    log.info(String.format("Registry property is added: [resource-path] %s " +
-                                    "[Property Name] %s [Property Value] %s",
-                            resourcePath, property.getKey(), value));
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Registry property is added: [resource-path] %s "
+                                        + "[Property Name] %s [Property Value] %s", resourcePath, property.getKey(),
+                                value));
+                    }
                     nodeResource.addProperty(property.getKey(), value);
                 } else {
-                    log.info(String.format("Property value already exist property=%s value=%s", property.getKey(), value));
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Property value already exist property=%s value=%s", property.getKey(),
+                                value));
+                    }
                 }
             }
-
             if (updated) {
                 registry.put(resourcePath, nodeResource);
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format(
+                            "Registry property is persisted: [resource-path] %s [Property Name] %s [Property Values] "
+                                    + "%s", resourcePath, property.getKey(), Arrays.asList(property.getValues())));
+                }
             }
-            //registry.commitTransaction();
         } catch (Exception e) {
             String msg = "Failed to persist properties in registry: " + resourcePath;
-            //registry.rollbackTransaction();
             log.error(msg, e);
             throw new RegistryException(msg, e);
+        } finally {
+            releaseWriteLock(applicationId);
         }
     }
 
     private boolean propertyValueExist(Resource nodeResource, String key, String value) {
         List<String> properties = nodeResource.getPropertyValues(key);
-        if (properties == null) {
-            return false;
-        } else {
-            return properties.contains(value);
-        }
+        return properties != null && properties.contains(value);
 
     }
 
@@ -178,14 +187,12 @@ public class MetadataApiRegistry implements DataStore {
             throws RegistryException {
         Registry registry = getRegistry();
         String resourcePath = mainResource + applicationId;
-
-        // We are using only super tenant registry to persist
-        PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
-        ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
-        ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
-
+        acquireWriteLock(applicationId);
         try {
-            registry.beginTransaction();
+            // We are using only super tenant registry to persist
+            PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
+            ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
+            ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
             Resource nodeResource;
             if (registry.resourceExists(resourcePath)) {
                 nodeResource = registry.get(resourcePath);
@@ -195,20 +202,13 @@ public class MetadataApiRegistry implements DataStore {
             }
             nodeResource.removePropertyValue(propertyName, valueToRemove);
             registry.put(resourcePath, nodeResource);
-            registry.commitTransaction();
-
-            log.info(String.format("Application %s property %s value %s is removed from metadata ",
-                    applicationId, propertyName, valueToRemove));
+            log.info(String.format("Application %s property %s value %s is removed from metadata ", applicationId,
+                    propertyName, valueToRemove));
             return true;
-        }catch (Exception e){
-            try {
-                registry.rollbackTransaction();
-            } catch (RegistryException e1) {
-                if (log.isErrorEnabled()) {
-                    log.error("Could not rollback transaction", e1);
-                }
-            }
+        } catch (Exception e) {
             throw new RegistryException("Could not remove registry resource: [resource-path] " + resourcePath, e);
+        } finally {
+            releaseWriteLock(applicationId);
         }
     }
 
@@ -220,30 +220,33 @@ public class MetadataApiRegistry implements DataStore {
      * @param property
      * @throws RegistryException
      */
-    public void addPropertyToCluster(String applicationId, String clusterId, Property property) throws RegistryException {
+    public void addPropertyToCluster(String applicationId, String clusterId, Property property)
+            throws RegistryException {
         Registry registry = getRegistry();
         String resourcePath = mainResource + applicationId + "/" + clusterId;
-
-        // We are using only super tenant registry to persist
-        PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
-        ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
-        ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
-
-        Resource nodeResource = null;
-        if (registry.resourceExists(resourcePath)) {
-            nodeResource = registry.get(resourcePath);
-        } else {
-            nodeResource = registry.newResource();
-            if (log.isDebugEnabled()) {
-                log.debug("Registry resource is create at path for cluster" + nodeResource.getPath() + " for cluster");
+        acquireWriteLock(applicationId);
+        try {
+            // We are using only super tenant registry to persist
+            PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
+            ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
+            ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
+            Resource nodeResource = null;
+            if (registry.resourceExists(resourcePath)) {
+                nodeResource = registry.get(resourcePath);
+            } else {
+                nodeResource = registry.newResource();
+                if (log.isDebugEnabled()) {
+                    log.debug("Registry resource created for cluster " + clusterId);
+                }
             }
+            nodeResource.setProperty(property.getKey(), Arrays.asList(property.getValues()));
+            registry.put(resourcePath, nodeResource);
+            log.info(String.format(
+                    "Registry property is persisted: [resource-path] %s [Property Name] %s [Property Values] %s",
+                    resourcePath, property.getKey(), Arrays.asList(property.getValues())));
+        } finally {
+            releaseWriteLock(applicationId);
         }
-
-        nodeResource.setProperty(property.getKey(), Arrays.asList(property.getValues()));
-        registry.put(resourcePath, nodeResource);
-
-        log.info(String.format("Registry property is persisted: [resource-path] %s [Property Name] %s [Property Values] %s",
-                resourcePath, property.getKey(), Arrays.asList(property.getValues())));
     }
 
     private UserRegistry getRegistry() throws RegistryException {
@@ -262,31 +265,24 @@ public class MetadataApiRegistry implements DataStore {
         if (StringUtils.isBlank(applicationId)) {
             throw new IllegalArgumentException("Application ID can not be null");
         }
-
-        // We are using only super tenant registry to persist
-        PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
-        ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
-        ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
         Registry registry = getRegistry();
         String resourcePath = mainResource + applicationId;
+        acquireWriteLock(applicationId);
         try {
-            registry.beginTransaction();
+            // We are using only super tenant registry to persist
+            PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
+            ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
+            ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
             if (registry.resourceExists(resourcePath)) {
                 registry.delete(resourcePath);
                 log.info(String.format("Application [application-id ] properties removed from registry %s",
                         applicationId));
             }
-            registry.commitTransaction();
             return true;
-        }catch (Exception e){
-            try {
-                registry.rollbackTransaction();
-            } catch (RegistryException e1) {
-                if (log.isErrorEnabled()) {
-                    log.error("Could not rollback transaction", e1);
-                }
-            }
+        } catch (Exception e) {
             throw new RegistryException("Could not remove registry resource: [resource-path] " + resourcePath, e);
+        } finally {
+            releaseWriteLock(applicationId);
         }
     }
 
@@ -294,30 +290,81 @@ public class MetadataApiRegistry implements DataStore {
             throws org.wso2.carbon.registry.api.RegistryException {
         Registry registry = getRegistry();
         String resourcePath = mainResource + applicationId;
-        // We are using only super tenant registry to persist
-        PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
-        ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
-        ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
-
+        acquireWriteLock(applicationId);
         Resource nodeResource;
-        if (registry.resourceExists(resourcePath)) {
-            nodeResource = registry.get(resourcePath);
-            if (nodeResource.getProperty(propertyName) == null) {
-                log.info(String.format("[application-id] %s does not have a property [property-name] %s ", applicationId,
-                        propertyName));
-                return false;
+        try {
+            // We are using only super tenant registry to persist
+            PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
+            ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
+            ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
+            if (registry.resourceExists(resourcePath)) {
+                nodeResource = registry.get(resourcePath);
+                if (nodeResource.getProperty(propertyName) == null) {
+                    log.info(String.format("[application-id] %s does not have a property [property-name] %s ",
+                            applicationId, propertyName));
+                    return false;
+                } else {
+                    nodeResource.removeProperty(propertyName);
+                    registry.put(resourcePath, nodeResource);
+                }
             } else {
-                nodeResource.removeProperty(propertyName);
-                registry.put(resourcePath, nodeResource);
+                log.error("Registry resource not found at " + resourcePath);
+                return false;
             }
+
+            log.info(String.format("Application [application-id] %s property [property-name] %s removed from Registry ",
+                    applicationId, propertyName));
+            return true;
+        } finally {
+            releaseWriteLock(applicationId);
+        }
+    }
+    // Looks the lock up once: the topology receiver can remove the entry between a check and a second get().
+    public void acquireReadLock(String applicationId) {
+        ReadWriteLock lock = applicationIdToReadWriteLockMap.get(applicationId);
+        if (lock == null) {
+            throw new RuntimeException(String.format(
+                    "Invalid application [application-id] %s not found. Failed to acquire read lock.", applicationId));
         } else {
-            log.error("Registry resource not not found at " + resourcePath);
-            return false;
+            lock.acquireReadLock();
         }
+    }
 
-        log.info(String.format("Application [application-id] %s property [property-name] %s removed from Registry ",
-                applicationId, propertyName));
-        return true;
+    public void acquireWriteLock(String applicationId) {
+        // Single lookup: the topology receiver may remove the entry concurrently.
+        ReadWriteLock lock = applicationIdToReadWriteLockMap.get(applicationId);
+        if (lock == null) {
+            throw new RuntimeException(String.format(
+                    "Invalid application [application-id] %s not found. Failed to acquire write lock.", applicationId));
+        }
+        lock.acquireWriteLock();
     }
 
+    public void releaseReadLock(String applicationId) {
+        // Single lookup: the topology receiver may remove the entry concurrently.
+        ReadWriteLock lock = applicationIdToReadWriteLockMap.get(applicationId);
+        if (lock == null) {
+            throw new RuntimeException(String.format(
+                    "Invalid application [application-id] %s not found. Failed to release read lock.", applicationId));
+        }
+        lock.releaseReadLock();
+    }
+
+    public void releaseWriteLock(String applicationId) {
+        // Single lookup: the topology receiver may remove the entry concurrently.
+        ReadWriteLock lock = applicationIdToReadWriteLockMap.get(applicationId);
+        if (lock == null) {
+            throw new RuntimeException(String.format(
+                    "Invalid application [application-id] %s not found. Failed to release write lock.", applicationId));
+        }
+        lock.releaseWriteLock();
+    }
+    // Exposes the shared lock map itself (not a copy); MetadataTopologyEventReceiver adds and removes entries via it.
+    public static Map<String, ReadWriteLock> getApplicationIdToReadWriteLockMap() {
+        return applicationIdToReadWriteLockMap;
+    }
+    // Terminates the topology event receiver that the constructor created and started.
+    public void stopTopologyReceiver() {
+        metadataTopologyEventReceiver.terminate();
+    }
 }