You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ni...@apache.org on 2014/03/29 21:03:47 UTC

git commit: complete event listeners should be notified even though the topology is initialized, since there're other topology models build upon the main topology

Repository: incubator-stratos
Updated Branches:
  refs/heads/master 61a04ed8f -> a88e909c1


complete event listeners should be notified even though the topology is initialized, since there're other topology models build upon the main topology


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/a88e909c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/a88e909c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/a88e909c

Branch: refs/heads/master
Commit: a88e909c16d145c21a3371ca56ecf368a3e60242
Parents: 61a04ed
Author: Nirmal Fernando <ni...@apache.org>
Authored: Sun Mar 30 01:31:56 2014 +0530
Committer: Nirmal Fernando <ni...@apache.org>
Committed: Sun Mar 30 01:31:56 2014 +0530

----------------------------------------------------------------------
 .../stratos/autoscaler/AutoscalerContext.java   |   8 +-
 .../health/AutoscalerHealthStatReceiver.java    |  44 +++---
 .../topology/AutoscalerTopologyReceiver.java    |  42 +++---
 .../agent/util/CartridgeAgentUtils.java         |   2 +-
 .../model/TopologyClusterInformationModel.java  |   9 ++
 .../StratosManagerTopologyReceiver.java         |  36 +++--
 .../CompleteTopologyMessageProcessor.java       | 138 ++++++++++---------
 7 files changed, 157 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a88e909c/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
index d04bdcc..4c78dbb 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
@@ -46,11 +46,11 @@ public class AutoscalerContext {
         return monitors.get(clusterId);
     }
     
-    public boolean moniterExist(String clusterId) {
+    public boolean monitorExist(String clusterId) {
         return monitors.containsKey(clusterId);
     }
     
-    public boolean lbMoniterExist(String clusterId) {
+    public boolean lbMonitorExist(String clusterId) {
         return lbMonitors.containsKey(clusterId);
     }
     
@@ -59,7 +59,7 @@ public class AutoscalerContext {
     }
 
     public ClusterMonitor removeMonitor(String clusterId) {
-    	if(!moniterExist(clusterId)) {
+    	if(!monitorExist(clusterId)) {
     		log.fatal("Cluster monitor not found for cluster id: "+clusterId);
     		return null;
     	}
@@ -67,7 +67,7 @@ public class AutoscalerContext {
         return monitors.remove(clusterId);
     }
     public LbClusterMonitor removeLbMonitor(String clusterId) {
-    	if(!lbMoniterExist(clusterId)) {
+    	if(!lbMonitorExist(clusterId)) {
     		log.fatal("LB monitor not found for cluster id: "+clusterId);
     		return null;
     	}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a88e909c/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
index 357cd8c..1845c99 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
@@ -106,9 +106,9 @@ public class AutoscalerHealthStatReceiver implements Runnable {
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractMonitor monitor;
 
-                if(asCtx.moniterExist(clusterId)){
+                if(asCtx.monitorExist(clusterId)){
                     monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMoniterExist(clusterId)){
+                }else if(asCtx.lbMonitorExist(clusterId)){
                     monitor = asCtx.getLBMonitor(clusterId);
                 }else{
                     if(log.isDebugEnabled()){
@@ -148,9 +148,9 @@ public class AutoscalerHealthStatReceiver implements Runnable {
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractMonitor monitor;
 
-                if(asCtx.moniterExist(clusterId)){
+                if(asCtx.monitorExist(clusterId)){
                     monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMoniterExist(clusterId)){
+                }else if(asCtx.lbMonitorExist(clusterId)){
                     monitor = asCtx.getLBMonitor(clusterId);
                 }else{
                     if(log.isDebugEnabled()){
@@ -190,9 +190,9 @@ public class AutoscalerHealthStatReceiver implements Runnable {
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractMonitor monitor;
 
-                if(asCtx.moniterExist(clusterId)){
+                if(asCtx.monitorExist(clusterId)){
                     monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMoniterExist(clusterId)){
+                }else if(asCtx.lbMonitorExist(clusterId)){
                     monitor = asCtx.getLBMonitor(clusterId);
                 }else{
                     if(log.isDebugEnabled()){
@@ -230,9 +230,9 @@ public class AutoscalerHealthStatReceiver implements Runnable {
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractMonitor monitor;
 
-                if(asCtx.moniterExist(clusterId)){
+                if(asCtx.monitorExist(clusterId)){
                     monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMoniterExist(clusterId)){
+                }else if(asCtx.lbMonitorExist(clusterId)){
                     monitor = asCtx.getLBMonitor(clusterId);
                 }else{
                     if(log.isDebugEnabled()){
@@ -271,9 +271,9 @@ public class AutoscalerHealthStatReceiver implements Runnable {
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractMonitor monitor;
 
-                if(asCtx.moniterExist(clusterId)){
+                if(asCtx.monitorExist(clusterId)){
                     monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMoniterExist(clusterId)){
+                }else if(asCtx.lbMonitorExist(clusterId)){
                     monitor = asCtx.getLBMonitor(clusterId);
                 }else{
                     if(log.isDebugEnabled()){
@@ -311,9 +311,9 @@ public class AutoscalerHealthStatReceiver implements Runnable {
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractMonitor monitor;
 
-                if(asCtx.moniterExist(clusterId)){
+                if(asCtx.monitorExist(clusterId)){
                     monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMoniterExist(clusterId)){
+                }else if(asCtx.lbMonitorExist(clusterId)){
                     monitor = asCtx.getLBMonitor(clusterId);
                 }else{
                     if(log.isDebugEnabled()){
@@ -473,9 +473,9 @@ public class AutoscalerHealthStatReceiver implements Runnable {
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractMonitor monitor;
 
-                if(asCtx.moniterExist(clusterId)){
+                if(asCtx.monitorExist(clusterId)){
                     monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMoniterExist(clusterId)){
+                }else if(asCtx.lbMonitorExist(clusterId)){
                     monitor = asCtx.getLBMonitor(clusterId);
                 }else{
                     if(log.isDebugEnabled()){
@@ -514,9 +514,9 @@ public class AutoscalerHealthStatReceiver implements Runnable {
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractMonitor monitor;
 
-                if(asCtx.moniterExist(clusterId)){
+                if(asCtx.monitorExist(clusterId)){
                     monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMoniterExist(clusterId)){
+                }else if(asCtx.lbMonitorExist(clusterId)){
                     monitor = asCtx.getLBMonitor(clusterId);
                 }else{
                     if(log.isDebugEnabled()){
@@ -554,9 +554,9 @@ public class AutoscalerHealthStatReceiver implements Runnable {
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractMonitor monitor;
 
-                if(asCtx.moniterExist(clusterId)){
+                if(asCtx.monitorExist(clusterId)){
                     monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMoniterExist(clusterId)){
+                }else if(asCtx.lbMonitorExist(clusterId)){
                     monitor = asCtx.getLBMonitor(clusterId);
                 }else{
                     if(log.isDebugEnabled()){
@@ -598,9 +598,9 @@ public class AutoscalerHealthStatReceiver implements Runnable {
         AutoscalerContext asCtx = AutoscalerContext.getInstance();
         AbstractMonitor monitor;
 
-        if(asCtx.moniterExist(clusterId)){
+        if(asCtx.monitorExist(clusterId)){
             monitor = asCtx.getMonitor(clusterId);
-        }else if(asCtx.lbMoniterExist(clusterId)){
+        }else if(asCtx.lbMonitorExist(clusterId)){
             monitor = asCtx.getLBMonitor(clusterId);
         }else{
             if(log.isDebugEnabled()){
@@ -707,9 +707,9 @@ public class AutoscalerHealthStatReceiver implements Runnable {
         	AutoscalerContext asCtx = AutoscalerContext.getInstance();
         	AbstractMonitor monitor;
         	
-        	if(asCtx.moniterExist(clusterId)){
+        	if(asCtx.monitorExist(clusterId)){
         		monitor = asCtx.getMonitor(clusterId);
-        	}else if(asCtx.lbMoniterExist(clusterId)){
+        	}else if(asCtx.lbMonitorExist(clusterId)){
         		monitor = asCtx.getLBMonitor(clusterId);
         	}else{
                 if(log.isDebugEnabled()){

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a88e909c/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
index e000777..2062268 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
@@ -100,20 +100,28 @@ public class AutoscalerTopologyReceiver implements Runnable {
             try {
                 TopologyManager.acquireReadLock();
                 for(Service service : TopologyManager.getTopology().getServices()) {
-                    for(Cluster cluster : service.getClusters()) {
-                        Thread th;
-                        if(cluster.isLbCluster()){
-                            th = new Thread(new LBClusterMonitorAdder(cluster));
-                        }else{
-                            th = new Thread(new ClusterMonitorAdder(cluster));
-                        }
-
-                        th.start();
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Cluster monitor thread has been started successfully: [cluster] %s "
-                                    , cluster.getClusterId()));
-                        }
-                    }
+						for (Cluster cluster : service.getClusters()) {
+							Thread th = null;
+							if (cluster.isLbCluster()
+									&& !AutoscalerContext.getInstance()
+											.lbMonitorExist(
+													cluster.getClusterId())) {
+								th = new Thread(new LBClusterMonitorAdder(
+										cluster));
+							} else if (!AutoscalerContext.getInstance()
+									.monitorExist(cluster.getClusterId())) {
+								th = new Thread(
+										new ClusterMonitorAdder(cluster));
+							}
+							if (th != null) {
+								th.start();
+								if (log.isDebugEnabled()) {
+									log.debug(String
+											.format("Cluster monitor thread has been started successfully: [cluster] %s ",
+													cluster.getClusterId()));
+								}
+							}
+						}
                 }
             }
             finally {
@@ -223,7 +231,7 @@ public class AutoscalerTopologyReceiver implements Runnable {
                 String partitionId = e.getPartitionId();
                 AbstractMonitor monitor;
 
-                if(AutoscalerContext.getInstance().moniterExist(clusterId)){
+                if(AutoscalerContext.getInstance().monitorExist(clusterId)){
                     monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
                 } else {
                     //This is LB member
@@ -279,7 +287,7 @@ public class AutoscalerTopologyReceiver implements Runnable {
                 String clusterId = e.getClusterId();
                 AbstractMonitor monitor;
 
-                if(AutoscalerContext.getInstance().moniterExist(clusterId)) {
+                if(AutoscalerContext.getInstance().monitorExist(clusterId)) {
                     monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
                     partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
                 } else {
@@ -317,7 +325,7 @@ public class AutoscalerTopologyReceiver implements Runnable {
                 String clusterId = e.getClusterId();
                 AbstractMonitor monitor;
 
-                if(AutoscalerContext.getInstance().moniterExist(clusterId)) {
+                if(AutoscalerContext.getInstance().monitorExist(clusterId)) {
                     monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
                     partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a88e909c/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
index 8c2aca9..8abb285 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
@@ -94,7 +94,7 @@ public class CartridgeAgentUtils {
         boolean active = false;
         while (!active) {
             if(log.isInfoEnabled()) {
-                log.info("Waiting for ports to be active");
+                log.info("Waiting for ports to be active: [IP] "+ipAddress+" [Ports] "+ports);
             }
             active = checkPortsActive(ipAddress,  ports);
             long endTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a88e909c/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/model/TopologyClusterInformationModel.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/model/TopologyClusterInformationModel.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/model/TopologyClusterInformationModel.java
index 8d44ac4..543643c 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/model/TopologyClusterInformationModel.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/model/TopologyClusterInformationModel.java
@@ -40,6 +40,7 @@ public class TopologyClusterInformationModel {
     private static TopologyClusterInformationModel topologyClusterInformationModel;
     private Map<String, Cluster> clusterIdToClusterMap;
     private DataInsertionAndRetrievalManager dataInsertionNRetrievalMgr;
+    private boolean initialized;
 
     //locks
     private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -168,6 +169,14 @@ public class TopologyClusterInformationModel {
     	clusterIdToClusterMap.remove(clusterId);
     }
 
+	public boolean isInitialized() {
+		return initialized;
+	}
+
+	public void setInitialized(boolean initialized) {
+		this.initialized = initialized;
+	}
+
 //    private class CartridgeTypeContext {
 //
 //        private String type;

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a88e909c/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java
index 75f9752..46b3313 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java
@@ -72,17 +72,25 @@ public class StratosManagerTopologyReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
-                log.info("********** [CompleteTopologyEventListener] Received: " + event.getClass() + " **********");
+            	if (TopologyClusterInformationModel.getInstance().isInitialized()) {
+            		return;
+            	}
+            	
+                log.info("[CompleteTopologyEventListener] Received: " + event.getClass());
 
                 try {
                     TopologyManager.acquireReadLock();
 
-                    for (Service service : TopologyManager.getTopology().getServices()) {
-                        //iterate through all clusters
-                        for (Cluster cluster : service.getClusters()) {
-                              TopologyClusterInformationModel.getInstance().addCluster(cluster);
-                            }
-                        }
+					for (Service service : TopologyManager.getTopology()
+							.getServices()) {
+						// iterate through all clusters
+						for (Cluster cluster : service.getClusters()) {
+							TopologyClusterInformationModel.getInstance()
+									.addCluster(cluster);
+						}
+					}
+					
+					TopologyClusterInformationModel.getInstance().setInitialized(true);
                 
                 } finally {
                     TopologyManager.releaseReadLock();
@@ -95,7 +103,7 @@ public class StratosManagerTopologyReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
-                log.info("********** [ClusterCreatedEventListener] Received: " + event.getClass() + " **********");
+                log.info("[ClusterCreatedEventListener] Received: " + event.getClass());
 
                 ClusterCreatedEvent clustercreatedEvent = (ClusterCreatedEvent) event;
 
@@ -121,7 +129,7 @@ public class StratosManagerTopologyReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
-                log.info("********** [ClusterRemovedEventListener] Received: " + event.getClass() + " **********");
+                log.info("[ClusterRemovedEventListener] Received: " + event.getClass());
 
                 ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
                 TopologyClusterInformationModel.getInstance().removeCluster(clusterRemovedEvent.getClusterId());
@@ -135,7 +143,7 @@ public class StratosManagerTopologyReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
-                log.info("********** [InstanceSpawnedEventListener] Received: " + event.getClass() + " **********");
+                log.info("[InstanceSpawnedEventListener] Received: " + event.getClass());
 
                 InstanceSpawnedEvent instanceSpawnedEvent = (InstanceSpawnedEvent) event;
 
@@ -160,7 +168,7 @@ public class StratosManagerTopologyReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
-                log.info("********** [MemberStartedEventListener] Received: " + event.getClass() + " **********");
+                log.info("[MemberStartedEventListener] Received: " + event.getClass());
 
                 MemberStartedEvent memberStartedEvent = (MemberStartedEvent) event;
 
@@ -186,7 +194,7 @@ public class StratosManagerTopologyReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
-                log.info("********** [MemberActivatedEventListener] Received: " + event.getClass() + " **********");
+                log.info("[MemberActivatedEventListener] Received: " + event.getClass());
 
                 MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
 
@@ -211,7 +219,7 @@ public class StratosManagerTopologyReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
-                log.info("********** [MemberSuspendedEventListener] Received: " + event.getClass() + " **********");
+                log.info("[MemberSuspendedEventListener] Received: " + event.getClass());
 
                 MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event;
 
@@ -237,7 +245,7 @@ public class StratosManagerTopologyReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
-                log.info("********** [MemberTerminatedEventListener] Received: " + event.getClass() + " **********");
+                log.info("[MemberTerminatedEventListener] Received: " + event.getClass());
 
                 MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a88e909c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
index 3ca0370..029cdae 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
@@ -49,75 +49,85 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
         Topology topology = (Topology) object;
 
         if (CompleteTopologyEvent.class.getName().equals(type)) {
-            // Return if topology has already initialized
-            if (topology.isInitialized()) {
-                return false;
-            }
-
-            // Parse complete message and build event
-            CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
+        	// Parse complete message and build event
+        	CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
+        	
+            // if topology has not already initialized
+			if (!topology.isInitialized()) {
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                // Add services included in service filter
-                for (Service service : event.getTopology().getServices()) {
-                    if (TopologyServiceFilter.getInstance().serviceNameIncluded(service.getServiceName())) {
-                        topology.addService(service);
-                    } else {
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("Service is excluded: [service] %s", service.getServiceName()));
-                        }
-                    }
-                }
-            } else {
-                // Add all services
-                topology.addServices(event.getTopology().getServices());
-            }
+				// Apply service filter
+				if (TopologyServiceFilter.getInstance().isActive()) {
+					// Add services included in service filter
+					for (Service service : event.getTopology().getServices()) {
+						if (TopologyServiceFilter.getInstance()
+								.serviceNameIncluded(service.getServiceName())) {
+							topology.addService(service);
+						} else {
+							if (log.isDebugEnabled()) {
+								log.debug(String.format(
+										"Service is excluded: [service] %s",
+										service.getServiceName()));
+							}
+						}
+					}
+				} else {
+					// Add all services
+					topology.addServices(event.getTopology().getServices());
+				}
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                for (Service service : topology.getServices()) {
-                    List<Cluster> clustersToRemove = new ArrayList<Cluster>();
-                    for (Cluster cluster : service.getClusters()) {
-                        if (TopologyClusterFilter.getInstance().clusterIdExcluded(cluster.getClusterId())) {
-                            clustersToRemove.add(cluster);
-                        }
-                    }
-                    for(Cluster cluster : clustersToRemove) {
-                        service.removeCluster(cluster);
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("Cluster is excluded: [cluster] %s", cluster.getClusterId()));
-                        }
-                    }
-                }
-            }
+				// Apply cluster filter
+				if (TopologyClusterFilter.getInstance().isActive()) {
+					for (Service service : topology.getServices()) {
+						List<Cluster> clustersToRemove = new ArrayList<Cluster>();
+						for (Cluster cluster : service.getClusters()) {
+							if (TopologyClusterFilter.getInstance()
+									.clusterIdExcluded(cluster.getClusterId())) {
+								clustersToRemove.add(cluster);
+							}
+						}
+						for (Cluster cluster : clustersToRemove) {
+							service.removeCluster(cluster);
+							if (log.isDebugEnabled()) {
+								log.debug(String.format(
+										"Cluster is excluded: [cluster] %s",
+										cluster.getClusterId()));
+							}
+						}
+					}
+				}
 
-            // Apply member filter
-            if (TopologyMemberFilter.getInstance().isActive()) {
-                for (Service service : topology.getServices()) {
-                    for (Cluster cluster : service.getClusters()) {
-                        List<Member> membersToRemove = new ArrayList<Member>();
-                        for(Member member : cluster.getMembers()) {
-                            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                                membersToRemove.add(member);
-                            }
-                        }
-                        for(Member member : membersToRemove) {
-                            cluster.removeMember(member);
-                            if (log.isDebugEnabled()) {
-                                log.debug(String.format("Member is excluded: [member] %s [lb-cluster-id] %s", member.getMemberId(), member.getLbClusterId()));
-                            }
-                        }
-                    }
-                }
-            }
+				// Apply member filter
+				if (TopologyMemberFilter.getInstance().isActive()) {
+					for (Service service : topology.getServices()) {
+						for (Cluster cluster : service.getClusters()) {
+							List<Member> membersToRemove = new ArrayList<Member>();
+							for (Member member : cluster.getMembers()) {
+								if (TopologyMemberFilter.getInstance()
+										.lbClusterIdExcluded(
+												member.getLbClusterId())) {
+									membersToRemove.add(member);
+								}
+							}
+							for (Member member : membersToRemove) {
+								cluster.removeMember(member);
+								if (log.isDebugEnabled()) {
+									log.debug(String
+											.format("Member is excluded: [member] %s [lb-cluster-id] %s",
+													member.getMemberId(),
+													member.getLbClusterId()));
+								}
+							}
+						}
+					}
+				}
 
-            if (log.isInfoEnabled()) {
-                log.info("Topology initialized");
-            }
+				if (log.isInfoEnabled()) {
+					log.info("Topology initialized");
+				}
 
-            // Set topology initialized
-            topology.setInitialized(true);
+				// Set topology initialized
+				topology.setInitialized(true);
+			}
 
             // Notify event listeners
             notifyEventListeners(event);