You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/11/23 14:53:40 UTC

git commit: Introduced a topology event filter to discard topology events which do not belong to a given list of cluster ids

Updated Branches:
  refs/heads/master 815e9b3e1 -> e268ff886


Introduced a topology event filter to discard topology events which do not belong to a given list of cluster ids


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/e268ff88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/e268ff88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/e268ff88

Branch: refs/heads/master
Commit: e268ff88632ebeeb12be4eab723f64847976e844
Parents: 815e9b3
Author: Imesh Gunaratne <im...@apache.org>
Authored: Sat Nov 23 19:23:23 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sat Nov 23 19:23:23 2013 +0530

----------------------------------------------------------------------
 .../internal/LoadBalancerServiceComponent.java  | 11 +++
 .../message/filter/topology/ClusterFilter.java  | 83 ++++++++++++++++++++
 .../topology/ClusterCreatedEventProcessor.java  | 12 +++
 .../topology/ClusterRemovedEventProcessor.java  | 12 +++
 .../CompleteTopologyEventProcessor.java         | 38 +++++----
 .../topology/InstanceSpawnedEventProcessor.java | 12 +++
 .../topology/MemberActivatedEventProcessor.java | 12 +++
 .../topology/MemberStartedEventProcessor.java   | 12 +++
 .../topology/MemberSuspendedEventProcessor.java | 12 +++
 .../MemberTerminatedEventProcessor.java         | 12 +++
 .../distribution/src/main/bin/stratos.bat       |  2 +-
 .../distribution/src/main/bin/stratos.sh        |  1 +
 12 files changed, 203 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e268ff88/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 4d8a28e..2c8fe29 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -26,6 +26,7 @@ import org.apache.stratos.load.balancer.LoadBalancerTopologyReceiver;
 import org.apache.stratos.load.balancer.TenantAwareLoadBalanceEndpointException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.config.xml.MultiXMLConfigurationBuilder;
@@ -119,6 +120,16 @@ public class LoadBalancerServiceComponent {
                     }
                     log.info(String.format("Service filter activated: [services] %s", sb.toString()));
                 }
+                if (ClusterFilter.getInstance().isActive()) {
+                    StringBuilder sb = new StringBuilder();
+                    for (String clusterId : ClusterFilter.getInstance().getIncludedClusterIds()) {
+                        if (sb.length() > 0) {
+                            sb.append(", ");
+                        }
+                        sb.append(clusterId);
+                    }
+                    log.info(String.format("Cluster filter activated: [clusters] %s", sb.toString()));
+                }
             }
 
             activated = true;

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e268ff88/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ClusterFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ClusterFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ClusterFilter.java
new file mode 100644
index 0000000..ba8e23c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/ClusterFilter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.filter.topology;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A filter to discard topology events which are not in a given cluster id list.
+ * <p>
+ * The included cluster ids are read once, when the singleton is first created, from the
+ * comma separated system property {@code stratos.messaging.topology.cluster.filter}.
+ * The filter is inactive when the property is not set, blank, or contains no cluster ids.
+ */
+public class ClusterFilter {
+    private static final Log log = LogFactory.getLog(ClusterFilter.class);
+    private static final String CLUSTER_FILTER_PROPERTY = "stratos.messaging.topology.cluster.filter";
+    private static volatile ClusterFilter instance;
+
+    /** Cluster ids whose topology events are accepted; never modified after construction. */
+    private final Set<String> clusterIds;
+
+    private ClusterFilter() {
+        this.clusterIds = new HashSet<String>();
+
+        String filter = System.getProperty(CLUSTER_FILTER_PROPERTY);
+        if (StringUtils.isNotBlank(filter)) {
+            for (String item : filter.split(",")) {
+                // Tolerate whitespace around separators ("c1, c2") and skip empty entries ("c1,,c2")
+                String clusterId = item.trim();
+                if (clusterId.length() > 0) {
+                    clusterIds.add(clusterId);
+                }
+            }
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Cluster filter initialized: [included] %s", filter));
+            }
+        }
+    }
+
+    /**
+     * Returns the singleton filter, creating it on first use.
+     * <p>
+     * The method itself is not synchronized: the volatile instance field together with the
+     * double-checked lock below is sufficient, so calls after initialization are lock free.
+     *
+     * @return the shared cluster filter
+     */
+    public static ClusterFilter getInstance() {
+        if (instance == null) {
+            synchronized (ClusterFilter.class) {
+                if (instance == null) {
+                    instance = new ClusterFilter();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Cluster filter object created");
+                    }
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * @return true if at least one cluster id was configured
+     */
+    public boolean isActive() {
+        return !clusterIds.isEmpty();
+    }
+
+    /**
+     * @param clusterId cluster id to check
+     * @return true if the cluster id is in the configured list
+     */
+    public boolean included(String clusterId) {
+        return clusterIds.contains(clusterId);
+    }
+
+    /**
+     * @param clusterId cluster id to check
+     * @return true if the cluster id is not in the configured list (always true when inactive)
+     */
+    public boolean excluded(String clusterId) {
+        return !included(clusterId);
+    }
+
+    /**
+     * @return a read-only view of the configured cluster ids
+     */
+    public Collection<String> getIncludedClusterIds() {
+        return Collections.unmodifiableSet(clusterIds);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e268ff88/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
index 0f9d9d3..bd3dca6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedEventProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
@@ -58,6 +59,17 @@ public class ClusterCreatedEventProcessor extends MessageProcessor {
                 }
             }
 
+            // Apply cluster filter
+            if(ClusterFilter.getInstance().isActive()) {
+                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event properties
             if(StringUtils.isBlank(event.getHostName())) {
                 throw new RuntimeException("Hostname not found in cluster created event");

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e268ff88/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
index e60172b..295923f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedEventProcessor.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
@@ -57,6 +58,17 @@ public class ClusterRemovedEventProcessor extends MessageProcessor {
                 }
             }
 
+            // Apply cluster filter
+            if(ClusterFilter.getInstance().isActive()) {
+                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event properties
             if(StringUtils.isBlank(event.getHostName())) {
                 throw new RuntimeException("Hostname not found in cluster removed event");

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e268ff88/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
index bcd52e5..3624ef6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyEventProcessor.java
@@ -20,9 +20,11 @@ package org.apache.stratos.messaging.message.processor.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
@@ -39,35 +41,41 @@ public class CompleteTopologyEventProcessor extends MessageProcessor {
 
     @Override
     public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology)object;
+        Topology topology = (Topology) object;
 
         if (CompleteTopologyEvent.class.getName().equals(type)) {
             // Parse complete message and build event
             CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
 
             // Apply service filter
-            if(ServiceFilter.getInstance().isActive()) {
+            if (ServiceFilter.getInstance().isActive()) {
                 // Add services included in service filter
-                for(Service service : event.getTopology().getServices()) {
-                    if(ServiceFilter.getInstance().included(service.getServiceName())) {
+                for (Service service : event.getTopology().getServices()) {
+                    if (ServiceFilter.getInstance().included(service.getServiceName())) {
                         topology.addService(service);
                     }
+                    else {
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Service is excluded: [service] %s", service.getServiceName()));
+                        }
+                    }
                 }
-            }
-            else {
+            } else {
                 // Add all services
                 topology.addServices(event.getTopology().getServices());
             }
-            if(log.isDebugEnabled()) {
-                StringBuilder sb = new StringBuilder();
-                for(Service service : topology.getServices()) {
-                    if(sb.length() > 0) {
-                        sb.append(", ");
+
+            // Apply cluster filter
+            if (ClusterFilter.getInstance().isActive()) {
+                for (Service service : topology.getServices()) {
+                    for (Cluster cluster : service.getClusters()) {
+                        if (ClusterFilter.getInstance().excluded(cluster.getClusterId())) {
+                            service.removeCluster(cluster.getClusterId());
+                            if(log.isDebugEnabled()) {
+                                log.debug(String.format("Cluster is excluded: [cluster] %s", cluster.getClusterId()));
+                            }
+                        }
                     }
-                    sb.append(service.getServiceName());
-                }
-                if(sb.length() > 0) {
-                    log.debug(String.format("Services added: %s", sb.toString()));
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e268ff88/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
index 518232d..830de83 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedEventProcessor.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
@@ -55,6 +56,17 @@ public class InstanceSpawnedEventProcessor extends MessageProcessor {
                 }
             }
 
+            // Apply cluster filter
+            if(ClusterFilter.getInstance().isActive()) {
+                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e268ff88/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
index 81c00cf..def9545 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedEventProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
@@ -59,6 +60,17 @@ public class MemberActivatedEventProcessor extends MessageProcessor {
                 }
             }
 
+            // Apply cluster filter
+            if(ClusterFilter.getInstance().isActive()) {
+                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e268ff88/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
index e0a3880..849efcc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedEventProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
@@ -59,6 +60,17 @@ public class MemberStartedEventProcessor extends MessageProcessor {
                 }
             }
 
+            // Apply cluster filter
+            if(ClusterFilter.getInstance().isActive()) {
+                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e268ff88/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
index 1cbb3b1..7f4428d 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedEventProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
@@ -59,6 +60,17 @@ public class MemberSuspendedEventProcessor extends MessageProcessor {
                 }
             }
 
+            // Apply cluster filter
+            if(ClusterFilter.getInstance().isActive()) {
+                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e268ff88/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
index 4c9e891..b5aac6d 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedEventProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
 import org.apache.stratos.messaging.util.Util;
@@ -59,6 +60,17 @@ public class MemberTerminatedEventProcessor extends MessageProcessor {
                 }
             }
 
+            // Apply cluster filter
+            if(ClusterFilter.getInstance().isActive()) {
+                if(ClusterFilter.getInstance().excluded(event.getClusterId())) {
+                    // Cluster is excluded, do not update topology or fire event
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    }
+                    return true;
+                }
+            }
+
             // Validate event against the existing topology
             Service service = topology.getService(event.getServiceName());
             if (service == null) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e268ff88/products/load-balancer/modules/distribution/src/main/bin/stratos.bat
----------------------------------------------------------------------
diff --git a/products/load-balancer/modules/distribution/src/main/bin/stratos.bat b/products/load-balancer/modules/distribution/src/main/bin/stratos.bat
index be63ec7..46779cb 100644
--- a/products/load-balancer/modules/distribution/src/main/bin/stratos.bat
+++ b/products/load-balancer/modules/distribution/src/main/bin/stratos.bat
@@ -154,7 +154,7 @@ set CARBON_CLASSPATH=.\lib;%CARBON_CLASSPATH%
 
 set JAVA_ENDORSED=".\lib\endorsed";"%JAVA_HOME%\jre\lib\endorsed";"%JAVA_HOME%\lib\endorsed"
 
-set CMD_LINE_ARGS=-Xbootclasspath/a:%CARBON_XBOOTCLASSPATH% -Xms1500m -Xmx3000m -XX:PermSize=256m -XX:MaxPermSize=512m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:-UseGCOverheadLimit -XX:+CMSClassUnloadingEnabled -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%CARBON_HOME%\repository\logs\heap-dump.hprof -Dcom.sun.management.jmxremote -classpath %CARBON_CLASSPATH% %JAVA_OPTS% -Djava.endorsed.dirs=%JAVA_ENDORSED% -Dcarbon.registry.root=/ -Dcarbon.home="%CARBON_HOME%" -Dloadbalancer.conf="file:///%CARBON_HOME%/repository/conf/loadbalancer.conf" -Dwso2.server.standalone=true -Djava.command="%JAVA_HOME%\bin\java" -Djava.opts="%JAVA_OPTS%" -Djava.io.tmpdir="%CARBON_HOME%\tmp" -Dcatalina.base="%CARBON_HOME%\lib\tomcat" -Dwso2.carbon.xml=%CARBON_HOME%\repository\conf\carbon.xml -Dwso2.registry.xml="%CARBON_HOME%\repository\conf\registry.xml" -Dwso2.user.mgt.xml="%CARBON_HOME%\repository\conf\user-mgt.xml" -Djava.util.logging.config.file="%CARBON_HOME%\repository\conf\log4j.properties" -
 Dcarbon.config.dir.path="%CARBON_HOME%\repository\conf" -Djndi.properties.dir="%CARBON_HOME%/repository/conf" -Dconf.location="%CARBON_HOME%\repository\conf" -Dcarbon.logs.path="%CARBON_HOME%\repository\logs" -Dcomponents.repo="%CARBON_HOME%\repository\components" -Dcom.atomikos.icatch.file="%CARBON_HOME%\lib\transactions.properties" -Dcom.atomikos.icatch.hide_init_file_path="true" -Dorg.apache.jasper.runtime.BodyContentImpl.LIMIT_BUFFER=true -Dcom.sun.jndi.ldap.connect.pool.authentication=simple -Dcom.sun.jndi.ldap.connect.pool.timeout=3000 -Dorg.terracotta.quartz.skipUpdateCheck=true -Dcarbon.classpath=%CARBON_CLASSPATH% -Dfile.encoding=UTF8 -Djavax.net.ssl.trustStore=$CARBON_HOME/repository/resources/security/client-truststore.jks -Djavax.net.ssl.trustStorePassword=wso2carbon -Dthrift.receiver.ip=localhost -Dthrift.receiver.port=7615 -Dstratos.messaging.topology.service.filter="" -Dload.balancer.stats.publisher.enabled="false"
+set CMD_LINE_ARGS=-Xbootclasspath/a:%CARBON_XBOOTCLASSPATH% -Xms1500m -Xmx3000m -XX:PermSize=256m -XX:MaxPermSize=512m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:-UseGCOverheadLimit -XX:+CMSClassUnloadingEnabled -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%CARBON_HOME%\repository\logs\heap-dump.hprof -Dcom.sun.management.jmxremote -classpath %CARBON_CLASSPATH% %JAVA_OPTS% -Djava.endorsed.dirs=%JAVA_ENDORSED% -Dcarbon.registry.root=/ -Dcarbon.home="%CARBON_HOME%" -Dloadbalancer.conf="file:///%CARBON_HOME%/repository/conf/loadbalancer.conf" -Dwso2.server.standalone=true -Djava.command="%JAVA_HOME%\bin\java" -Djava.opts="%JAVA_OPTS%" -Djava.io.tmpdir="%CARBON_HOME%\tmp" -Dcatalina.base="%CARBON_HOME%\lib\tomcat" -Dwso2.carbon.xml=%CARBON_HOME%\repository\conf\carbon.xml -Dwso2.registry.xml="%CARBON_HOME%\repository\conf\registry.xml" -Dwso2.user.mgt.xml="%CARBON_HOME%\repository\conf\user-mgt.xml" -Djava.util.logging.config.file="%CARBON_HOME%\repository\conf\log4j.properties" -
 Dcarbon.config.dir.path="%CARBON_HOME%\repository\conf" -Djndi.properties.dir="%CARBON_HOME%/repository/conf" -Dconf.location="%CARBON_HOME%\repository\conf" -Dcarbon.logs.path="%CARBON_HOME%\repository\logs" -Dcomponents.repo="%CARBON_HOME%\repository\components" -Dcom.atomikos.icatch.file="%CARBON_HOME%\lib\transactions.properties" -Dcom.atomikos.icatch.hide_init_file_path="true" -Dorg.apache.jasper.runtime.BodyContentImpl.LIMIT_BUFFER=true -Dcom.sun.jndi.ldap.connect.pool.authentication=simple -Dcom.sun.jndi.ldap.connect.pool.timeout=3000 -Dorg.terracotta.quartz.skipUpdateCheck=true -Dcarbon.classpath=%CARBON_CLASSPATH% -Dfile.encoding=UTF8 -Djavax.net.ssl.trustStore=$CARBON_HOME/repository/resources/security/client-truststore.jks -Djavax.net.ssl.trustStorePassword=wso2carbon -Dthrift.receiver.ip=localhost -Dthrift.receiver.port=7615 -Dstratos.messaging.topology.service.filter="" -Dstratos.messaging.topology.cluster.filter="" -Dload.balancer.stats.publisher.enabled="false"
 
 :runJava
 echo JAVA_HOME environment variable is set to %JAVA_HOME%

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e268ff88/products/load-balancer/modules/distribution/src/main/bin/stratos.sh
----------------------------------------------------------------------
diff --git a/products/load-balancer/modules/distribution/src/main/bin/stratos.sh b/products/load-balancer/modules/distribution/src/main/bin/stratos.sh
index d4c53bb..e6da246 100644
--- a/products/load-balancer/modules/distribution/src/main/bin/stratos.sh
+++ b/products/load-balancer/modules/distribution/src/main/bin/stratos.sh
@@ -299,6 +299,7 @@ exec "$JAVACMD" \
         -Dthrift.receiver.ip=localhost \
         -Dthrift.receiver.port=7615 \
         -Dstratos.messaging.topology.service.filter="" \
+        -Dstratos.messaging.topology.cluster.filter="" \
         -Dload.balancer.stats.publisher.enabled="false" \
         org.wso2.carbon.bootstrap.Bootstrap $*