You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/11 14:20:22 UTC

[2/3] git commit: Implemented topology member filter

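Implemented topology member filter

- Added MessageFilter plus TopologyServiceFilter, TopologyClusterFilter and
  TopologyMemberFilter; the Topology* service/cluster filters replace the
  former ServiceFilter and ClusterFilter.
- Member now takes partitionId as a constructor argument (setPartitionId was
  removed) and gains an lbClusterId property.
- The cloud controller now handles InstanceStartedEvent/InstanceActivatedEvent
  from the instance status package instead of MemberStartedEvent/MemberActivatedEvent.
- Removed commented-out partition handlers from TopologyBuilder and
  TopologyEventSender.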


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/7a5a6b81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/7a5a6b81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/7a5a6b81

Branch: refs/heads/master
Commit: 7a5a6b815914dcf7eaeacf7ed8ae4a6251ef3de2
Parents: a10d95f
Author: Imesh Gunaratne <im...@apache.org>
Authored: Wed Dec 11 18:49:56 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Wed Dec 11 18:49:56 2013 +0530

----------------------------------------------------------------------
 .../controller/topology/TopologyBuilder.java    | 112 +++++--------------
 .../topology/TopologyEventMessageDelegator.java |  16 +--
 .../topology/TopologyEventSender.java           |  56 ++--------
 .../conf/LoadBalancerConfiguration.java         |   2 +-
 .../load/balancer/conf/util/Constants.java      |   1 +
 .../internal/LoadBalancerServiceComponent.java  |  12 +-
 .../balancer/test/RoundRobinAlgorithmTest.java  |   6 +-
 components/org.apache.stratos.messaging/pom.xml |   5 +
 .../messaging/domain/topology/Member.java       |  23 ++--
 .../event/topology/InstanceSpawnedEvent.java    |   8 ++
 .../event/topology/MemberStartedEvent.java      |   9 ++
 .../event/topology/MemberSuspendedEvent.java    |   9 ++
 .../event/topology/MemberTerminatedEvent.java   |   9 ++
 .../messaging/message/filter/MessageFilter.java |  94 ++++++++++++++++
 .../message/filter/topology/ClusterFilter.java  |  83 --------------
 .../message/filter/topology/ServiceFilter.java  |  83 --------------
 .../filter/topology/TopologyClusterFilter.java  |  65 +++++++++++
 .../filter/topology/TopologyMemberFilter.java   |  46 ++++++++
 .../filter/topology/TopologyServiceFilter.java  |  65 +++++++++++
 .../ClusterCreatedMessageProcessor.java         |  12 +-
 .../ClusterRemovedMessageProcessor.java         |  13 +--
 .../CompleteTopologyMessageProcessor.java       |  12 +-
 .../InstanceSpawnedMessageProcessor.java        |  27 +++--
 .../MemberActivatedMessageProcessor.java        |  23 +++-
 .../topology/MemberStartedMessageProcessor.java |  23 +++-
 .../MemberSuspendedMessageProcessor.java        |  23 +++-
 .../MemberTerminatedMessageProcessor.java       |  23 +++-
 .../ServiceCreatedMessageProcessor.java         |   6 +-
 .../ServiceRemovedMessageProcessor.java         |   6 +-
 .../stratos/messaging/util/Constants.java       |  14 +++
 30 files changed, 508 insertions(+), 378 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
index 39c7608..fb1fe5a 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
@@ -29,8 +29,8 @@ import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.instance.status.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.instance.status.MemberStartedEvent;
+import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
+import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
 
 import java.util.List;
 import java.util.Properties;
@@ -75,58 +75,6 @@ public class TopologyBuilder {
 
     }
 
-//    public static void handlePartitionCreated(Partition partition) {
-//
-//        Topology topology = TopologyManager.getInstance().getTopology();
-//        if (partition == null) {
-//            throw new RuntimeException(String.format("Partition is empty"));
-//        }
-//        try {
-//            TopologyManager.getInstance().acquireWriteLock();
-//            topology.addPartition(partition);
-//            TopologyManager.getInstance().updateTopology(topology);
-//        } finally {
-//            TopologyManager.getInstance().releaseWriteLock();
-//        }
-//        TopologyEventSender.sendPartitionCreatedEvent(partition);
-//
-//    }
-
-//    public static void handlePartitionUpdated(Partition newPartition, Partition oldPartition) {
-//
-//        Topology topology = TopologyManager.getInstance().getTopology();
-//        if (newPartition == null || oldPartition == null) {
-//            throw new RuntimeException(String.format("Partition is empty"));
-//        }
-//        try {
-//            TopologyManager.getInstance().acquireWriteLock();
-//            topology.removePartition(oldPartition);
-//            topology.addPartition(newPartition);
-//            TopologyManager.getInstance().updateTopology(topology);
-//        } finally {
-//            TopologyManager.getInstance().releaseWriteLock();
-//        }
-//        TopologyEventSender.sendPartitionUpdatedEvent(newPartition, oldPartition.getId());
-//
-//    }
-//
-//    public static void handlePartitionRemoved(Partition partition) {
-//
-//        Topology topology = TopologyManager.getInstance().getTopology();
-//        if (partition == null) {
-//            throw new RuntimeException(String.format("Partition is empty"));
-//        }
-//        try {
-//            TopologyManager.getInstance().acquireWriteLock();
-//            topology.removePartition(partition);
-//            TopologyManager.getInstance().updateTopology(topology);
-//        } finally {
-//            TopologyManager.getInstance().releaseWriteLock();
-//        }
-//        TopologyEventSender.sendPartitionRemovedEvent(partition);
-//    }
-
-
     public static void handleServiceRemoved(List<Cartridge> cartridgeList) {
         Topology topology = TopologyManager.getInstance().getTopology();
 
@@ -240,8 +188,7 @@ public class TopologyBuilder {
 
         try {
             TopologyManager.getInstance().acquireWriteLock();
-            Member member = new Member(serviceName, clusterId, memberId);
-            member.setPartitionId(partitionId);
+            Member member = new Member(serviceName, clusterId, partitionId, memberId);
             member.setStatus(MemberStatus.Created);
             member.setMemberIp(privateIp);
             cluster.addMember(member);
@@ -249,28 +196,28 @@ public class TopologyBuilder {
         } finally {
             TopologyManager.getInstance().releaseWriteLock();
         }
-        TopologyEventSender.sendInstanceSpawnedEvent(serviceName, clusterId, memberId, partitionId);
+        TopologyEventSender.sendInstanceSpawnedEvent(serviceName, clusterId, partitionId, memberId);
 
     }
 
-    public static void handleMemberStarted(MemberStartedEvent memberStartedEvent) {
+    public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) {
         Topology topology = TopologyManager.getInstance().getTopology();
-        Service service = topology.getService(memberStartedEvent.getServiceName());
+        Service service = topology.getService(instanceStartedEvent.getServiceName());
         if (service == null) {
             throw new RuntimeException(String.format("Service %s does not exist",
-                    memberStartedEvent.getServiceName()));
+                    instanceStartedEvent.getServiceName()));
         }
-        if (!service.clusterExists(memberStartedEvent.getClusterId())) {
+        if (!service.clusterExists(instanceStartedEvent.getClusterId())) {
             throw new RuntimeException(String.format("Cluster %s does not exist in service %s",
-                    memberStartedEvent.getClusterId(),
-                    memberStartedEvent.getServiceName()));
+                    instanceStartedEvent.getClusterId(),
+                    instanceStartedEvent.getServiceName()));
         }
 
-        Member member = service.getCluster(memberStartedEvent.getClusterId()).
-                getMember(memberStartedEvent.getMemberId());
+        Member member = service.getCluster(instanceStartedEvent.getClusterId()).
+                getMember(instanceStartedEvent.getMemberId());
         if (member == null) {
             throw new RuntimeException(String.format("Member %s does not exist",
-                    memberStartedEvent.getMemberId()));
+                    instanceStartedEvent.getMemberId()));
         }
         try {
             TopologyManager.getInstance().acquireWriteLock();
@@ -282,39 +229,39 @@ public class TopologyBuilder {
             TopologyManager.getInstance().releaseWriteLock();
         }
         //memberStartedEvent.
-        TopologyEventSender.sendMemberStartedEvent(memberStartedEvent);
+        TopologyEventSender.sendMemberStartedEvent(instanceStartedEvent);
     }
 
-    public static void handleMemberActivated(MemberActivatedEvent memberActivatedEvent) {
+    public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) {
         Topology topology = TopologyManager.getInstance().getTopology();
-        Service service = topology.getService(memberActivatedEvent.getServiceName());
+        Service service = topology.getService(instanceActivatedEvent.getServiceName());
         if (service == null) {
             throw new RuntimeException(String.format("Service %s does not exist",
-                                                     memberActivatedEvent.getServiceName()));
+                                                     instanceActivatedEvent.getServiceName()));
         }
         
-        Cluster cluster = service.getCluster(memberActivatedEvent.getClusterId());
+        Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId());
         if (cluster == null) {
             throw new RuntimeException(String.format("Cluster %s does not exist",
-                                                     memberActivatedEvent.getClusterId()));
+                                                     instanceActivatedEvent.getClusterId()));
         }
-        Member member = cluster.getMember(memberActivatedEvent.getMemberId());
+        Member member = cluster.getMember(instanceActivatedEvent.getMemberId());
 
         if (member == null) {
             throw new RuntimeException(String.format("Member %s does not exist",
-                    memberActivatedEvent.getMemberId()));
+                    instanceActivatedEvent.getMemberId()));
         }
 
-        org.apache.stratos.messaging.event.topology.MemberActivatedEvent memberActivatedEventTopology =
-                new org.apache.stratos.messaging.event.topology.MemberActivatedEvent(memberActivatedEvent.getServiceName(),
-                        memberActivatedEvent.getClusterId(), memberActivatedEvent.getMemberId());
+        org.apache.stratos.messaging.event.topology.MemberActivatedEvent memberActivatedEvent =
+                new org.apache.stratos.messaging.event.topology.MemberActivatedEvent(instanceActivatedEvent.getServiceName(),
+                        instanceActivatedEvent.getClusterId(), instanceActivatedEvent.getPartitionId(), instanceActivatedEvent.getMemberId());
 
         try {
             TopologyManager.getInstance().acquireWriteLock();
             member.setStatus(MemberStatus.Activated);
             log.info("member started event adding status activated");
             Cartridge cartridge = FasterLookUpDataHolder.getInstance().
-                    getCartridge(memberActivatedEvent.getServiceName());
+                    getCartridge(instanceActivatedEvent.getServiceName());
 
             List<PortMapping> portMappings = cartridge.getPortMappings();
             Port port;
@@ -324,17 +271,16 @@ public class TopologyBuilder {
                         Integer.parseInt(portMapping.getPort()),
                         Integer.parseInt(portMapping.getProxyPort()));
                 member.addPort(port);
-                memberActivatedEventTopology.addPort(port);
+                memberActivatedEvent.addPort(port);
             }
-            
-            memberActivatedEventTopology.setPartitionId(member.getPartitionId());
-            memberActivatedEventTopology.setMemberIp(member.getMemberIp());
+
+            memberActivatedEvent.setMemberIp(member.getMemberIp());
             TopologyManager.getInstance().updateTopology(topology);
 
         } finally {
             TopologyManager.getInstance().releaseWriteLock();
         }
-        TopologyEventSender.sendMemberActivatedEvent(memberActivatedEventTopology);
+        TopologyEventSender.sendMemberActivatedEvent(memberActivatedEvent);
     }
 
     public static void handleMemberTerminated(String serviceName, String clusterId, String memberId) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java
index 9a975bb..098f7d5 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java
@@ -20,8 +20,8 @@ package org.apache.stratos.cloud.controller.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.instance.status.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.instance.status.MemberStartedEvent;
+import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
+import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
 import org.apache.stratos.messaging.util.Constants;
 import org.apache.stratos.messaging.util.Util;
 
@@ -45,12 +45,12 @@ public class TopologyEventMessageDelegator implements Runnable {
 
                 log.info(String.format("Event message received from queue: %s", type));
 
-                if(MemberStartedEvent.class.getName().equals(type)) {
-                     TopologyBuilder.handleMemberStarted((MemberStartedEvent)Util.
-                                                        jsonToObject(json, MemberStartedEvent.class));
-                } else if(MemberActivatedEvent.class.getName().equals(type)) {
-                     TopologyBuilder.handleMemberActivated((MemberActivatedEvent) Util.
-                                                        jsonToObject(json, MemberActivatedEvent.class));
+                if(InstanceStartedEvent.class.getName().equals(type)) {
+                     TopologyBuilder.handleMemberStarted((InstanceStartedEvent)Util.
+                                                        jsonToObject(json, InstanceStartedEvent.class));
+                } else if(InstanceActivatedEvent.class.getName().equals(type)) {
+                     TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) Util.
+                                                        jsonToObject(json, InstanceActivatedEvent.class));
                 }
 
 				if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventSender.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventSender.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventSender.java
index 59a9e55..8b8137c 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventSender.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventSender.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.domain.topology.Port;
 import org.apache.stratos.messaging.domain.topology.ServiceType;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
 import org.apache.stratos.messaging.event.topology.*;
 
 import java.util.List;
@@ -64,43 +65,10 @@ public class TopologyEventSender {
         }
     }
 
-//     public static void sendPartitionCreatedEvent(Partition partition) {
-//         PartitionCreatedEvent partitionCreatedEvent =
-//                 new PartitionCreatedEvent(partition);
-//
-//         if(log.isInfoEnabled()) {
-//             log.info(String.format("Publishing partition created event: [partition] %s", partition.getId()));
-//         }
-//         publishEvent(partitionCreatedEvent);
-//     }
-
-//    public static void sendPartitionUpdatedEvent(Partition partition, String oldPartitionId) {
-//        PartitionUpdatedEvent partitionUpdatedEvent =
-//                new PartitionUpdatedEvent(partition.getId(),
-//                                          partition.getScope(),
-//                                          oldPartitionId);
-//        partitionUpdatedEvent.setProperties(partition.getProperties());
-//
-//        if(log.isInfoEnabled()) {
-//            log.info(String.format("Publishing partition updated event: [partition] %s", partition.getId()));
-//        }
-//        publishEvent(partitionUpdatedEvent);
-//    }
-//
-//    public static void sendPartitionRemovedEvent(Partition partition) {
-//        PartitionRemovedEvent partitionRemovedEvent = new PartitionRemovedEvent(partition.getId());
-//
-//        if(log.isInfoEnabled()) {
-//            log.info(String.format("Publishing partition removed event: [partition] %s", partition.getId()));
-//        }
-//        publishEvent(partitionRemovedEvent);
-//    }
-
     public static void sendServiceRemovedEvent(List<Cartridge> cartridgeList) {
         ServiceRemovedEvent serviceRemovedEvent;
         for(Cartridge cartridge : cartridgeList) {
-            serviceRemovedEvent = new ServiceRemovedEvent();
-            serviceRemovedEvent.setServiceName(cartridge.getType());
+            serviceRemovedEvent = new ServiceRemovedEvent(cartridge.getType());
             if(log.isInfoEnabled()) {
                 log.info(String.format("Publishing service removed event: [service] %s", serviceRemovedEvent.getServiceName()));
             }
@@ -139,23 +107,20 @@ public class TopologyEventSender {
 
     }
 
-    public static void sendInstanceSpawnedEvent(String serviceName, String clusterId, String memberId, String nodeId) {
-        InstanceSpawnedEvent instanceSpawnedEvent = new InstanceSpawnedEvent(serviceName,
-                                                                             clusterId,
-                                                                             memberId,
-                                                                             nodeId);
+    public static void sendInstanceSpawnedEvent(String serviceName, String clusterId, String partitionId, String memberId) {
+        InstanceSpawnedEvent instanceSpawnedEvent = new InstanceSpawnedEvent(serviceName, clusterId, partitionId, memberId);
         if(log.isInfoEnabled()) {
-            log.info(String.format("Publishing instance spawned event: [service] %s [cluster] %s [member] %s [node] %s", serviceName, clusterId, memberId, nodeId));
+            log.info(String.format("Publishing instance spawned event: [service] %s [cluster] %s [partition] %s [member] %s", serviceName, clusterId, partitionId, memberId));
         }
         publishEvent(instanceSpawnedEvent);
     }
 
-    public static void sendMemberStartedEvent(org.apache.stratos.messaging.event.instance.status.MemberStartedEvent memberStartedEvent) {
-        MemberStartedEvent memberStartedEventTopology = new MemberStartedEvent(memberStartedEvent.getServiceName(),
-                           memberStartedEvent.getClusterId(), memberStartedEvent.getMemberId());
+    public static void sendMemberStartedEvent(InstanceStartedEvent instanceStartedEvent) {
+        MemberStartedEvent memberStartedEventTopology = new MemberStartedEvent(instanceStartedEvent.getServiceName(),
+                           instanceStartedEvent.getClusterId(), instanceStartedEvent.getMemberId());
 
         if(log.isInfoEnabled()) {
-            log.info(String.format("Publishing member started event: [service] %s [cluster] %s [member] %s", memberStartedEvent.getServiceName(), memberStartedEvent.getClusterId(), memberStartedEvent.getMemberId()));
+            log.info(String.format("Publishing member started event: [service] %s [cluster] %s [member] %s", instanceStartedEvent.getServiceName(), instanceStartedEvent.getClusterId(), instanceStartedEvent.getMemberId()));
         }
         publishEvent(memberStartedEventTopology);
     }
@@ -178,8 +143,7 @@ public class TopologyEventSender {
     }
 
     public static void sendCompleteTopologyEvent(Topology topology) {
-        CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent();
-        completeTopologyEvent.setTopology(topology);
+        CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
 
         if(log.isInfoEnabled()) {
             log.info(String.format("Publishing complete topology event"));

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
index adbb60e..58d9163 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
@@ -414,7 +414,7 @@ public class LoadBalancerConfiguration {
 
                         for (Node memberNode : membersNode.getChildNodes()) {
                             String memberId = memberNode.getName();
-                            Member member = new Member(cluster.getServiceName(), cluster.getClusterId(), memberId);
+                            Member member = new Member(cluster.getServiceName(), cluster.getClusterId(), Constants.STATIC_PARTITION, memberId);
                             String ip = memberNode.getProperty(Constants.CONF_PROPERTY_IP);
                             validateRequiredPropertyInNode(Constants.CONF_PROPERTY_IP, ip, String.format("member %s", memberId));
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
index e858858..caf0d33 100755
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
@@ -58,6 +58,7 @@ public class Constants {
 
     public static final String CONF_DELIMITER_HOSTS = ",";
     public static final long DEFAULT_SESSION_TIMEOUT = 90000;
+    public static final String STATIC_PARTITION = "static-partition";
 
     /* Nginx format related constants */
     public static final String NGINX_COMMENT = "#";

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index be7133e..48387c4 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -32,8 +32,8 @@ import org.apache.stratos.load.balancer.conf.configurator.CEPConfigurator;
 import org.apache.stratos.load.balancer.conf.configurator.JndiConfigurator;
 import org.apache.stratos.load.balancer.conf.configurator.SynapseConfigurator;
 import org.apache.stratos.load.balancer.context.LoadBalancerContext;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.config.xml.MultiXMLConfigurationBuilder;
 import org.apache.synapse.core.SynapseEnvironment;
@@ -141,9 +141,9 @@ public class LoadBalancerServiceComponent {
                 }
 
                 if (log.isInfoEnabled()) {
-                    if (ServiceFilter.getInstance().isActive()) {
+                    if (TopologyServiceFilter.getInstance().isActive()) {
                         StringBuilder sb = new StringBuilder();
-                        for (String serviceName : ServiceFilter.getInstance().getIncludedServiceNames()) {
+                        for (String serviceName : TopologyServiceFilter.getInstance().getIncludedServiceNames()) {
                             if (sb.length() > 0) {
                                 sb.append(", ");
                             }
@@ -151,9 +151,9 @@ public class LoadBalancerServiceComponent {
                         }
                         log.info(String.format("Service filter activated: [services] %s", sb.toString()));
                     }
-                    if (ClusterFilter.getInstance().isActive()) {
+                    if (TopologyClusterFilter.getInstance().isActive()) {
                         StringBuilder sb = new StringBuilder();
-                        for (String clusterId : ClusterFilter.getInstance().getIncludedClusterIds()) {
+                        for (String clusterId : TopologyClusterFilter.getInstance().getIncludedClusterIds()) {
                             if (sb.length() > 0) {
                                 sb.append(", ");
                             }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/RoundRobinAlgorithmTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/RoundRobinAlgorithmTest.java b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/RoundRobinAlgorithmTest.java
index ad90335..a2a877f 100755
--- a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/RoundRobinAlgorithmTest.java
+++ b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/RoundRobinAlgorithmTest.java
@@ -39,15 +39,15 @@ public class RoundRobinAlgorithmTest {
     @Test
     public final void testRoundRobinAlgorithm() {
         List<Member> members = new ArrayList<Member>();
-        Member member = new Member("service1", "cluster1", "m1");
+        Member member = new Member("service1", "cluster1", "p1", "m1");
         member.setStatus(MemberStatus.Activated);
         members.add(member);
 
-        member = new Member("service1", "cluster1", "m2");
+        member = new Member("service1", "cluster1", "p1", "m2");
         member.setStatus(MemberStatus.Activated);
         members.add(member);
 
-        member = new Member("service1", "cluster1", "m3");
+        member = new Member("service1", "cluster1", "p1", "m3");
         member.setStatus(MemberStatus.Activated);
         members.add(member);
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/pom.xml b/components/org.apache.stratos.messaging/pom.xml
index 183c322..cb227b3 100644
--- a/components/org.apache.stratos.messaging/pom.xml
+++ b/components/org.apache.stratos.messaging/pom.xml
@@ -55,6 +55,11 @@
           <version>3.1</version>
         </dependency>
         <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <version>3.2.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.wso2.carbon</groupId>
             <artifactId>org.wso2.carbon.logging</artifactId>
             <version>${wso2carbon.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java
index a82b7a1..bb14ae3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java
@@ -36,20 +36,24 @@ import java.util.Properties;
 @XmlRootElement
 public class Member implements Serializable {
     private static final long serialVersionUID = 4179661867903664661L;
-	private String serviceName;
-    private String clusterId;
-    private String memberId;
+
+    private final String serviceName;
+    private final String clusterId;
+    private final String partitionId;
+    private final String memberId;
+
     private MemberStatus status;
     private String memberIp;
     @XmlJavaTypeAdapter(MapAdapter.class)
-    private Map<String, Port> portMap;
+    private final Map<String, Port> portMap;
     @XmlJavaTypeAdapter(MapAdapter.class)
     private Properties properties;
-    private String partitionId;
+    private String lbClusterId;
 
-    public Member(String serviceName, String clusterId, String memberId) {
+    public Member(String serviceName, String clusterId, String partitionId, String memberId) {
         this.serviceName = serviceName;
         this.clusterId = clusterId;
+        this.partitionId = partitionId;
         this.memberId = memberId;
         this.portMap = new HashMap<String, Port>();
     }
@@ -128,9 +132,12 @@ public class Member implements Serializable {
         return partitionId;
     }
 
-    public void setPartitionId(String partitionId) {
-        this.partitionId = partitionId;
+    public void setLbClusterId(String lbClusterId) {
+        this.lbClusterId = lbClusterId;
     }
 
+    public String getLbClusterId() {
+        return lbClusterId;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/InstanceSpawnedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/InstanceSpawnedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/InstanceSpawnedEvent.java
index 3817b12..db88940 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/InstanceSpawnedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/InstanceSpawnedEvent.java
@@ -32,6 +32,7 @@ public class InstanceSpawnedEvent extends TopologyEvent implements Serializable
     private final String clusterId;
     private final String memberId;
     private final String partitionId;
+    private String lbClusterId;
 
     public InstanceSpawnedEvent(String serviceName, String clusterId, String partitionId, String memberId) {
         this.serviceName = serviceName;
@@ -56,4 +57,11 @@ public class InstanceSpawnedEvent extends TopologyEvent implements Serializable
         return memberId;
     }
 
+    public String getLbClusterId() {
+        return lbClusterId;
+    }
+
+    public void setLbClusterId(String lbClusterId) {
+        this.lbClusterId = lbClusterId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
index 5d57b16..380a369 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
@@ -35,6 +35,7 @@ public class MemberStartedEvent extends TopologyEvent implements Serializable {
     private MemberStatus status;
     private Properties properties;
     private String partitionId;
+    private String lbClusterId;
 
     public MemberStartedEvent(String serviceName, String clusterId, String memberId) {
         this.serviceName = serviceName;
@@ -83,4 +84,12 @@ public class MemberStartedEvent extends TopologyEvent implements Serializable {
 	public void setPartitionId(String partitionId) {
 		this.partitionId = partitionId;
 	}
+
+    public String getLbClusterId() {
+        return lbClusterId;
+    }
+
+    public void setLbClusterId(String lbClusterId) {
+        this.lbClusterId = lbClusterId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberSuspendedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberSuspendedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberSuspendedEvent.java
index 91d13c6..a58d3c7 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberSuspendedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberSuspendedEvent.java
@@ -30,6 +30,7 @@ public class MemberSuspendedEvent extends TopologyEvent implements Serializable
     private final String serviceName;
     private final String clusterId;
     private final String memberId;
+    private String lbClusterId;
 
     public MemberSuspendedEvent(String serviceName, String clusterId, String memberId) {
         this.serviceName = serviceName;
@@ -48,4 +49,12 @@ public class MemberSuspendedEvent extends TopologyEvent implements Serializable
     public String getMemberId() {
         return memberId;
     }
+
+    public String getLbClusterId() {
+        return lbClusterId;
+    }
+
+    public void setLbClusterId(String lbClusterId) {
+        this.lbClusterId = lbClusterId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberTerminatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberTerminatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberTerminatedEvent.java
index 1ae6e46..8a50b0b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberTerminatedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberTerminatedEvent.java
@@ -30,6 +30,7 @@ public class MemberTerminatedEvent extends TopologyEvent implements Serializable
     private final String serviceName;
     private final String clusterId;
     private final String memberId;
+    private String lbClusterId;
 
     public MemberTerminatedEvent(String serviceName, String clusterId, String memberId) {
         this.serviceName = serviceName;
@@ -48,4 +49,12 @@ public class MemberTerminatedEvent extends TopologyEvent implements Serializable
     public String getMemberId() {
         return memberId;
     }
+
+    public String getLbClusterId() {
+        return lbClusterId;
+    }
+
+    public void setLbClusterId(String lbClusterId) {
+        this.lbClusterId = lbClusterId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/MessageFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/MessageFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/MessageFilter.java
new file mode 100644
index 0000000..d8e0481
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/MessageFilter.java
@@ -0,0 +1,94 @@
+package org.apache.stratos.messaging.message.filter;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
+import org.apache.stratos.messaging.util.Constants;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Message filter for filtering incoming messages in message processors.
+ */
+public class MessageFilter {
+    // Log under this class's own category rather than that of the TopologyServiceFilter subclass.
+    private static final Log log = LogFactory.getLog(MessageFilter.class);
+
+    private String filterName;
+    private Map<String, Map<String, Boolean>> filterMap;
+
+    /** Builds the filter and immediately loads it from the system property named by filterName. */
+    public MessageFilter(String filterName) {
+        this.filterName = filterName;
+        this.filterMap = new HashMap<String, Map<String, Boolean>>();
+        init();
+    }
+
+    /** Splits the raw filter text into trimmed name/value pairs; entries without exactly one name and one value are skipped. */
+    private Map<String, String> splitToMap(String filter) {
+        HashMap<String, String> keyValuePairMap = new HashMap<String, String>();
+        String[] keyValuePairArray = filter.split(Constants.FILTER_KEY_VALUE_SEPARATOR);
+        for(String keyValuePair : keyValuePairArray) {
+            String [] keyValueArray = keyValuePair.split(Constants.FILTER_VALUE_ASSIGN_OPERATOR);
+            if(keyValueArray.length == 2) {
+                keyValuePairMap.put(keyValueArray[0].trim(), keyValueArray[1].trim());
+            }
+        }
+        return keyValuePairMap;
+    }
+
+    /** Initialize message filter using system property; a blank or unset property adds no entries. */
+    public void init() {
+        String filter = System.getProperty(filterName);
+        if(StringUtils.isNotBlank(filter)) {
+            if(log.isDebugEnabled()) {
+                log.debug(String.format("Initializing filter: %s", filterName));
+            }
+            Map<String, String> keyValuePairMap = splitToMap(filter);
+            for(String propertyName : keyValuePairMap.keySet()) {
+                String propertyValue = keyValuePairMap.get(propertyName);
+                Map<String, Boolean> propertyValueMap = new HashMap<String, Boolean>();
+                String[] propertyValueArray = propertyValue.split(Constants.FILTER_VALUE_SEPARATOR);
+                for(String rawValue : propertyValueArray) {
+                    String value = rawValue.trim(); // tolerate spaces around the value separator
+                    propertyValueMap.put(value, true);
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Filter property value found: [property] %s [value] %s", propertyName, value));
+                    }
+                }
+                filterMap.put(propertyName, propertyValueMap);
+            }
+        }
+    }
+
+    /** Returns true when at least one filter property has been loaded. */
+    public boolean isActive() {
+        return filterMap.size() > 0;
+    }
+
+    /** Returns true when propertyValue is listed under propertyName; an unconfigured property includes nothing. */
+    public boolean included(String propertyName, String propertyValue) {
+        if(filterMap.containsKey(propertyName)) {
+            Map<String, Boolean> propertyValueMap = filterMap.get(propertyName);
+            return propertyValueMap.containsKey(propertyValue);
+        }
+        return false;
+    }
+
+    /** Negation of {@link #included(String, String)}. */
+    public boolean excluded(String propertyName, String propertyValue) {
+        return !included(propertyName, propertyValue);
+    }
+
+    /** Returns the values configured for propertyName, or an empty collection if it is not configured. */
+    public Collection<String> getIncludedPropertyValues(String propertyName) {
+        if(filterMap.containsKey(propertyName)) {
+            return filterMap.get(propertyName).keySet();
+        }
+        return CollectionUtils.EMPTY_COLLECTION;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ClusterFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ClusterFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ClusterFilter.java
deleted file mode 100644
index ba8e23c..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ClusterFilter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.filter.topology;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A filter to discard topology events which are not in a given cluster id list.
- */
-public class ClusterFilter {
-    private static final Log log = LogFactory.getLog(ClusterFilter.class);
-    private static volatile ClusterFilter instance;
-
-    private Map<String, Boolean> clusterIdMap;
-
-    private ClusterFilter() {
-        this.clusterIdMap = new HashMap<String, Boolean>();
-
-        String filter = System.getProperty("stratos.messaging.topology.cluster.filter");
-        if(StringUtils.isNotBlank(filter)) {
-            String[] array = filter.split(",");
-            for(String item : array) {
-                clusterIdMap.put(item, true);
-            }
-            if(log.isDebugEnabled()) {
-                log.debug(String.format("Cluster filter initialized: [included] %s", filter));
-            }
-        }
-    }
-
-    public static synchronized ClusterFilter getInstance() {
-        if (instance == null) {
-            synchronized (ClusterFilter.class){
-                if (instance == null) {
-                    instance = new ClusterFilter();
-                    if(log.isDebugEnabled()) {
-                        log.debug("Cluster filter object created");
-                    }
-                }
-            }
-        }
-        return instance;
-    }
-
-    public boolean isActive() {
-        return clusterIdMap.size() > 0;
-    }
-
-    public boolean included(String clusterId) {
-        return clusterIdMap.containsKey(clusterId);
-    }
-
-    public boolean excluded(String clusterId) {
-        return !clusterIdMap.containsKey(clusterId);
-    }
-
-    public Collection<String> getIncludedClusterIds() {
-        return clusterIdMap.keySet();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java
deleted file mode 100644
index 2c78a25..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ServiceFilter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.filter.topology;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A filter to discard topology events which are not in a given service name list.
- */
-public class ServiceFilter {
-    private static final Log log = LogFactory.getLog(ServiceFilter.class);
-    private static volatile ServiceFilter instance;
-
-    private Map<String, Boolean> serviceNameMap;
-
-    private ServiceFilter() {
-        this.serviceNameMap = new HashMap<String, Boolean>();
-
-        String filter = System.getProperty("stratos.messaging.topology.service.filter");
-        if(StringUtils.isNotBlank(filter)) {
-            String[] array = filter.split(",");
-            for(String item : array) {
-                serviceNameMap.put(item, true);
-            }
-            if(log.isDebugEnabled()) {
-                log.debug(String.format("Service filter initialized: [included] %s", filter));
-            }
-        }
-    }
-
-    public static synchronized ServiceFilter getInstance() {
-        if (instance == null) {
-            synchronized (ServiceFilter.class){
-                if (instance == null) {
-                    instance = new ServiceFilter();
-                    if(log.isDebugEnabled()) {
-                        log.debug("Service filter object created");
-                    }
-                }
-            }
-        }
-        return instance;
-    }
-
-    public boolean isActive() {
-        return serviceNameMap.size() > 0;
-    }
-
-    public boolean included(String serviceName) {
-        return serviceNameMap.containsKey(serviceName);
-    }
-
-    public boolean excluded(String serviceName) {
-        return !serviceNameMap.containsKey(serviceName);
-    }
-
-    public Collection<String> getIncludedServiceNames() {
-        return serviceNameMap.keySet();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java
new file mode 100644
index 0000000..5416744
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.filter.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.filter.MessageFilter;
+import org.apache.stratos.messaging.util.Constants;
+
+import java.util.Collection;
+
+/**
+ * A filter to discard topology events which are not in a given cluster id list.
+ */
+public class TopologyClusterFilter extends MessageFilter {
+    private static final Log log = LogFactory.getLog(TopologyClusterFilter.class); // was logging as TopologyServiceFilter
+    private static volatile TopologyClusterFilter instance; // volatile is what makes the double-checked locking below safe
+
+    public TopologyClusterFilter() {
+        super(Constants.TOPOLOGY_CLUSTER_FILTER);
+    }
+
+    public static TopologyClusterFilter getInstance() { // no method-level lock: it would serialize every lookup
+        if (instance == null) {
+            synchronized (TopologyClusterFilter.class) {
+                if (instance == null) {
+                    instance = new TopologyClusterFilter();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Topology cluster filter instance created");
+                    }
+                }
+            }
+        }
+        return instance;
+    }
+
+    public boolean clusterIdIncluded(String value) {
+        return included(Constants.TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID, value);
+    }
+
+    public boolean clusterIdExcluded(String value) {
+        return excluded(Constants.TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID, value);
+    }
+
+    public Collection<String> getIncludedClusterIds() {
+        return getIncludedPropertyValues(Constants.TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java
new file mode 100644
index 0000000..1da4413
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java
@@ -0,0 +1,46 @@
+package org.apache.stratos.messaging.message.filter.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.filter.MessageFilter;
+import org.apache.stratos.messaging.util.Constants;
+
+import java.util.Collection;
+
+/**
+ * A filter to discard topology events which are not in a load balancer cluster.
+ */
+public class TopologyMemberFilter extends MessageFilter {
+    private static final Log log = LogFactory.getLog(TopologyMemberFilter.class); // was logging as TopologyServiceFilter
+    private static volatile TopologyMemberFilter instance; // volatile is what makes the double-checked locking below safe
+
+    public TopologyMemberFilter() {
+        super(Constants.TOPOLOGY_MEMBER_FILTER);
+    }
+
+    public static TopologyMemberFilter getInstance() { // no method-level lock: it would serialize every lookup
+        if (instance == null) {
+            synchronized (TopologyMemberFilter.class){
+                if (instance == null) {
+                    instance = new TopologyMemberFilter();
+                    if(log.isDebugEnabled()) {
+                        log.debug("Topology member filter instance created");
+                    }
+                }
+            }
+        }
+        return instance;
+    }
+
+    public boolean lbClusterIdIncluded(String value) {
+        return included(Constants.TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID, value);
+    }
+
+    public boolean lbClusterIdExcluded(String value) {
+        return excluded(Constants.TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID, value);
+    }
+
+    public Collection<String> getIncludedLbClusterIds() {
+        return getIncludedPropertyValues(Constants.TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java
new file mode 100644
index 0000000..6ddc6e0
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.filter.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.filter.MessageFilter;
+import org.apache.stratos.messaging.util.Constants;
+
+import java.util.Collection;
+
+/**
+ * A filter to discard topology events which are not in a given service name list.
+ */
+public class TopologyServiceFilter extends MessageFilter {
+    private static final Log log = LogFactory.getLog(TopologyServiceFilter.class);
+    private static volatile TopologyServiceFilter instance; // volatile is what makes the double-checked locking below safe
+
+    public TopologyServiceFilter() {
+        super(Constants.TOPOLOGY_SERVICE_FILTER);
+    }
+
+    public static TopologyServiceFilter getInstance() { // no method-level lock: it would serialize every lookup
+        if (instance == null) {
+            synchronized (TopologyServiceFilter.class) {
+                if (instance == null) {
+                    instance = new TopologyServiceFilter();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Topology service filter instance created");
+                    }
+                }
+            }
+        }
+        return instance;
+    }
+
+    public boolean serviceNameIncluded(String value) {
+        return included(Constants.TOPOLOGY_SERVICE_FILTER_SERVICE_NAME, value);
+    }
+
+    public boolean serviceNameExcluded(String value) {
+        return excluded(Constants.TOPOLOGY_SERVICE_FILTER_SERVICE_NAME, value);
+    }
+
+    public Collection<String> getIncludedServiceNames() {
+        return getIncludedPropertyValues(Constants.TOPOLOGY_SERVICE_FILTER_SERVICE_NAME);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
index 8a8d394..d1795b6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -25,9 +25,9 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterCreatedMessageProcessor extends MessageProcessor {
@@ -53,8 +53,8 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
             ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
 
             // Apply service filter
-            if (ServiceFilter.getInstance().isActive()) {
-                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+            if (TopologyServiceFilter.getInstance().isActive()) {
+                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
                     // Service is excluded, do not update topology or fire event
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -64,8 +64,8 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
             }
 
             // Apply cluster filter
-            if (ClusterFilter.getInstance().isActive()) {
-                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+            if (TopologyClusterFilter.getInstance().isActive()) {
+                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
                     // Cluster is excluded, do not update topology or fire event
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
index eb1d98a..edbfed1 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -18,15 +18,14 @@
  */
 package org.apache.stratos.messaging.message.processor.topology;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterRemovedMessageProcessor extends MessageProcessor {
@@ -52,8 +51,8 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor {
             ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
 
             // Apply service filter
-            if (ServiceFilter.getInstance().isActive()) {
-                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+            if (TopologyServiceFilter.getInstance().isActive()) {
+                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
                     // Service is excluded, do not update topology or fire event
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -63,8 +62,8 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor {
             }
 
             // Apply cluster filter
-            if (ClusterFilter.getInstance().isActive()) {
-                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+            if (TopologyClusterFilter.getInstance().isActive()) {
+                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
                     // Cluster is excluded, do not update topology or fire event
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
index 5cea64e..3fa8890 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
@@ -24,9 +24,9 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class CompleteTopologyMessageProcessor extends MessageProcessor {
@@ -53,10 +53,10 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
             CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
 
             // Apply service filter
-            if (ServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().isActive()) {
                 // Add services included in service filter
                 for (Service service : event.getTopology().getServices()) {
-                    if (ServiceFilter.getInstance().included(service.getServiceName())) {
+                    if (!TopologyServiceFilter.getInstance().serviceNameExcluded(service.getServiceName())) {
                         topology.addService(service);
                     } else {
                         if (log.isDebugEnabled()) {
@@ -70,10 +70,10 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
             }
 
             // Apply cluster filter
-            if (ClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().isActive()) {
                 for (Service service : topology.getServices()) {
                     for (Cluster cluster : service.getClusters()) {
-                        if (ClusterFilter.getInstance().excluded(cluster.getClusterId())) {
+                        if (TopologyClusterFilter.getInstance().clusterIdExcluded(cluster.getClusterId())) {
                             service.removeCluster(cluster.getClusterId());
                             if (log.isDebugEnabled()) {
                                 log.debug(String.format("Cluster is excluded: [cluster] %s", cluster.getClusterId()));

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
index f614782..d362cbc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
@@ -22,9 +22,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class InstanceSpawnedMessageProcessor extends MessageProcessor {
@@ -50,8 +51,8 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor {
             InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class);
 
             // Apply service filter
-            if (ServiceFilter.getInstance().isActive()) {
-                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+            if (TopologyServiceFilter.getInstance().isActive()) {
+                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
                     // Service is excluded, do not update topology or fire event
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -61,8 +62,8 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor {
             }
 
             // Apply cluster filter
-            if (ClusterFilter.getInstance().isActive()) {
-                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+            if (TopologyClusterFilter.getInstance().isActive()) {
+                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
                     // Cluster is excluded, do not update topology or fire event
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
@@ -71,6 +72,16 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor {
                 }
             }
 
+            // Apply member filter
+            if(TopologyMemberFilter.getInstance().isActive()) {
+                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
+                    }
+                    return false;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {
@@ -99,9 +110,9 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor {
             }
 
             // Apply changes to the topology
-            Member member = new Member(event.getServiceName(), event.getClusterId(), event.getMemberId());
+            Member member = new Member(event.getServiceName(), event.getClusterId(), event.getPartitionId(), event.getMemberId());
             member.setStatus(MemberStatus.Created);
-            member.setPartitionId(event.getPartitionId());
+            member.setLbClusterId(event.getLbClusterId());
             cluster.addMember(member);
 
             if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
index e299b7d..71853d9 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
@@ -26,9 +26,10 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberActivatedMessageProcessor extends MessageProcessor {
@@ -54,8 +55,8 @@ public class MemberActivatedMessageProcessor extends MessageProcessor {
             MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
 
             // Apply service filter
-            if (ServiceFilter.getInstance().isActive()) {
-                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+            if (TopologyServiceFilter.getInstance().isActive()) {
+                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
                     // Service is excluded, do not update topology or fire event
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -65,8 +66,8 @@ public class MemberActivatedMessageProcessor extends MessageProcessor {
             }
 
             // Apply cluster filter
-            if (ClusterFilter.getInstance().isActive()) {
-                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+            if (TopologyClusterFilter.getInstance().isActive()) {
+                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
                     // Cluster is excluded, do not update topology or fire event
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
@@ -75,6 +76,16 @@ public class MemberActivatedMessageProcessor extends MessageProcessor {
                 }
             }
 
+            // Apply member filter
+            if(TopologyMemberFilter.getInstance().isActive()) {
+                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
+                    }
+                    return false;
+                }
+            }
+
             // Validate event properties
             if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
                 throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
index eeae3d3..2753379 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
@@ -26,9 +26,10 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberStartedMessageProcessor extends MessageProcessor {
@@ -54,8 +55,8 @@ public class MemberStartedMessageProcessor extends MessageProcessor {
             MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
 
             // Apply service filter
-            if (ServiceFilter.getInstance().isActive()) {
-                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+            if (TopologyServiceFilter.getInstance().isActive()) {
+                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
                     // Service is excluded, do not update topology or fire event
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -65,8 +66,8 @@ public class MemberStartedMessageProcessor extends MessageProcessor {
             }
 
             // Apply cluster filter
-            if (ClusterFilter.getInstance().isActive()) {
-                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+            if (TopologyClusterFilter.getInstance().isActive()) {
+                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
                     // Cluster is excluded, do not update topology or fire event
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
@@ -75,6 +76,16 @@ public class MemberStartedMessageProcessor extends MessageProcessor {
                 }
             }
 
+            // Apply member filter
+            if(TopologyMemberFilter.getInstance().isActive()) {
+                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
+                    }
+                    return false;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7a5a6b81/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
index e72d62c..1aa7fef 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
@@ -26,9 +26,10 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
-import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberSuspendedMessageProcessor extends MessageProcessor {
@@ -54,8 +55,8 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor {
             MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
 
             // Apply service filter
-            if (ServiceFilter.getInstance().isActive()) {
-                if (ServiceFilter.getInstance().excluded(event.getServiceName())) {
+            if (TopologyServiceFilter.getInstance().isActive()) {
+                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
                     // Service is excluded, do not update topology or fire event
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
@@ -65,8 +66,8 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor {
             }
 
             // Apply cluster filter
-            if (ClusterFilter.getInstance().isActive()) {
-                if (ClusterFilter.getInstance().excluded(event.getClusterId())) {
+            if (TopologyClusterFilter.getInstance().isActive()) {
+                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
                     // Cluster is excluded, do not update topology or fire event
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
@@ -75,6 +76,16 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor {
                 }
             }
 
+            // Apply member filter
+            if(TopologyMemberFilter.getInstance().isActive()) {
+                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
+                    }
+                    return false;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {