You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/08 05:31:36 UTC

git commit: Fixed tenant subscription publishing/receiving logic

Updated Branches:
  refs/heads/master 2dbf5598a -> 2fbd0fd38


Fixed tenant subscription publishing/receiving logic


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/2fbd0fd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/2fbd0fd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/2fbd0fd3

Branch: refs/heads/master
Commit: 2fbd0fd380a1c617eef64128758ad87d4ecb7c3c
Parents: 2dbf559
Author: Imesh Gunaratne <im...@apache.org>
Authored: Sun Dec 8 10:01:20 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sun Dec 8 10:01:20 2013 +0530

----------------------------------------------------------------------
 .../mgt/publisher/TenantSynzhronizerTask.java   |  26 ++++-
 .../balancer/LoadBalancerTenantReceiver.java    | 105 +++++++++++--------
 .../balancer/LoadBalancerTopologyReceiver.java  |  64 ++---------
 .../stratos/load/balancer/RequestDelegator.java |  14 +--
 .../conf/LoadBalancerConfiguration.java         |  17 +--
 .../context/LoadBalancerContextUtil.java        |  73 +++++++++++++
 .../sample/configuration/loadbalancer2.conf     |   4 +-
 .../stratos/messaging/domain/tenant/Tenant.java |   5 +
 .../distribution/src/main/conf/log4j.properties |   2 +-
 9 files changed, 192 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java
index 62c5103..d777c2d 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java
@@ -21,7 +21,9 @@ package org.apache.stratos.adc.mgt.publisher;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
 import org.apache.stratos.adc.mgt.internal.DataHolder;
+import org.apache.stratos.adc.mgt.utils.PersistenceManager;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.domain.tenant.Tenant;
 import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
@@ -48,20 +50,34 @@ public class TenantSynzhronizerTask implements Task {
     @Override
     public void execute() {
         try {
-            if(log.isInfoEnabled()) {
+            if (log.isInfoEnabled()) {
                 log.info(String.format("Publishing complete tenant event"));
             }
+            Tenant tenant;
             List<Tenant> tenants = new ArrayList<Tenant>();
             TenantManager tenantManager = DataHolder.getRealmService().getTenantManager();
             org.wso2.carbon.user.api.Tenant[] carbonTenants = tenantManager.getAllTenants();
-            for(org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) {
-                tenants.add(new Tenant(carbonTenant.getId(), carbonTenant.getDomain()));
+            for (org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) {
+                // Create tenant
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Tenant found: [tenant-id] %d [tenant-domain] %s", carbonTenant.getId(), carbonTenant.getDomain()));
+                }
+                tenant = new Tenant(carbonTenant.getId(), carbonTenant.getDomain());
+                // Add subscriptions
+                List<CartridgeSubscriptionInfo> subscriptions = PersistenceManager.getSubscriptionsForTenant(tenant.getTenantId());
+                for (CartridgeSubscriptionInfo subscription : subscriptions) {
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Tenant subscription found: [tenant-id] %d [tenant-domain] %s [service] %s",
+                                   carbonTenant.getId(), carbonTenant.getDomain(), subscription.getCartridge()));
+                    }
+                    tenant.addServiceSubscription(subscription.getCartridge());
+                }
+                tenants.add(tenant);
             }
             CompleteTenantEvent event = new CompleteTenantEvent(tenants);
             EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
             eventPublisher.publish(event);
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("Could not publish complete tenant event", e);
             }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
index ad17ec9..dc6e402 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
@@ -22,11 +22,14 @@ package org.apache.stratos.load.balancer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.load.balancer.context.LoadBalancerContext;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
 import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
 import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent;
+import org.apache.stratos.messaging.listener.tenant.CompleteTenantEventListener;
 import org.apache.stratos.messaging.listener.tenant.TenantSubscribedEventListener;
 import org.apache.stratos.messaging.listener.tenant.TenantUnSubscribedEventListener;
 import org.apache.stratos.messaging.message.processor.tenant.TenantMessageProcessorChain;
@@ -59,64 +62,80 @@ public class LoadBalancerTenantReceiver implements Runnable {
 
     private TenantMessageProcessorChain createEventProcessorChain() {
         TenantMessageProcessorChain messageProcessorChain = new TenantMessageProcessorChain();
-        messageProcessorChain.addEventListener(new TenantSubscribedEventListener() {
+        messageProcessorChain.addEventListener(new CompleteTenantEventListener() {
             @Override
             protected void onEvent(Event event) {
-                TenantSubscribedEvent tenantSubscribedEvent = (TenantSubscribedEvent) event;
-
-                // Find cluster of tenant
-                Cluster cluster = findCluster(tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId());
-                if(cluster != null) {
-                    for(String hostName : cluster.getHostNames()) {
-                        // Add hostName, tenantId, cluster to multi-tenant map
-                        Map<Integer, Cluster> clusterMap = LoadBalancerContext.getInstance().getMultiTenantClusters(hostName);
-                        if(clusterMap == null) {
-                            clusterMap = new HashMap<Integer, Cluster>();
-                            clusterMap.put(tenantSubscribedEvent.getTenantId(), cluster);
-                            LoadBalancerContext.getInstance().addMultiTenantClusters(hostName, clusterMap);
-                        }
-                        else {
-                            clusterMap.put(tenantSubscribedEvent.getTenantId(), cluster);
-                        }
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Cluster added to multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
-                                       hostName, tenantSubscribedEvent.getTenantId(), cluster.getClusterId()));
-                        }
-                    }
-                }
-                else {
-                    if(log.isErrorEnabled()) {
-                        log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d",
-                                tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId()));
+                CompleteTenantEvent completeTenantEvent = (CompleteTenantEvent) event;
+                for(Tenant tenant : completeTenantEvent.getTenants()) {
+                    for(String serviceName : tenant.getServiceSubscriptions()) {
+                        addTenantSubscriptionToLbContext(serviceName, tenant.getTenantId());
                     }
                 }
             }
         });
+        messageProcessorChain.addEventListener(new TenantSubscribedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                TenantSubscribedEvent tenantSubscribedEvent = (TenantSubscribedEvent) event;
+                addTenantSubscriptionToLbContext(tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId());
+            }
+        });
         messageProcessorChain.addEventListener(new TenantUnSubscribedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 TenantUnSubscribedEvent tenantUnSubscribedEvent = (TenantUnSubscribedEvent) event;
+                removeTenantSubscriptionFromLbContext(tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId());
+            }
+        });
+        return messageProcessorChain;
+    }
 
-                // Find cluster of tenant
-                Cluster cluster = findCluster(tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId());
-                if(cluster != null) {
-                    for(String hostName : cluster.getHostNames()) {
-                        LoadBalancerContext.getInstance().removeMultiTenantClusters(hostName);
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Cluster removed from multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
-                                      hostName, tenantUnSubscribedEvent.getTenantId(), cluster.getClusterId()));
-                        }
-                    }
+    private void addTenantSubscriptionToLbContext(String serviceName, int tenantId) {
+        // Find cluster of tenant
+        Cluster cluster = findCluster(serviceName, tenantId);
+        if(cluster != null) {
+            for(String hostName : cluster.getHostNames()) {
+                // Add hostName, tenantId, cluster to multi-tenant map
+                Map<Integer, Cluster> clusterMap = LoadBalancerContext.getInstance().getMultiTenantClusters(hostName);
+                if(clusterMap == null) {
+                    clusterMap = new HashMap<Integer, Cluster>();
+                    clusterMap.put(tenantId, cluster);
+                    LoadBalancerContext.getInstance().addMultiTenantClusters(hostName, clusterMap);
                 }
                 else {
-                    if(log.isErrorEnabled()) {
-                        log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d",
-                                tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId()));
-                    }
+                    clusterMap.put(tenantId, cluster);
+                }
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster added to multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
+                               hostName, tenantId, cluster.getClusterId()));
                 }
             }
-        });
-        return messageProcessorChain;
+        }
+        else {
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d",
+                           serviceName, tenantId));
+            }
+        }
+    }
+
+    private void removeTenantSubscriptionFromLbContext(String serviceName, int tenantId) {
+        // Find cluster of tenant
+        Cluster cluster = findCluster(serviceName, tenantId);
+        if (cluster != null) {
+            for (String hostName : cluster.getHostNames()) {
+                LoadBalancerContext.getInstance().removeMultiTenantClusters(hostName);
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster removed from multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
+                            hostName, tenantId, cluster.getClusterId()));
+                }
+            }
+        } else {
+            if (log.isErrorEnabled()) {
+                log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d",
+                        serviceName, tenantId));
+            }
+        }
     }
 
     private Cluster findCluster(String serviceName, int tenantId) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
index 55ff817..bd5a0b1 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
@@ -22,10 +22,10 @@ package org.apache.stratos.load.balancer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.load.balancer.context.LoadBalancerContext;
+import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.ServiceType;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
 import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
@@ -85,7 +85,7 @@ public class LoadBalancerTopologyReceiver implements Runnable {
                     for (Service service : TopologyManager.getTopology().getServices()) {
                         for (Cluster cluster : service.getClusters()) {
                             if (hasActiveMembers(cluster)) {
-                                addClusterToLbContext(cluster);
+                                LoadBalancerContextUtil.addClusterToLbContext(cluster);
                             }
                         }
                     }
@@ -113,10 +113,9 @@ public class LoadBalancerTopologyReceiver implements Runnable {
                     MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
                     Cluster cluster = LoadBalancerContext.getInstance().getCluster(memberActivatedEvent.getClusterId());
                     if (cluster != null) {
-                        addClusterToLbContext(cluster);
-                    }
-                    else {
-                        if(log.isWarnEnabled()) {
+                        LoadBalancerContextUtil.addClusterToLbContext(cluster);
+                    } else {
+                        if (log.isWarnEnabled()) {
                             log.warn(String.format("Cluster not found in cluster id cluster map: [cluster] %s", memberActivatedEvent.getClusterId()));
                         }
                     }
@@ -136,11 +135,10 @@ public class LoadBalancerTopologyReceiver implements Runnable {
                     Cluster cluster = LoadBalancerContext.getInstance().getCluster(clusterRemovedEvent.getClusterId());
                     if (cluster != null) {
                         for (String hostName : cluster.getHostNames()) {
-                            removeClusterFromLbContext(hostName);
+                            LoadBalancerContextUtil.removeClusterFromLbContext(hostName);
                         }
-                    }
-                    else {
-                        if(log.isWarnEnabled()) {
+                    } else {
+                        if (log.isWarnEnabled()) {
                             log.warn(String.format("Cluster not found in cluster id cluster map: [cluster] %s", clusterRemovedEvent.getClusterId()));
                         }
                     }
@@ -161,12 +159,11 @@ public class LoadBalancerTopologyReceiver implements Runnable {
                     if (service != null) {
                         for (Cluster cluster : service.getClusters()) {
                             for (String hostName : cluster.getHostNames()) {
-                                removeClusterFromLbContext(hostName);
+                                LoadBalancerContextUtil.removeClusterFromLbContext(hostName);
                             }
                         }
-                    }
-                    else {
-                        if(log.isWarnEnabled()) {
+                    } else {
+                        if (log.isWarnEnabled()) {
                             log.warn(String.format("Service not found in topology: [service] %s", serviceRemovedEvent.getServiceName()));
                         }
                     }
@@ -178,45 +175,6 @@ public class LoadBalancerTopologyReceiver implements Runnable {
         return processorChain;
     }
 
-    private void addClusterToLbContext(Cluster cluster) {
-        // Add cluster to Map<ClusterId, Cluster>
-        LoadBalancerContext.getInstance().addCluster(cluster);
-
-        Service service = TopologyManager.getTopology().getService(cluster.getServiceName());
-        if (service.getServiceType() == ServiceType.SingleTenant) {
-            // Add cluster to SingleTenantClusterMap
-            for (String hostName : cluster.getHostNames()) {
-                if (!LoadBalancerContext.getInstance().singleTenantClusterExists((hostName))) {
-                    LoadBalancerContext.getInstance().addSingleTenantCluster(hostName, cluster);
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster added to single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName));
-                    }
-                }
-            }
-        }
-        // MultiTenantClusterMap is updated by tenant receiver.
-    }
-
-    private void removeClusterFromLbContext(String clusterId) {
-        Cluster cluster = LoadBalancerContext.getInstance().getCluster(clusterId);
-        Service service = TopologyManager.getTopology().getService(cluster.getServiceName());
-        if (service.getServiceType() == ServiceType.SingleTenant) {
-            // Remove cluster from SingleTenantClusterMap
-            for (String hostName : cluster.getHostNames()) {
-                if (LoadBalancerContext.getInstance().singleTenantClusterExists(hostName)) {
-                    LoadBalancerContext.getInstance().removeSingleTenantCluster(hostName);
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster removed from single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName));
-                    }
-                }
-            }
-        }
-        // MultiTenantClusterMap is updated by tenant receiver.
-
-        // Remove cluster from Map<ClusterId,Cluster>
-        LoadBalancerContext.getInstance().removeCluster(clusterId);
-    }
-
     /**
      * Terminate load balancer topology receiver thread.
      */

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java
index 9b0a8a5..be09b31 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.load.balancer.algorithm.AlgorithmContext;
 import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithm;
+import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
 import org.apache.stratos.load.balancer.context.LoadBalancerContext;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Member;
@@ -116,14 +117,13 @@ public class RequestDelegator {
     }
 
     public boolean isTargetHostValid(String hostName) {
-        try {
-            if (hostName == null)
-                return false;
+        if (hostName == null)
+            return false;
 
-            TopologyManager.acquireReadLock();
-            return LoadBalancerContext.getInstance().clusterExists(hostName);
-        } finally {
-            TopologyManager.releaseReadLock();
+        boolean valid = LoadBalancerContext.getInstance().singleTenantClusterExists(hostName);
+        if ((!valid) && (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled())) {
+            valid = LoadBalancerContext.getInstance().multiTenantClustersExists(hostName);
         }
+        return valid;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
index c94d55f..ae811a0 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
@@ -28,6 +28,7 @@ import org.apache.stratos.load.balancer.conf.structure.Node;
 import org.apache.stratos.load.balancer.conf.structure.NodeBuilder;
 import org.apache.stratos.load.balancer.conf.util.Constants;
 import org.apache.stratos.load.balancer.context.LoadBalancerContext;
+import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil;
 import org.apache.stratos.load.balancer.exception.InvalidConfigurationException;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
@@ -238,7 +239,7 @@ public class LoadBalancerConfiguration {
 
         public LoadBalancerConfiguration readFromFile() {
             String configFilePath = System.getProperty("loadbalancer.conf.file");
-            if(configFilePath == null){
+            if (configFilePath == null) {
                 throw new RuntimeException("loadbalancer.conf.file' system property is not set");
             }
 
@@ -302,8 +303,8 @@ public class LoadBalancerConfiguration {
                 configuration.setMultiTenancyEnabled(Boolean.parseBoolean(multiTenancyEnabled));
             }
 
-            // Read mb ip, port, topology service filter and topology cluster filter if topology event listener is enabled
-            if (configuration.isTopologyEventListenerEnabled()) {
+            // Read mb ip and port
+            if (configuration.isTopologyEventListenerEnabled() || configuration.isMultiTenancyEnabled()) {
                 String mbIp = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_MB_IP);
                 validateRequiredPropertyInNode(Constants.CONF_PROPERTY_MB_IP, mbIp, "loadbalancer");
                 configuration.setMbIp(mbIp);
@@ -311,7 +312,10 @@ public class LoadBalancerConfiguration {
                 String mbPort = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_MB_PORT);
                 validateRequiredPropertyInNode(Constants.CONF_PROPERTY_MB_PORT, mbPort, "loadbalancer");
                 configuration.setMbPort(Integer.parseInt(mbPort));
+            }
 
+            // Read topology service filter and topology cluster filter
+            if (configuration.isTopologyEventListenerEnabled()) {
                 String serviceFilter = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_TOPOLOGY_SERVICE_FILTER);
                 if (StringUtils.isNotBlank(serviceFilter)) {
                     configuration.setTopologyServiceFilter(serviceFilter);
@@ -349,8 +353,7 @@ public class LoadBalancerConfiguration {
                 validateRequiredPropertyInNode(Constants.CONF_PROPERTY_TENANT_IDENTIFIER_REGEX, tenantIdentifierRegex, "loadbalancer");
                 try {
                     Pattern.compile(tenantIdentifierRegex);
-                }
-                catch (Exception e) {
+                } catch (Exception e) {
                     throw new InvalidConfigurationException(String.format("Invalid tenant identifier regular expression: %s", tenantIdentifierRegex), e);
                 }
                 configuration.setTenantIdentifierRegex(tenantIdentifierRegex);
@@ -419,8 +422,8 @@ public class LoadBalancerConfiguration {
                         }
                         service.addCluster(cluster);
 
-                        // Add cluster to load balancer context Map<Hostname,Cluster>
-                        LoadBalancerContext.getInstance().addCluster(cluster);
+                        // Add cluster to load balancer context
+                        LoadBalancerContextUtil.addClusterToLbContext(cluster);
                     }
 
                     // Add service to topology manager

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java
new file mode 100644
index 0000000..f644933
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.context;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.ServiceType;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * Load balancer context utility class.
+ */
+public class LoadBalancerContextUtil {
+    private static final Log log = LogFactory.getLog(LoadBalancerContextUtil.class);
+
+    public static void addClusterToLbContext(Cluster cluster) {
+        // Add cluster to Map<ClusterId, Cluster>
+        LoadBalancerContext.getInstance().addCluster(cluster);
+
+        Service service = TopologyManager.getTopology().getService(cluster.getServiceName());
+        if (service.getServiceType() == ServiceType.SingleTenant) {
+            // Add cluster to SingleTenantClusterMap
+            for (String hostName : cluster.getHostNames()) {
+                if (!LoadBalancerContext.getInstance().singleTenantClusterExists((hostName))) {
+                    LoadBalancerContext.getInstance().addSingleTenantCluster(hostName, cluster);
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster added to single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName));
+                    }
+                }
+            }
+        }
+        // MultiTenantClusterMap is updated by tenant receiver.
+    }
+
+    public static void removeClusterFromLbContext(String clusterId) {
+        Cluster cluster = LoadBalancerContext.getInstance().getCluster(clusterId);
+        Service service = TopologyManager.getTopology().getService(cluster.getServiceName());
+        if (service.getServiceType() == ServiceType.SingleTenant) {
+            // Remove cluster from SingleTenantClusterMap
+            for (String hostName : cluster.getHostNames()) {
+                if (LoadBalancerContext.getInstance().singleTenantClusterExists(hostName)) {
+                    LoadBalancerContext.getInstance().removeSingleTenantCluster(hostName);
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster removed from single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName));
+                    }
+                }
+            }
+        }
+        // MultiTenantClusterMap is updated by tenant receiver.
+
+        // Remove cluster from Map<ClusterId,Cluster>
+        LoadBalancerContext.getInstance().removeCluster(clusterId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
index 1a345a9..dac8f0b 100755
--- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
+++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
@@ -48,8 +48,8 @@ loadbalancer {
 
     # Message broker endpoint
     # Provide message broker ip address and port if topology-event-listener or multi-tenancy is set to true.
-    # mb-ip: localhost;
-    # mb-port: 5677;
+    mb-ip: localhost;
+    mb-port: 5677;
 
     # Topology service filter
     # Provide service names in a comma separated list to filter incoming topology events if

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java
index f4e3a0c..bc4244a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java
@@ -20,6 +20,7 @@
 package org.apache.stratos.messaging.domain.tenant;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -52,6 +53,10 @@ public class Tenant implements Serializable{
         this.tenantDomain = tenantDomain;
     }
 
+    public Collection<String> getServiceSubscriptions() {
+        return serviceNameMap.keySet();
+    }
+
     public boolean isServiceSubscribed(String serviceName) {
         return serviceNameMap.containsKey(serviceName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/products/load-balancer/modules/distribution/src/main/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/products/load-balancer/modules/distribution/src/main/conf/log4j.properties b/products/load-balancer/modules/distribution/src/main/conf/log4j.properties
index 1942895..3a23f46 100644
--- a/products/load-balancer/modules/distribution/src/main/conf/log4j.properties
+++ b/products/load-balancer/modules/distribution/src/main/conf/log4j.properties
@@ -111,7 +111,7 @@ log4j.appender.CARBON_LOGFILE.File=${carbon.home}/repository/logs/${instance.log
 log4j.appender.CARBON_LOGFILE.Append=true
 log4j.appender.CARBON_LOGFILE.layout=org.wso2.carbon.utils.logging.TenantAwarePatternLayout
 # ConversionPattern will be overridden by the configuration setting in the DB
-log4j.appender.CARBON_LOGFILE.layout.ConversionPattern=TID: [%T] [%S] [%d] %P%5p {%c} - %x %m {%c}%n
+log4j.appender.CARBON_LOGFILE.layout.ConversionPattern=TID: [%T] [%S] [%d] %P%5p {%c} - %x %m %n
 log4j.appender.CARBON_LOGFILE.layout.TenantPattern=%U%@%D [%T] [%S]
 log4j.appender.CARBON_LOGFILE.threshold=DEBUG