You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/04/17 16:24:59 UTC

git commit: Added catch clauses to event listeners

Repository: incubator-stratos
Updated Branches:
  refs/heads/master aba72724a -> 7744e6dec


Added catch clauses to event listeners


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/7744e6de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/7744e6de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/7744e6de

Branch: refs/heads/master
Commit: 7744e6deca47083d85c2e4c0b7a026c3aa898aeb
Parents: aba7272
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Apr 17 19:54:50 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Apr 17 19:54:50 2014 +0530

----------------------------------------------------------------------
 .../topology/AutoscalerTopologyReceiver.java    | 532 ++++++++++---------
 .../balancer/LoadBalancerTopologyReceiver.java  |  16 +-
 2 files changed, 283 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7744e6de/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
index 9120e20..1c4a178 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
@@ -57,7 +57,7 @@ public class AutoscalerTopologyReceiver implements Runnable {
     private boolean terminated;
 
     public AutoscalerTopologyReceiver() {
-		this.topologyReceiver = new TopologyReceiver(createMessageDelegator(), new TopologyEventMessageListener());
+        this.topologyReceiver = new TopologyReceiver(createMessageDelegator(), new TopologyEventMessageListener());
     }
 
     @Override
@@ -69,18 +69,18 @@ public class AutoscalerTopologyReceiver implements Runnable {
         }
         Thread thread = new Thread(topologyReceiver);
         thread.start();
-        if(log.isInfoEnabled()) {
+        if (log.isInfoEnabled()) {
             log.info("Autoscaler topology receiver thread started");
         }
 
         // Keep the thread live until terminated
         while (!terminated) {
-        	try {
+            try {
                 Thread.sleep(1000);
             } catch (InterruptedException ignore) {
             }
         }
-        if(log.isInfoEnabled()) {
+        if (log.isInfoEnabled()) {
             log.info("Autoscaler topology receiver thread terminated");
         }
     }
@@ -97,18 +97,19 @@ public class AutoscalerTopologyReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
-            try {
-                TopologyManager.acquireReadLock();
-                for(Service service : TopologyManager.getTopology().getServices()) {
-						for (Cluster cluster : service.getClusters()) {
-							startClusterMonitor(cluster);
-						}
+                try {
+                    TopologyManager.acquireReadLock();
+                    for (Service service : TopologyManager.getTopology().getServices()) {
+                        for (Cluster cluster : service.getClusters()) {
+                            startClusterMonitor(cluster);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
                 }
             }
-            finally {
-                TopologyManager.releaseReadLock();
-            }
-            }
 
 
         });
@@ -116,171 +117,178 @@ public class AutoscalerTopologyReceiver implements Runnable {
         processorChain.addEventListener(new ClusterCreatedEventListener() {
             @Override
             protected void onEvent(Event event) {
-            try {
-            	log.info("Event received: "+event);
-                ClusterCreatedEvent e = (ClusterCreatedEvent) event;
-                TopologyManager.acquireReadLock();
-                Service service = TopologyManager.getTopology().getService(e.getServiceName());
-                Cluster cluster = service.getCluster(e.getClusterId());
-                startClusterMonitor(cluster);
-            } finally {
-                TopologyManager.releaseReadLock();
+                try {
+                    log.info("Event received: " + event);
+                    ClusterCreatedEvent e = (ClusterCreatedEvent) event;
+                    TopologyManager.acquireReadLock();
+                    Service service = TopologyManager.getTopology().getService(e.getServiceName());
+                    Cluster cluster = service.getCluster(e.getClusterId());
+                    startClusterMonitor(cluster);
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
             }
+
+        });
+
+        processorChain.addEventListener(new ClusterRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    ClusterRemovedEvent e = (ClusterRemovedEvent) event;
+                    TopologyManager.acquireReadLock();
+
+                    String serviceName = e.getServiceName();
+                    String clusterId = e.getClusterId();
+                    String deploymentPolicy = e.getDeploymentPolicy();
+
+                    AbstractMonitor monitor;
+
+                    if (e.isLbCluster()) {
+                        DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
+                        if (depPolicy != null) {
+                            List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
+                                    .getNetworkPartitionLbHolders(depPolicy);
+
+                            for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
+                                // removes lb cluster ids
+                                boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
+                                if (isRemoved) {
+                                    log.info("Removed the lb cluster [id]:"
+                                            + clusterId
+                                            + " reference from Network Partition [id]: "
+                                            + networkPartitionLbHolder
+                                            .getNetworkPartitionId());
+
+                                }
+                                if (log.isDebugEnabled()) {
+                                    log.debug(networkPartitionLbHolder);
+                                }
+
+                            }
+                        }
+                        monitor = AutoscalerContext.getInstance()
+                                .removeLbMonitor(clusterId);
+
+                    } else {
+                        monitor = AutoscalerContext.getInstance()
+                                .removeMonitor(clusterId);
+                    }
+
+                    // runTerminateAllRule(monitor);
+                    if (monitor != null) {
+                        monitor.destroy();
+                        log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
+                                clusterId));
+                    }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
             }
 
         });
-        
-		processorChain.addEventListener(new ClusterRemovedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				try {
-					ClusterRemovedEvent e = (ClusterRemovedEvent) event;
-					TopologyManager.acquireReadLock();
-					
-					String serviceName = e.getServiceName();
-					String clusterId = e.getClusterId();
-					String deploymentPolicy = e.getDeploymentPolicy();
-					
-					AbstractMonitor monitor;
-
-					if (e.isLbCluster()) {
-						DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
-						if (depPolicy != null) {
-							List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
-									.getNetworkPartitionLbHolders(depPolicy);
-							
-							for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
-								// removes lb cluster ids
-								boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
-								if (isRemoved) {
-									log.info("Removed the lb cluster [id]:"
-											+ clusterId
-											+ " reference from Network Partition [id]: "
-											+ networkPartitionLbHolder
-													.getNetworkPartitionId());
-
-								}
-								if (log.isDebugEnabled()) {
-									log.debug(networkPartitionLbHolder);
-								}
-								
-							}
-						}
-						monitor = AutoscalerContext.getInstance()
-								.removeLbMonitor(clusterId);
-
-					} else {
-						monitor = AutoscalerContext.getInstance()
-								.removeMonitor(clusterId);
-					}
-
-					// runTerminateAllRule(monitor);
-					if (monitor != null) {
-						monitor.destroy();
-						log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
-										clusterId));
-					} 
-				} finally {
-					TopologyManager.releaseReadLock();
-				}
-			}
-
-		});
-        
+
         processorChain.addEventListener(new MemberStartedEventListener() {
             @Override
             protected void onEvent(Event event) {
-            		
+
             }
 
         });
-        
+
         processorChain.addEventListener(new MemberTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
-             
-            try {
-                TopologyManager.acquireReadLock();
-                MemberTerminatedEvent e = (MemberTerminatedEvent) event;
-                String networkPartitionId = e.getNetworkPartitionId();
-                String clusterId = e.getClusterId();
-                String partitionId = e.getPartitionId();
-                AbstractMonitor monitor;
-
-                if(AutoscalerContext.getInstance().monitorExist(clusterId)){
-                    monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
-                } else {
-                    //This is LB member
-                    monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
-                }
-
-                NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
 
-                PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
-                String memberId = e.getMemberId();
-				partitionContext.removeMemberStatsContext(memberId);
+                try {
+                    TopologyManager.acquireReadLock();
+                    MemberTerminatedEvent e = (MemberTerminatedEvent) event;
+                    String networkPartitionId = e.getNetworkPartitionId();
+                    String clusterId = e.getClusterId();
+                    String partitionId = e.getPartitionId();
+                    AbstractMonitor monitor;
+
+                    if (AutoscalerContext.getInstance().monitorExist(clusterId)) {
+                        monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+                    } else {
+                        //This is LB member
+                        monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
+                    }
 
-                if(partitionContext.removeTerminationPendingMember(memberId)){
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("Member is removed from termination pending members list: [member] %s", memberId));
+                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+
+                    PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
+                    String memberId = e.getMemberId();
+                    partitionContext.removeMemberStatsContext(memberId);
+
+                    if (partitionContext.removeTerminationPendingMember(memberId)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Member is removed from termination pending members list: [member] %s", memberId));
+                        }
+                    } else if (partitionContext.removePendingMember(memberId)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Member is removed from pending members list: [member] %s", memberId));
+                        }
+                    } else if (partitionContext.removeActiveMemberById(memberId)) {
+                        log.warn(String.format("Member is in the wrong list and it is removed from active members list", memberId));
+                    } else {
+                        log.warn(String.format("Member is not available in any of the list active, pending and termination pending", memberId));
                     }
-                } else if(partitionContext.removePendingMember(memberId)) {
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("Member is removed from pending members list: [member] %s", memberId));
+
+                    if (log.isInfoEnabled()) {
+                        log.info(String.format("Member stat context has been removed successfully: [member] %s", memberId));
                     }
-                } else if(partitionContext.removeActiveMemberById(memberId)){
-                    log.warn(String.format("Member is in the wrong list and it is removed from active members list", memberId));
-                } else {
-                    log.warn(String.format("Member is not available in any of the list active, pending and termination pending", memberId));
-                }
-                
-                if(log.isInfoEnabled()){
-                    log.info(String.format("Member stat context has been removed successfully: [member] %s", memberId));
-                }
 //                partitionContext.decrementCurrentActiveMemberCount(1);
 
 
-            } finally {
-                TopologyManager.releaseReadLock();
-            }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
             }
 
         });
-        
+
         processorChain.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
 
-            try {
-                TopologyManager.acquireReadLock();
-
-                MemberActivatedEvent e = (MemberActivatedEvent)event;
-                String memberId = e.getMemberId();
-                String partitionId = e.getPartitionId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                PartitionContext partitionContext;
-                String clusterId = e.getClusterId();
-                AbstractMonitor monitor;
-
-                if(AutoscalerContext.getInstance().monitorExist(clusterId)) {
-                    monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
-                    partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-                } else {
-                    monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
-                    partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-                }
-                partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                if(log.isInfoEnabled()){
-                    log.info(String.format("Member stat context has been added successfully: [member] %s", memberId));
-                }
+                try {
+                    TopologyManager.acquireReadLock();
+
+                    MemberActivatedEvent e = (MemberActivatedEvent) event;
+                    String memberId = e.getMemberId();
+                    String partitionId = e.getPartitionId();
+                    String networkPartitionId = e.getNetworkPartitionId();
+
+                    PartitionContext partitionContext;
+                    String clusterId = e.getClusterId();
+                    AbstractMonitor monitor;
+
+                    if (AutoscalerContext.getInstance().monitorExist(clusterId)) {
+                        monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+                        partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+                    } else {
+                        monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
+                        partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+                    }
+                    partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+                    if (log.isInfoEnabled()) {
+                        log.info(String.format("Member stat context has been added successfully: [member] %s", memberId));
+                    }
 //                partitionContext.incrementCurrentActiveMemberCount(1);
-                partitionContext.movePendingMemberToActiveMembers(memberId);
+                    partitionContext.movePendingMemberToActiveMembers(memberId);
 
-            }
-            finally{
-                TopologyManager.releaseReadLock();
-            }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
             }
         });
 
@@ -289,38 +297,39 @@ public class AutoscalerTopologyReceiver implements Runnable {
             @Override
             protected void onEvent(Event event) {
 
-            try {
-                TopologyManager.acquireReadLock();
-
-                MemberMaintenanceModeEvent e = (MemberMaintenanceModeEvent)event;
-                String memberId = e.getMemberId();
-                String partitionId = e.getPartitionId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                PartitionContext partitionContext;
-                String clusterId = e.getClusterId();
-                AbstractMonitor monitor;
-
-                if(AutoscalerContext.getInstance().monitorExist(clusterId)) {
-                    monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
-                    partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-                } else {
-                    monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
-                    partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-                }
-                partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                if(log.isDebugEnabled()){
-                    log.debug(String.format("Member has been moved as pending termination: [member] %s", memberId));
-                }
-                partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+                try {
+                    TopologyManager.acquireReadLock();
 
-            }
-            finally{
-                TopologyManager.releaseReadLock();
-            }
+                    MemberMaintenanceModeEvent e = (MemberMaintenanceModeEvent) event;
+                    String memberId = e.getMemberId();
+                    String partitionId = e.getPartitionId();
+                    String networkPartitionId = e.getNetworkPartitionId();
+
+                    PartitionContext partitionContext;
+                    String clusterId = e.getClusterId();
+                    AbstractMonitor monitor;
+
+                    if (AutoscalerContext.getInstance().monitorExist(clusterId)) {
+                        monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+                        partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+                    } else {
+                        monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
+                        partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+                    }
+                    partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member has been moved as pending termination: [member] %s", memberId));
+                    }
+                    partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
             }
         });
-        
+
         processorChain.addEventListener(new ServiceRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
@@ -342,10 +351,10 @@ public class AutoscalerTopologyReceiver implements Runnable {
         });
         return processorChain;
     }
-    
+
     private class LBClusterMonitorAdder implements Runnable {
         private Cluster cluster;
-        
+
         public LBClusterMonitorAdder(Cluster cluster) {
             this.cluster = cluster;
         }
@@ -355,46 +364,46 @@ public class AutoscalerTopologyReceiver implements Runnable {
             int retries = 5;
             boolean success = false;
             do {
-            	try {
-					Thread.sleep(5000);
-				} catch (InterruptedException e1) {
-				}
-            	try {
-            		monitor = AutoscalerUtil.getLBClusterMonitor(cluster);
-            		success = true;
-            		
-            	} catch (PolicyValidationException e) {
-            		String msg = "LB Cluster monitor creation failed for cluster: "+cluster.getClusterId();
-            		log.debug(msg, e);
-            		retries--;
-            		
-            	} catch(PartitionValidationException e){
-            		String msg = "LB Cluster monitor creation failed for cluster: "+cluster.getClusterId();
-            		log.debug(msg, e);
-            		retries--;
-            	}
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e1) {
+                }
+                try {
+                    monitor = AutoscalerUtil.getLBClusterMonitor(cluster);
+                    success = true;
+
+                } catch (PolicyValidationException e) {
+                    String msg = "LB Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+                    log.debug(msg, e);
+                    retries--;
+
+                } catch (PartitionValidationException e) {
+                    String msg = "LB Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+                    log.debug(msg, e);
+                    retries--;
+                }
             } while (!success && retries <= 0);
-            
+
             if (monitor == null) {
-            	String msg = "LB Cluster monitor creation failed, even after retrying for 5 times, "
-            			+ "for cluster: "+cluster.getClusterId();
-            	log.error(msg);
-            	throw new RuntimeException(msg);
+                String msg = "LB Cluster monitor creation failed, even after retrying for 5 times, "
+                        + "for cluster: " + cluster.getClusterId();
+                log.error(msg);
+                throw new RuntimeException(msg);
             }
 
             Thread th = new Thread(monitor);
             th.start();
             AutoscalerContext.getInstance().addLbMonitor(monitor);
-            if(log.isInfoEnabled()){
+            if (log.isInfoEnabled()) {
                 log.info(String.format("LB Cluster monitor has been added successfully: [cluster] %s",
-                                                        cluster.getClusterId()));
+                        cluster.getClusterId()));
             }
         }
     }
-    
+
     private class ClusterMonitorAdder implements Runnable {
         private Cluster cluster;
-        
+
         public ClusterMonitorAdder(Cluster cluster) {
             this.cluster = cluster;
         }
@@ -404,51 +413,51 @@ public class AutoscalerTopologyReceiver implements Runnable {
             int retries = 5;
             boolean success = false;
             do {
-            	try {
-					Thread.sleep(5000);
-				} catch (InterruptedException e1) {
-				}
-            	
-            	try {
-            		monitor = AutoscalerUtil.getClusterMonitor(cluster);
-            		success = true;
-            		
-            	} catch (PolicyValidationException e) {
-            		String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId();
-            		log.debug(msg, e);
-            		retries--;
-            		
-            	} catch(PartitionValidationException e){
-            		String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId();
-            		log.debug(msg, e);
-            		retries--;
-            	}
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e1) {
+                }
+
+                try {
+                    monitor = AutoscalerUtil.getClusterMonitor(cluster);
+                    success = true;
+
+                } catch (PolicyValidationException e) {
+                    String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+                    log.debug(msg, e);
+                    retries--;
+
+                } catch (PartitionValidationException e) {
+                    String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+                    log.debug(msg, e);
+                    retries--;
+                }
             } while (!success && retries != 0);
-            
+
             if (monitor == null) {
-            	String msg = "Cluster monitor creation failed, even after retrying for 5 times, "
-            			+ "for cluster: "+cluster.getClusterId();
-            	log.error(msg);
-            	throw new RuntimeException(msg);
+                String msg = "Cluster monitor creation failed, even after retrying for 5 times, "
+                        + "for cluster: " + cluster.getClusterId();
+                log.error(msg);
+                throw new RuntimeException(msg);
             }
 
             Thread th = new Thread(monitor);
             th.start();
             AutoscalerContext.getInstance().addMonitor(monitor);
-            if(log.isInfoEnabled()){
+            if (log.isInfoEnabled()) {
                 log.info(String.format("Cluster monitor has been added successfully: [cluster] %s",
-                                                        cluster.getClusterId()));
+                        cluster.getClusterId()));
             }
         }
     }
 
-    private void runTerminateAllRule(AbstractMonitor monitor){
+    private void runTerminateAllRule(AbstractMonitor monitor) {
 
         FactHandle terminateAllFactHandle = null;
 
         StatefulKnowledgeSession terminateAllKnowledgeSession = null;
 
-        for(NetworkPartitionContext networkPartitionContext: monitor.getNetworkPartitionCtxts().values()) {
+        for (NetworkPartitionContext networkPartitionContext : monitor.getNetworkPartitionCtxts().values()) {
             terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll(terminateAllKnowledgeSession
                     , terminateAllFactHandle, networkPartitionContext);
         }
@@ -462,31 +471,32 @@ public class AutoscalerTopologyReceiver implements Runnable {
         topologyReceiver.terminate();
         terminated = true;
     }
-    
+
     protected synchronized void startClusterMonitor(Cluster cluster) {
-    	Thread th = null;
-    	if (cluster.isLbCluster()
-    			&& !AutoscalerContext.getInstance()
-    			.lbMonitorExist(
-    					cluster.getClusterId())) {
-    		th = new Thread(new LBClusterMonitorAdder(
-    				cluster));
-    	} else if (!cluster.isLbCluster() && !AutoscalerContext.getInstance()
-    			.monitorExist(cluster.getClusterId())) {
-    		th = new Thread(
-    				new ClusterMonitorAdder(cluster));
-    	}
-    	if (th != null) {
-    		th.start();
-    		try {
-				th.join();
-			} catch (InterruptedException ignore) {}
-    		
-    		if (log.isDebugEnabled()) {
-    			log.debug(String
-    					.format("Cluster monitor thread has been started successfully: [cluster] %s ",
-    							cluster.getClusterId()));
-    		}
-    	}
+        Thread th = null;
+        if (cluster.isLbCluster()
+                && !AutoscalerContext.getInstance()
+                .lbMonitorExist(
+                        cluster.getClusterId())) {
+            th = new Thread(new LBClusterMonitorAdder(
+                    cluster));
+        } else if (!cluster.isLbCluster() && !AutoscalerContext.getInstance()
+                .monitorExist(cluster.getClusterId())) {
+            th = new Thread(
+                    new ClusterMonitorAdder(cluster));
+        }
+        if (th != null) {
+            th.start();
+            try {
+                th.join();
+            } catch (InterruptedException ignore) {
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug(String
+                        .format("Cluster monitor thread has been started successfully: [cluster] %s ",
+                                cluster.getClusterId()));
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7744e6de/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
index 875cad6..c039f1b 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
@@ -65,10 +65,10 @@ public class LoadBalancerTopologyReceiver implements Runnable {
 
         // Keep the thread live until terminated
         while (!terminated) {
-        	try {
-				Thread.sleep(1000);
-			} catch (InterruptedException ignore) {
-			}
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
         }
         if (log.isInfoEnabled()) {
             log.info("Load balancer topology receiver thread terminated");
@@ -99,6 +99,8 @@ public class LoadBalancerTopologyReceiver implements Runnable {
                             }
                         }
                     }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }
@@ -145,6 +147,8 @@ public class LoadBalancerTopologyReceiver implements Runnable {
                             log.error(String.format("Service not found in topology: [service] %s", memberActivatedEvent.getServiceName()));
                         }
                     }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }
@@ -167,6 +171,8 @@ public class LoadBalancerTopologyReceiver implements Runnable {
                                     clusterRemovedEvent.getServiceName(), clusterRemovedEvent.getClusterId()));
                         }
                     }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }
@@ -190,6 +196,8 @@ public class LoadBalancerTopologyReceiver implements Runnable {
                             log.warn(String.format("Service not found in topology: [service] %s", serviceRemovedEvent.getServiceName()));
                         }
                     }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }