You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/04/17 16:24:59 UTC
git commit: Added catch clauses to event listeners
Repository: incubator-stratos
Updated Branches:
refs/heads/master aba72724a -> 7744e6dec
Added catch clauses to event listeners
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/7744e6de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/7744e6de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/7744e6de
Branch: refs/heads/master
Commit: 7744e6deca47083d85c2e4c0b7a026c3aa898aeb
Parents: aba7272
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Apr 17 19:54:50 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Apr 17 19:54:50 2014 +0530
----------------------------------------------------------------------
.../topology/AutoscalerTopologyReceiver.java | 532 ++++++++++---------
.../balancer/LoadBalancerTopologyReceiver.java | 16 +-
2 files changed, 283 insertions(+), 265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7744e6de/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
index 9120e20..1c4a178 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
@@ -57,7 +57,7 @@ public class AutoscalerTopologyReceiver implements Runnable {
private boolean terminated;
public AutoscalerTopologyReceiver() {
- this.topologyReceiver = new TopologyReceiver(createMessageDelegator(), new TopologyEventMessageListener());
+ this.topologyReceiver = new TopologyReceiver(createMessageDelegator(), new TopologyEventMessageListener());
}
@Override
@@ -69,18 +69,18 @@ public class AutoscalerTopologyReceiver implements Runnable {
}
Thread thread = new Thread(topologyReceiver);
thread.start();
- if(log.isInfoEnabled()) {
+ if (log.isInfoEnabled()) {
log.info("Autoscaler topology receiver thread started");
}
// Keep the thread live until terminated
while (!terminated) {
- try {
+ try {
Thread.sleep(1000);
} catch (InterruptedException ignore) {
}
}
- if(log.isInfoEnabled()) {
+ if (log.isInfoEnabled()) {
log.info("Autoscaler topology receiver thread terminated");
}
}
@@ -97,18 +97,19 @@ public class AutoscalerTopologyReceiver implements Runnable {
@Override
protected void onEvent(Event event) {
- try {
- TopologyManager.acquireReadLock();
- for(Service service : TopologyManager.getTopology().getServices()) {
- for (Cluster cluster : service.getClusters()) {
- startClusterMonitor(cluster);
- }
+ try {
+ TopologyManager.acquireReadLock();
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ startClusterMonitor(cluster);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLock();
}
}
- finally {
- TopologyManager.releaseReadLock();
- }
- }
});
@@ -116,171 +117,178 @@ public class AutoscalerTopologyReceiver implements Runnable {
processorChain.addEventListener(new ClusterCreatedEventListener() {
@Override
protected void onEvent(Event event) {
- try {
- log.info("Event received: "+event);
- ClusterCreatedEvent e = (ClusterCreatedEvent) event;
- TopologyManager.acquireReadLock();
- Service service = TopologyManager.getTopology().getService(e.getServiceName());
- Cluster cluster = service.getCluster(e.getClusterId());
- startClusterMonitor(cluster);
- } finally {
- TopologyManager.releaseReadLock();
+ try {
+ log.info("Event received: " + event);
+ ClusterCreatedEvent e = (ClusterCreatedEvent) event;
+ TopologyManager.acquireReadLock();
+ Service service = TopologyManager.getTopology().getService(e.getServiceName());
+ Cluster cluster = service.getCluster(e.getClusterId());
+ startClusterMonitor(cluster);
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
}
+
+ });
+
+ processorChain.addEventListener(new ClusterRemovedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ ClusterRemovedEvent e = (ClusterRemovedEvent) event;
+ TopologyManager.acquireReadLock();
+
+ String serviceName = e.getServiceName();
+ String clusterId = e.getClusterId();
+ String deploymentPolicy = e.getDeploymentPolicy();
+
+ AbstractMonitor monitor;
+
+ if (e.isLbCluster()) {
+ DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
+ if (depPolicy != null) {
+ List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
+ .getNetworkPartitionLbHolders(depPolicy);
+
+ for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
+ // removes lb cluster ids
+ boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
+ if (isRemoved) {
+ log.info("Removed the lb cluster [id]:"
+ + clusterId
+ + " reference from Network Partition [id]: "
+ + networkPartitionLbHolder
+ .getNetworkPartitionId());
+
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(networkPartitionLbHolder);
+ }
+
+ }
+ }
+ monitor = AutoscalerContext.getInstance()
+ .removeLbMonitor(clusterId);
+
+ } else {
+ monitor = AutoscalerContext.getInstance()
+ .removeMonitor(clusterId);
+ }
+
+ // runTerminateAllRule(monitor);
+ if (monitor != null) {
+ monitor.destroy();
+ log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
+ clusterId));
+ }
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
}
});
-
- processorChain.addEventListener(new ClusterRemovedEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- ClusterRemovedEvent e = (ClusterRemovedEvent) event;
- TopologyManager.acquireReadLock();
-
- String serviceName = e.getServiceName();
- String clusterId = e.getClusterId();
- String deploymentPolicy = e.getDeploymentPolicy();
-
- AbstractMonitor monitor;
-
- if (e.isLbCluster()) {
- DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
- if (depPolicy != null) {
- List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
- .getNetworkPartitionLbHolders(depPolicy);
-
- for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
- // removes lb cluster ids
- boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
- if (isRemoved) {
- log.info("Removed the lb cluster [id]:"
- + clusterId
- + " reference from Network Partition [id]: "
- + networkPartitionLbHolder
- .getNetworkPartitionId());
-
- }
- if (log.isDebugEnabled()) {
- log.debug(networkPartitionLbHolder);
- }
-
- }
- }
- monitor = AutoscalerContext.getInstance()
- .removeLbMonitor(clusterId);
-
- } else {
- monitor = AutoscalerContext.getInstance()
- .removeMonitor(clusterId);
- }
-
- // runTerminateAllRule(monitor);
- if (monitor != null) {
- monitor.destroy();
- log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
- clusterId));
- }
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
-
- });
-
+
processorChain.addEventListener(new MemberStartedEventListener() {
@Override
protected void onEvent(Event event) {
-
+
}
});
-
+
processorChain.addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
-
- try {
- TopologyManager.acquireReadLock();
- MemberTerminatedEvent e = (MemberTerminatedEvent) event;
- String networkPartitionId = e.getNetworkPartitionId();
- String clusterId = e.getClusterId();
- String partitionId = e.getPartitionId();
- AbstractMonitor monitor;
-
- if(AutoscalerContext.getInstance().monitorExist(clusterId)){
- monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
- } else {
- //This is LB member
- monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
- }
-
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
- String memberId = e.getMemberId();
- partitionContext.removeMemberStatsContext(memberId);
+ try {
+ TopologyManager.acquireReadLock();
+ MemberTerminatedEvent e = (MemberTerminatedEvent) event;
+ String networkPartitionId = e.getNetworkPartitionId();
+ String clusterId = e.getClusterId();
+ String partitionId = e.getPartitionId();
+ AbstractMonitor monitor;
+
+ if (AutoscalerContext.getInstance().monitorExist(clusterId)) {
+ monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ } else {
+ //This is LB member
+ monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
+ }
- if(partitionContext.removeTerminationPendingMember(memberId)){
- if(log.isDebugEnabled()){
- log.debug(String.format("Member is removed from termination pending members list: [member] %s", memberId));
+ NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+
+ PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
+ String memberId = e.getMemberId();
+ partitionContext.removeMemberStatsContext(memberId);
+
+ if (partitionContext.removeTerminationPendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from termination pending members list: [member] %s", memberId));
+ }
+ } else if (partitionContext.removePendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from pending members list: [member] %s", memberId));
+ }
+ } else if (partitionContext.removeActiveMemberById(memberId)) {
+ log.warn(String.format("Member is in the wrong list and it is removed from active members list", memberId));
+ } else {
+ log.warn(String.format("Member is not available in any of the list active, pending and termination pending", memberId));
}
- } else if(partitionContext.removePendingMember(memberId)) {
- if(log.isDebugEnabled()){
- log.debug(String.format("Member is removed from pending members list: [member] %s", memberId));
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been removed successfully: [member] %s", memberId));
}
- } else if(partitionContext.removeActiveMemberById(memberId)){
- log.warn(String.format("Member is in the wrong list and it is removed from active members list", memberId));
- } else {
- log.warn(String.format("Member is not available in any of the list active, pending and termination pending", memberId));
- }
-
- if(log.isInfoEnabled()){
- log.info(String.format("Member stat context has been removed successfully: [member] %s", memberId));
- }
// partitionContext.decrementCurrentActiveMemberCount(1);
- } finally {
- TopologyManager.releaseReadLock();
- }
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
}
});
-
+
processorChain.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
- try {
- TopologyManager.acquireReadLock();
-
- MemberActivatedEvent e = (MemberActivatedEvent)event;
- String memberId = e.getMemberId();
- String partitionId = e.getPartitionId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- PartitionContext partitionContext;
- String clusterId = e.getClusterId();
- AbstractMonitor monitor;
-
- if(AutoscalerContext.getInstance().monitorExist(clusterId)) {
- monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
- partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
- } else {
- monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
- partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
- }
- partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if(log.isInfoEnabled()){
- log.info(String.format("Member stat context has been added successfully: [member] %s", memberId));
- }
+ try {
+ TopologyManager.acquireReadLock();
+
+ MemberActivatedEvent e = (MemberActivatedEvent) event;
+ String memberId = e.getMemberId();
+ String partitionId = e.getPartitionId();
+ String networkPartitionId = e.getNetworkPartitionId();
+
+ PartitionContext partitionContext;
+ String clusterId = e.getClusterId();
+ AbstractMonitor monitor;
+
+ if (AutoscalerContext.getInstance().monitorExist(clusterId)) {
+ monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+ } else {
+ monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
+ partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+ }
+ partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been added successfully: [member] %s", memberId));
+ }
// partitionContext.incrementCurrentActiveMemberCount(1);
- partitionContext.movePendingMemberToActiveMembers(memberId);
+ partitionContext.movePendingMemberToActiveMembers(memberId);
- }
- finally{
- TopologyManager.releaseReadLock();
- }
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
}
});
@@ -289,38 +297,39 @@ public class AutoscalerTopologyReceiver implements Runnable {
@Override
protected void onEvent(Event event) {
- try {
- TopologyManager.acquireReadLock();
-
- MemberMaintenanceModeEvent e = (MemberMaintenanceModeEvent)event;
- String memberId = e.getMemberId();
- String partitionId = e.getPartitionId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- PartitionContext partitionContext;
- String clusterId = e.getClusterId();
- AbstractMonitor monitor;
-
- if(AutoscalerContext.getInstance().monitorExist(clusterId)) {
- monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
- partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
- } else {
- monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
- partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
- }
- partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if(log.isDebugEnabled()){
- log.debug(String.format("Member has been moved as pending termination: [member] %s", memberId));
- }
- partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+ try {
+ TopologyManager.acquireReadLock();
- }
- finally{
- TopologyManager.releaseReadLock();
- }
+ MemberMaintenanceModeEvent e = (MemberMaintenanceModeEvent) event;
+ String memberId = e.getMemberId();
+ String partitionId = e.getPartitionId();
+ String networkPartitionId = e.getNetworkPartitionId();
+
+ PartitionContext partitionContext;
+ String clusterId = e.getClusterId();
+ AbstractMonitor monitor;
+
+ if (AutoscalerContext.getInstance().monitorExist(clusterId)) {
+ monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+ } else {
+ monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
+ partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
+ }
+ partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member has been moved as pending termination: [member] %s", memberId));
+ }
+ partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
}
});
-
+
processorChain.addEventListener(new ServiceRemovedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -342,10 +351,10 @@ public class AutoscalerTopologyReceiver implements Runnable {
});
return processorChain;
}
-
+
private class LBClusterMonitorAdder implements Runnable {
private Cluster cluster;
-
+
public LBClusterMonitorAdder(Cluster cluster) {
this.cluster = cluster;
}
@@ -355,46 +364,46 @@ public class AutoscalerTopologyReceiver implements Runnable {
int retries = 5;
boolean success = false;
do {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e1) {
- }
- try {
- monitor = AutoscalerUtil.getLBClusterMonitor(cluster);
- success = true;
-
- } catch (PolicyValidationException e) {
- String msg = "LB Cluster monitor creation failed for cluster: "+cluster.getClusterId();
- log.debug(msg, e);
- retries--;
-
- } catch(PartitionValidationException e){
- String msg = "LB Cluster monitor creation failed for cluster: "+cluster.getClusterId();
- log.debug(msg, e);
- retries--;
- }
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e1) {
+ }
+ try {
+ monitor = AutoscalerUtil.getLBClusterMonitor(cluster);
+ success = true;
+
+ } catch (PolicyValidationException e) {
+ String msg = "LB Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+ log.debug(msg, e);
+ retries--;
+
+ } catch (PartitionValidationException e) {
+ String msg = "LB Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+ log.debug(msg, e);
+ retries--;
+ }
} while (!success && retries <= 0);
-
+
if (monitor == null) {
- String msg = "LB Cluster monitor creation failed, even after retrying for 5 times, "
- + "for cluster: "+cluster.getClusterId();
- log.error(msg);
- throw new RuntimeException(msg);
+ String msg = "LB Cluster monitor creation failed, even after retrying for 5 times, "
+ + "for cluster: " + cluster.getClusterId();
+ log.error(msg);
+ throw new RuntimeException(msg);
}
Thread th = new Thread(monitor);
th.start();
AutoscalerContext.getInstance().addLbMonitor(monitor);
- if(log.isInfoEnabled()){
+ if (log.isInfoEnabled()) {
log.info(String.format("LB Cluster monitor has been added successfully: [cluster] %s",
- cluster.getClusterId()));
+ cluster.getClusterId()));
}
}
}
-
+
private class ClusterMonitorAdder implements Runnable {
private Cluster cluster;
-
+
public ClusterMonitorAdder(Cluster cluster) {
this.cluster = cluster;
}
@@ -404,51 +413,51 @@ public class AutoscalerTopologyReceiver implements Runnable {
int retries = 5;
boolean success = false;
do {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e1) {
- }
-
- try {
- monitor = AutoscalerUtil.getClusterMonitor(cluster);
- success = true;
-
- } catch (PolicyValidationException e) {
- String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId();
- log.debug(msg, e);
- retries--;
-
- } catch(PartitionValidationException e){
- String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId();
- log.debug(msg, e);
- retries--;
- }
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e1) {
+ }
+
+ try {
+ monitor = AutoscalerUtil.getClusterMonitor(cluster);
+ success = true;
+
+ } catch (PolicyValidationException e) {
+ String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+ log.debug(msg, e);
+ retries--;
+
+ } catch (PartitionValidationException e) {
+ String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+ log.debug(msg, e);
+ retries--;
+ }
} while (!success && retries != 0);
-
+
if (monitor == null) {
- String msg = "Cluster monitor creation failed, even after retrying for 5 times, "
- + "for cluster: "+cluster.getClusterId();
- log.error(msg);
- throw new RuntimeException(msg);
+ String msg = "Cluster monitor creation failed, even after retrying for 5 times, "
+ + "for cluster: " + cluster.getClusterId();
+ log.error(msg);
+ throw new RuntimeException(msg);
}
Thread th = new Thread(monitor);
th.start();
AutoscalerContext.getInstance().addMonitor(monitor);
- if(log.isInfoEnabled()){
+ if (log.isInfoEnabled()) {
log.info(String.format("Cluster monitor has been added successfully: [cluster] %s",
- cluster.getClusterId()));
+ cluster.getClusterId()));
}
}
}
- private void runTerminateAllRule(AbstractMonitor monitor){
+ private void runTerminateAllRule(AbstractMonitor monitor) {
FactHandle terminateAllFactHandle = null;
StatefulKnowledgeSession terminateAllKnowledgeSession = null;
- for(NetworkPartitionContext networkPartitionContext: monitor.getNetworkPartitionCtxts().values()) {
+ for (NetworkPartitionContext networkPartitionContext : monitor.getNetworkPartitionCtxts().values()) {
terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll(terminateAllKnowledgeSession
, terminateAllFactHandle, networkPartitionContext);
}
@@ -462,31 +471,32 @@ public class AutoscalerTopologyReceiver implements Runnable {
topologyReceiver.terminate();
terminated = true;
}
-
+
protected synchronized void startClusterMonitor(Cluster cluster) {
- Thread th = null;
- if (cluster.isLbCluster()
- && !AutoscalerContext.getInstance()
- .lbMonitorExist(
- cluster.getClusterId())) {
- th = new Thread(new LBClusterMonitorAdder(
- cluster));
- } else if (!cluster.isLbCluster() && !AutoscalerContext.getInstance()
- .monitorExist(cluster.getClusterId())) {
- th = new Thread(
- new ClusterMonitorAdder(cluster));
- }
- if (th != null) {
- th.start();
- try {
- th.join();
- } catch (InterruptedException ignore) {}
-
- if (log.isDebugEnabled()) {
- log.debug(String
- .format("Cluster monitor thread has been started successfully: [cluster] %s ",
- cluster.getClusterId()));
- }
- }
+ Thread th = null;
+ if (cluster.isLbCluster()
+ && !AutoscalerContext.getInstance()
+ .lbMonitorExist(
+ cluster.getClusterId())) {
+ th = new Thread(new LBClusterMonitorAdder(
+ cluster));
+ } else if (!cluster.isLbCluster() && !AutoscalerContext.getInstance()
+ .monitorExist(cluster.getClusterId())) {
+ th = new Thread(
+ new ClusterMonitorAdder(cluster));
+ }
+ if (th != null) {
+ th.start();
+ try {
+ th.join();
+ } catch (InterruptedException ignore) {
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug(String
+ .format("Cluster monitor thread has been started successfully: [cluster] %s ",
+ cluster.getClusterId()));
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7744e6de/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
index 875cad6..c039f1b 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
@@ -65,10 +65,10 @@ public class LoadBalancerTopologyReceiver implements Runnable {
// Keep the thread live until terminated
while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
}
if (log.isInfoEnabled()) {
log.info("Load balancer topology receiver thread terminated");
@@ -99,6 +99,8 @@ public class LoadBalancerTopologyReceiver implements Runnable {
}
}
}
+ } catch (Exception e) {
+ log.error("Error processing event", e);
} finally {
TopologyManager.releaseReadLock();
}
@@ -145,6 +147,8 @@ public class LoadBalancerTopologyReceiver implements Runnable {
log.error(String.format("Service not found in topology: [service] %s", memberActivatedEvent.getServiceName()));
}
}
+ } catch (Exception e) {
+ log.error("Error processing event", e);
} finally {
TopologyManager.releaseReadLock();
}
@@ -167,6 +171,8 @@ public class LoadBalancerTopologyReceiver implements Runnable {
clusterRemovedEvent.getServiceName(), clusterRemovedEvent.getClusterId()));
}
}
+ } catch (Exception e) {
+ log.error("Error processing event", e);
} finally {
TopologyManager.releaseReadLock();
}
@@ -190,6 +196,8 @@ public class LoadBalancerTopologyReceiver implements Runnable {
log.warn(String.format("Service not found in topology: [service] %s", serviceRemovedEvent.getServiceName()));
}
}
+ } catch (Exception e) {
+ log.error("Error processing event", e);
} finally {
TopologyManager.releaseReadLock();
}