You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/11/16 13:26:18 UTC

git commit: Fixed thread termination logic in load balancer common, load balancer extension api and messaging components

Updated Branches:
  refs/heads/master 8e2cec750 -> ad1df3e11


Fixed thread termination logic in load balancer common, load balancer extension api and messaging components


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/ad1df3e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/ad1df3e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/ad1df3e1

Branch: refs/heads/master
Commit: ad1df3e11631d95efc6569e3ae0bb66352aa1564
Parents: 8e2cec7
Author: Imesh Gunaratne <im...@apache.org>
Authored: Sat Nov 16 17:53:28 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sat Nov 16 17:53:28 2013 +0530

----------------------------------------------------------------------
 .../common/topology/TopologyReceiver.java       | 44 ++++++++++++--------
 .../extension/api/LoadBalancerExtension.java    | 32 ++++++++++----
 .../api/LoadBalancerStatsNotifier.java          | 10 ++++-
 .../broker/heartbeat/TopicHealthChecker.java    |  8 +++-
 .../broker/subscribe/TopicSubscriber.java       | 26 ++++++++----
 .../topology/TopologyEventMessageDelegator.java | 12 +++++-
 6 files changed, 94 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
index 88958b5..4e91f2f 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyReceiver.java
@@ -33,6 +33,8 @@ import org.apache.stratos.messaging.util.Constants;
 public class TopologyReceiver implements Runnable {
     private static final Log log = LogFactory.getLog(TopologyReceiver.class);
     private TopologyEventMessageDelegator messageDelegator;
+    private TopicSubscriber topicSubscriber;
+    private boolean terminated;
 
     public TopologyReceiver() {
         this.messageDelegator = new TopologyEventMessageDelegator();
@@ -45,26 +47,34 @@ public class TopologyReceiver implements Runnable {
     @Override
     public void run() {
         try {
-                // Start topic subscriber thread
-                TopicSubscriber topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
-                topicSubscriber.setMessageListener(new TopologyEventMessageReceiver());
-                Thread subscriberThread = new Thread(topicSubscriber);
-                subscriberThread.start();
-                if (log.isDebugEnabled()) {
-                    log.debug("Topology event message receiver thread started");
-                }
+            // Start topic subscriber thread
+            topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
+            topicSubscriber.setMessageListener(new TopologyEventMessageReceiver());
+            Thread subscriberThread = new Thread(topicSubscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Topology event message receiver thread started");
+            }
 
-                // Start topology message receiver thread
-                Thread receiverThread = new Thread(messageDelegator);
-                receiverThread.start();
-                if (log.isDebugEnabled()) {
-                    log.debug("Topology message processor thread started");
-                }
-        }
-        catch (Exception e) {
-            if(log.isErrorEnabled()) {
+            // Start topology event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Topology event message delegator thread started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated);
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
                 log.error("Topology receiver failed", e);
             }
         }
     }
+
+    public void terminate() {
+        topicSubscriber.terminate();
+        messageDelegator.terminate();
+        terminated = true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 63df3b7..74af8f9 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -38,6 +38,9 @@ public class LoadBalancerExtension implements Runnable {
     private LoadBalancer loadBalancer;
     private LoadBalancerStatsReader statsReader;
     private boolean loadBalancerStarted;
+    private TopologyReceiver topologyReceiver;
+    private LoadBalancerStatsNotifier statsNotifier;
+    private boolean terminated;
 
     public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatsReader statsReader) {
         this.loadBalancer = loadBalancer;
@@ -47,16 +50,22 @@ public class LoadBalancerExtension implements Runnable {
     @Override
     public void run() {
         try {
+            if(log.isInfoEnabled()) {
+                log.info("Load balancer extension started");
+            }
+
             // Start topology receiver thread
-            TopologyReceiver topologyReceiver = new TopologyReceiver(createMessageDelegator());
+            topologyReceiver = new TopologyReceiver(createMessageDelegator());
             Thread topologyReceiverThread = new Thread(topologyReceiver);
             topologyReceiverThread.start();
 
             // Start stats notifier thread
-            LoadBalancerStatsNotifier statsNotifier = new LoadBalancerStatsNotifier(statsReader);
+            statsNotifier = new LoadBalancerStatsNotifier(statsReader);
             Thread statsNotifierThread = new Thread(statsNotifier);
             statsNotifierThread.start();
 
+            // Keep the thread live until terminated
+            while (!terminated);
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("Could not start load balancer extension", e);
@@ -82,11 +91,11 @@ public class LoadBalancerExtension implements Runnable {
                     // Complete topology event is only received once
                     // Remove event listener
                     messageDelegator.removeCompleteTopologyEventListener(this);
-                }
-                catch (Exception e) {
-                    if(log.isErrorEnabled()) {
+                } catch (Exception e) {
+                    if (log.isErrorEnabled()) {
                         log.error("Could not start load balancer", e);
                     }
+                    terminate();
                 }
             }
         });
@@ -130,14 +139,19 @@ public class LoadBalancerExtension implements Runnable {
 
     private void reloadConfiguration() {
         try {
-            if(loadBalancerStarted) {
+            if (loadBalancerStarted) {
                 loadBalancer.reload(TopologyManager.getTopology());
             }
-        }
-        catch (Exception e) {
-            if(log.isErrorEnabled()) {
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
                 log.error("Could not reload load balancer configuration", e);
             }
         }
     }
+
+    public void terminate() {
+        topologyReceiver.terminate();
+        statsNotifier.terminate();
+        terminated = true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
index 5beaec0..dc5a8a6 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
@@ -39,6 +39,7 @@ public class LoadBalancerStatsNotifier implements Runnable {
     private LoadBalancerStatsReader statsReader;
     private final LoadBalancerStatsPublisher statsPublisher;
     private long statsPublisherInterval = 15000;
+    private boolean terminated;
 
     public LoadBalancerStatsNotifier(LoadBalancerStatsReader statsReader) {
         this.statsReader = statsReader;
@@ -52,7 +53,7 @@ public class LoadBalancerStatsNotifier implements Runnable {
 
     @Override
     public void run() {
-        while (true) {
+        while (!terminated) {
             try {
                 try {
                     Thread.sleep(statsPublisherInterval);
@@ -74,4 +75,11 @@ public class LoadBalancerStatsNotifier implements Runnable {
             }
         }
     }
+
+    /**
+     * Terminate load balancer statistics notifier thread.
+     */
+    public void terminate() {
+        terminated = true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
index 4d7638a..0327c20 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
@@ -34,8 +34,9 @@ public class TopicHealthChecker implements Runnable {
 
 	private static final Log log = LogFactory.getLog(TopicHealthChecker.class);
 	private String topicName;
+    private boolean terminated;
 
-	public TopicHealthChecker(String name) {
+    public TopicHealthChecker(String name) {
 		topicName = name;
 	}
 
@@ -44,7 +45,7 @@ public class TopicHealthChecker implements Runnable {
 		log.info("Topic Health Checker is running... ");
 
 		TopicConnector testConnector = new TopicConnector();
-		while (true) {
+		while (!terminated) {
 			try {
 				// health checker runs in every 30s
 				Thread.sleep(30000);
@@ -71,4 +72,7 @@ public class TopicHealthChecker implements Runnable {
 
 	}
 
+    public void terminate() {
+        terminated = true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index 577fee4..d8ec008 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -29,17 +29,20 @@ import org.apache.stratos.messaging.broker.heartbeat.TopicHealthChecker;
 /**
  * Any instance who needs to subscribe to a topic, should communicate with this
  * object.
- * 
+ *
  * @author nirmal
- * 
+ *
  */
 public class TopicSubscriber implements Runnable {
 
 	private static final Log log = LogFactory.getLog(TopicSubscriber.class);
-	private MessageListener messageListener;
+
+    private boolean terminated = false;
+    private MessageListener messageListener;
 	private TopicSession topicSession;
 	private String topicName;
 	private TopicConnector connector;
+    private TopicHealthChecker healthChecker;
 	private javax.jms.TopicSubscriber topicSubscriber = null;
 
 	/**
@@ -84,7 +87,8 @@ public class TopicSubscriber implements Runnable {
 	@Override
 	public void run() {
 
-		while (true) {
+        // Keep the thread live until terminated
+		while (!terminated) {
 			try {
 				doSubscribe();
 
@@ -92,11 +96,12 @@ public class TopicSubscriber implements Runnable {
 				log.error("Error while subscribing to the topic: " + topicName, e);
 			} finally {
 				// start the health checker
-				Thread healthChecker = new Thread(new TopicHealthChecker(topicName));
-				healthChecker.start();
+                healthChecker = new TopicHealthChecker(topicName);
+			    Thread healthCheckerThread = new Thread(healthChecker);
+				healthCheckerThread.start();
 				try {
 					// waits till the thread finishes.
-					healthChecker.join();
+					healthCheckerThread.join();
 				} catch (InterruptedException ignore) {
 				}
 				// health checker failed
@@ -117,4 +122,11 @@ public class TopicSubscriber implements Runnable {
 		}
 	}
 
+    /**
+     * Terminate topic subscriber.
+     */
+    public void terminate() {
+        healthChecker.terminate();
+        terminated = true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ad1df3e1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 285bd0a..408e27c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -42,6 +42,7 @@ public class TopologyEventMessageDelegator implements Runnable {
     private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class);
     private CompleteTopologyEventProcessor completeTopEvMsgProcessor;
     private MessageProcessorChain processorChain;
+    private boolean terminated;
 
     public TopologyEventMessageDelegator() {
         this.completeTopEvMsgProcessor = new CompleteTopologyEventProcessor();
@@ -68,7 +69,7 @@ public class TopologyEventMessageDelegator implements Runnable {
                 log.info("Topology event message delegator started");
                 log.info("Waiting for the complete topology event message...");
             }
-            while (true) {
+            while (!terminated) {
                 try {
                     // First take the complete topology event
                     TextMessage message = TopologyEventQueue.getInstance().take();
@@ -86,7 +87,7 @@ public class TopologyEventMessageDelegator implements Runnable {
                 }
             }
 
-            while (true) {
+            while (!terminated) {
                 try {
                     TextMessage message = TopologyEventQueue.getInstance().take();
 
@@ -119,4 +120,11 @@ public class TopologyEventMessageDelegator implements Runnable {
             }
         }
     }
+
+    /**
+     * Terminate topology event message delegator thread.
+     */
+    public void terminate() {
+        terminated = true;
+    }
 }