You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/11/16 13:26:18 UTC
git commit: Fixed thread termination logic in load balancer common,
load balancer extension api and messaging components
Updated Branches:
refs/heads/master 8e2cec750 -> ad1df3e11
Fixed thread termination logic in load balancer common, load balancer extension api and messaging components
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/ad1df3e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/ad1df3e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/ad1df3e1
Branch: refs/heads/master
Commit: ad1df3e11631d95efc6569e3ae0bb66352aa1564
Parents: 8e2cec7
Author: Imesh Gunaratne <im...@apache.org>
Authored: Sat Nov 16 17:53:28 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sat Nov 16 17:53:28 2013 +0530
----------------------------------------------------------------------
.../common/topology/TopologyReceiver.java | 44 ++++++++++++--------
.../extension/api/LoadBalancerExtension.java | 32 ++++++++++----
.../api/LoadBalancerStatsNotifier.java | 10 ++++-
.../broker/heartbeat/TopicHealthChecker.java | 8 +++-
.../broker/subscribe/TopicSubscriber.java | 26 ++++++++----
.../topology/TopologyEventMessageDelegator.java | 12 +++++-
6 files changed, 94 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
index 88958b5..4e91f2f 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
@@ -33,6 +33,8 @@ import org.apache.stratos.messaging.util.Constants;
public class TopologyReceiver implements Runnable {
private static final Log log = LogFactory.getLog(TopologyReceiver.class);
private TopologyEventMessageDelegator messageDelegator;
+ private TopicSubscriber topicSubscriber;
+ private boolean terminated;
public TopologyReceiver() {
this.messageDelegator = new TopologyEventMessageDelegator();
@@ -45,26 +47,34 @@ public class TopologyReceiver implements Runnable {
@Override
public void run() {
try {
- // Start topic subscriber thread
- TopicSubscriber topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
- topicSubscriber.setMessageListener(new TopologyEventMessageReceiver());
- Thread subscriberThread = new Thread(topicSubscriber);
- subscriberThread.start();
- if (log.isDebugEnabled()) {
- log.debug("Topology event message receiver thread started");
- }
+ // Start topic subscriber thread
+ topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
+ topicSubscriber.setMessageListener(new TopologyEventMessageReceiver());
+ Thread subscriberThread = new Thread(topicSubscriber);
+ subscriberThread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Topology event message receiver thread started");
+ }
- // Start topology message receiver thread
- Thread receiverThread = new Thread(messageDelegator);
- receiverThread.start();
- if (log.isDebugEnabled()) {
- log.debug("Topology message processor thread started");
- }
- }
- catch (Exception e) {
- if(log.isErrorEnabled()) {
+ // Start topology event message delegator thread
+ Thread receiverThread = new Thread(messageDelegator);
+ receiverThread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Topology event message delegator thread started");
+ }
+
+ // Keep the thread live until terminated
+ while (!terminated);
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
log.error("Topology receiver failed", e);
}
}
}
+
+ public void terminate() {
+ topicSubscriber.terminate();
+ messageDelegator.terminate();
+ terminated = true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 63df3b7..74af8f9 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -38,6 +38,9 @@ public class LoadBalancerExtension implements Runnable {
private LoadBalancer loadBalancer;
private LoadBalancerStatsReader statsReader;
private boolean loadBalancerStarted;
+ private TopologyReceiver topologyReceiver;
+ private LoadBalancerStatsNotifier statsNotifier;
+ private boolean terminated;
public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatsReader statsReader) {
this.loadBalancer = loadBalancer;
@@ -47,16 +50,22 @@ public class LoadBalancerExtension implements Runnable {
@Override
public void run() {
try {
+ if(log.isInfoEnabled()) {
+ log.info("Load balancer extension started");
+ }
+
// Start topology receiver thread
- TopologyReceiver topologyReceiver = new TopologyReceiver(createMessageDelegator());
+ topologyReceiver = new TopologyReceiver(createMessageDelegator());
Thread topologyReceiverThread = new Thread(topologyReceiver);
topologyReceiverThread.start();
// Start stats notifier thread
- LoadBalancerStatsNotifier statsNotifier = new LoadBalancerStatsNotifier(statsReader);
+ statsNotifier = new LoadBalancerStatsNotifier(statsReader);
Thread statsNotifierThread = new Thread(statsNotifier);
statsNotifierThread.start();
+ // Keep the thread live until terminated
+ while (!terminated);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Could not start load balancer extension", e);
@@ -82,11 +91,11 @@ public class LoadBalancerExtension implements Runnable {
// Complete topology event is only received once
// Remove event listener
messageDelegator.removeCompleteTopologyEventListener(this);
- }
- catch (Exception e) {
- if(log.isErrorEnabled()) {
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
log.error("Could not start load balancer", e);
}
+ terminate();
}
}
});
@@ -130,14 +139,19 @@ public class LoadBalancerExtension implements Runnable {
private void reloadConfiguration() {
try {
- if(loadBalancerStarted) {
+ if (loadBalancerStarted) {
loadBalancer.reload(TopologyManager.getTopology());
}
- }
- catch (Exception e) {
- if(log.isErrorEnabled()) {
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
log.error("Could not reload load balancer configuration", e);
}
}
}
+
+ public void terminate() {
+ topologyReceiver.terminate();
+ statsNotifier.terminate();
+ terminated = true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
index 5beaec0..dc5a8a6 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
@@ -39,6 +39,7 @@ public class LoadBalancerStatsNotifier implements Runnable {
private LoadBalancerStatsReader statsReader;
private final LoadBalancerStatsPublisher statsPublisher;
private long statsPublisherInterval = 15000;
+ private boolean terminated;
public LoadBalancerStatsNotifier(LoadBalancerStatsReader statsReader) {
this.statsReader = statsReader;
@@ -52,7 +53,7 @@ public class LoadBalancerStatsNotifier implements Runnable {
@Override
public void run() {
- while (true) {
+ while (!terminated) {
try {
try {
Thread.sleep(statsPublisherInterval);
@@ -74,4 +75,11 @@ public class LoadBalancerStatsNotifier implements Runnable {
}
}
}
+
+ /**
+ * Terminate load balancer statistics notifier thread.
+ */
+ public void terminate() {
+ terminated = true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
index 4d7638a..0327c20 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
@@ -34,8 +34,9 @@ public class TopicHealthChecker implements Runnable {
private static final Log log = LogFactory.getLog(TopicHealthChecker.class);
private String topicName;
+ private boolean terminated;
- public TopicHealthChecker(String name) {
+ public TopicHealthChecker(String name) {
topicName = name;
}
@@ -44,7 +45,7 @@ public class TopicHealthChecker implements Runnable {
log.info("Topic Health Checker is running... ");
TopicConnector testConnector = new TopicConnector();
- while (true) {
+ while (!terminated) {
try {
// health checker runs in every 30s
Thread.sleep(30000);
@@ -71,4 +72,7 @@ public class TopicHealthChecker implements Runnable {
}
+ public void terminate() {
+ terminated = true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index 577fee4..d8ec008 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -29,17 +29,20 @@ import org.apache.stratos.messaging.broker.heartbeat.TopicHealthChecker;
/**
* Any instance who needs to subscribe to a topic, should communicate with this
* object.
- *
+ *
* @author nirmal
- *
+ *
*/
public class TopicSubscriber implements Runnable {
private static final Log log = LogFactory.getLog(TopicSubscriber.class);
- private MessageListener messageListener;
+
+ private boolean terminated = false;
+ private MessageListener messageListener;
private TopicSession topicSession;
private String topicName;
private TopicConnector connector;
+ private TopicHealthChecker healthChecker;
private javax.jms.TopicSubscriber topicSubscriber = null;
/**
@@ -84,7 +87,8 @@ public class TopicSubscriber implements Runnable {
@Override
public void run() {
- while (true) {
+ // Keep the thread live until terminated
+ while (!terminated) {
try {
doSubscribe();
@@ -92,11 +96,12 @@ public class TopicSubscriber implements Runnable {
log.error("Error while subscribing to the topic: " + topicName, e);
} finally {
// start the health checker
- Thread healthChecker = new Thread(new TopicHealthChecker(topicName));
- healthChecker.start();
+ healthChecker = new TopicHealthChecker(topicName);
+ Thread healthCheckerThread = new Thread(healthChecker);
+ healthCheckerThread.start();
try {
// waits till the thread finishes.
- healthChecker.join();
+ healthCheckerThread.join();
} catch (InterruptedException ignore) {
}
// health checker failed
@@ -117,4 +122,11 @@ public class TopicSubscriber implements Runnable {
}
}
+ /**
+ * Terminate topic subscriber.
+ */
+ public void terminate() {
+ healthChecker.terminate();
+ terminated = true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 285bd0a..408e27c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -42,6 +42,7 @@ public class TopologyEventMessageDelegator implements Runnable {
private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class);
private CompleteTopologyEventProcessor completeTopEvMsgProcessor;
private MessageProcessorChain processorChain;
+ private boolean terminated;
public TopologyEventMessageDelegator() {
this.completeTopEvMsgProcessor = new CompleteTopologyEventProcessor();
@@ -68,7 +69,7 @@ public class TopologyEventMessageDelegator implements Runnable {
log.info("Topology event message delegator started");
log.info("Waiting for the complete topology event message...");
}
- while (true) {
+ while (!terminated) {
try {
// First take the complete topology event
TextMessage message = TopologyEventQueue.getInstance().take();
@@ -86,7 +87,7 @@ public class TopologyEventMessageDelegator implements Runnable {
}
}
- while (true) {
+ while (!terminated) {
try {
TextMessage message = TopologyEventQueue.getInstance().take();
@@ -119,4 +120,11 @@ public class TopologyEventMessageDelegator implements Runnable {
}
}
}
+
+ /**
+ * Terminate topology event message delegator thread.
+ */
+ public void terminate() {
+ terminated = true;
+ }
}