You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/04/24 14:12:46 UTC

[1/3] git commit: Changed load balancer endpoint timeout to 60 sec

Repository: incubator-stratos
Updated Branches:
  refs/heads/master 6095199b0 -> 5612e5f74


Changed load balancer endpoint timeout from 30 sec to 60 sec

Signed-off-by: Imesh Gunaratne <im...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/5f808a02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/5f808a02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/5f808a02

Branch: refs/heads/master
Commit: 5f808a02cc8f656e86116def9cc0e4bdd48178cd
Parents: 6095199
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Apr 24 12:50:38 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Apr 24 17:41:57 2014 +0530

----------------------------------------------------------------------
 .../modules/distribution/src/main/conf/loadbalancer.conf           | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5f808a02/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
----------------------------------------------------------------------
diff --git a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
index ba3e8d0..8060030 100644
--- a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
+++ b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
@@ -32,7 +32,7 @@ loadbalancer {
     session-affinity: true;
 
     # Endpoint timeout in milli-seconds
-    endpoint-timeout: 30000;
+    endpoint-timeout: 60000;
 
     # Session timeout in milli-seconds
     session-timeout: 90000;


[3/3] git commit: Removed topic publisher retry from health checker ping event

Posted by im...@apache.org.
Removed topic publisher retry from health checker ping event

Signed-off-by: Imesh Gunaratne <im...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/5612e5f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/5612e5f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/5612e5f7

Branch: refs/heads/master
Commit: 5612e5f74670bb4a6c07f7844e863f11bd6407d9
Parents: 89832ae
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Apr 24 13:29:10 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Apr 24 17:42:23 2014 +0530

----------------------------------------------------------------------
 .../messaging/broker/heartbeat/TopicHealthChecker.java |  2 +-
 .../messaging/broker/publish/EventPublisher.java       |  6 +++++-
 .../messaging/broker/publish/TopicPublisher.java       | 13 ++++++++++---
 .../stratos/messaging/publish/MessagePublisher.java    |  4 ++--
 4 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5612e5f7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
index d36b9cb..72466aa 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
@@ -57,7 +57,7 @@ public class TopicHealthChecker implements Runnable {
 				Thread.sleep(1000);
 				testConnector.init(topicName);
                 // A ping event is published to detect a session timeout
-                EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent());
+                EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent(), false);
 			} catch (Exception e) {
 				// Implies connection is not established
 				// sleep for 30 sec and retry

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5612e5f7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
index 8db203c..2150494 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
@@ -43,10 +43,14 @@ public class EventPublisher extends TopicPublisher {
      * @param event event to be published
      */
     public void publish(Event event) {
+        publish(event, true);
+    }
+
+    public void publish(Event event, boolean retry) {
         synchronized (EventPublisher.class) {
             Properties headers = new Properties();
             headers.put(Constants.EVENT_CLASS_NAME, event.getClass().getName());
-            super.publish(event, headers);
+            super.publish(event, headers, retry);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5612e5f7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index 7394d5b..c0074f5 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -67,11 +67,11 @@ public class TopicPublisher extends MessagePublisher {
 	 * lost, this will perform re-subscription periodically, until a connection
 	 * obtained.
 	 */
-	public void publish(Object messageObj) {
-		publish(messageObj, null);
+	public void publish(Object messageObj, boolean retry) {
+		publish(messageObj, null, retry);
 	}
 	
-	public void publish(Object messageObj, Properties headers) {
+	public void publish(Object messageObj, Properties headers, boolean retry) {
         synchronized (TopicPublisher.class) {
             Gson gson = new Gson();
             String message = gson.toJson(messageObj);
@@ -86,6 +86,13 @@ public class TopicPublisher extends MessagePublisher {
                     if(log.isErrorEnabled()) {
                         log.error("Error while publishing to the topic: " + getName(), e);
                     }
+                    if(!retry) {
+                        if(log.isDebugEnabled()) {
+                            log.debug("Retry disabled for topic " + getName());
+                        }
+                        throw new RuntimeException(e);
+                    }
+
                     if(log.isInfoEnabled()) {
                         log.info("Will try to re-publish in 60 sec");
                     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5612e5f7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
index ca3399f..758089f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
@@ -54,7 +54,7 @@ public abstract class MessagePublisher {
 	 * @param messageObj
 	 *            POJO to be published.
 	 */
-	public abstract void publish(Object messageObj);
+	public abstract void publish(Object messageObj, boolean retry);
 
 	/**
 	 * This operation get triggered when a message is ready to be published.
@@ -67,5 +67,5 @@ public abstract class MessagePublisher {
 	 * @param headers
 	 *            properties to be set as message headers.
 	 */
-	public abstract void publish(Object messageObj, Properties headers);
+	public abstract void publish(Object messageObj, Properties headers, boolean retry);
 }


[2/3] git commit: Fixed messaging model session timeout handling, subscriber re-connection and publisher retry logic

Posted by im...@apache.org.
Fixed messaging model session timeout handling, subscriber re-connection and publisher retry logic

Signed-off-by: Imesh Gunaratne <im...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/89832ae0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/89832ae0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/89832ae0

Branch: refs/heads/master
Commit: 89832ae06defd09a98417d953a175f3289cab58e
Parents: 5f808a0
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Apr 24 12:49:35 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Apr 24 17:42:13 2014 +0530

----------------------------------------------------------------------
 .../broker/connect/TopicConnector.java          |   1 -
 .../broker/heartbeat/TopicHealthChecker.java    |  23 ++--
 .../broker/publish/EventPublisher.java          |   8 +-
 .../broker/publish/TopicPublisher.java          | 135 ++++++++++++++-----
 .../broker/subscribe/TopicSubscriber.java       | 119 ++++++++++------
 .../stratos/messaging/event/ping/PingEvent.java |  30 +++++
 .../stratos/messaging/util/Constants.java       |   1 +
 7 files changed, 226 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
index a4fc5c4..ebb339d 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
@@ -67,7 +67,6 @@ public class TopicConnector {
         }
         topicConnection = connFactory.createTopicConnection();
         topicConnection.start();
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
index 1fbd25e..d36b9cb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
@@ -21,6 +21,9 @@ package org.apache.stratos.messaging.broker.heartbeat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.broker.connect.TopicConnector;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.event.ping.PingEvent;
+import org.apache.stratos.messaging.util.Constants;
 
 import javax.jms.JMSException;
 
@@ -43,22 +46,24 @@ public class TopicHealthChecker implements Runnable {
 	@Override
 	public void run() {
         if(log.isDebugEnabled()){
-		    log.debug(topicName + " topic Health Checker is running... " );
+		    log.debug(topicName + " topic health checker is running... " );
         }
 		TopicConnector testConnector = new TopicConnector();
 		while (!terminated) {
 			try {
-				// health checker runs in every 30s
-				Thread.sleep(30000);
-
+				// Health checker needs to run with the smallest possible time interval
+				// to detect a connection drop. Otherwise the subscriber will not
+                // get reconnected after a connection drop.
+				Thread.sleep(1000);
 				testConnector.init(topicName);
-
+                // A ping event is published to detect a session timeout
+                EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent());
 			} catch (Exception e) {
-				// implies connection is not established
-				// sleep for 5s and retry
+				// Implies connection is not established
+				// sleep for 30 sec and retry
 				try {
-					log.error(topicName + " topic health checker is failed and will retry to establish a connection after 5s.");
-					Thread.sleep(5000);
+					log.error(topicName + " topic health checker is failed and will try to subscribe again in 30 sec");
+					Thread.sleep(30000);
 					break;
 				} catch (InterruptedException ignore) {
 				}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
index 5d39956..8db203c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
@@ -43,8 +43,10 @@ public class EventPublisher extends TopicPublisher {
      * @param event event to be published
      */
     public void publish(Event event) {
-        Properties headers = new Properties();
-        headers.put(Constants.EVENT_CLASS_NAME, event.getClass().getName());
-        super.publish(event, headers);
+        synchronized (EventPublisher.class) {
+            Properties headers = new Properties();
+            headers.put(Constants.EVENT_CLASS_NAME, event.getClass().getName());
+            super.publish(event, headers);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index 004be13..7394d5b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -48,6 +48,7 @@ public class TopicPublisher extends MessagePublisher {
 	private TopicSession topicSession;
 	private TopicConnector connector;
 	private javax.jms.TopicPublisher topicPublisher = null;
+    private boolean initialized;
 
 	/**
 	 * @param aTopicName
@@ -56,6 +57,9 @@ public class TopicPublisher extends MessagePublisher {
 	TopicPublisher(String aTopicName) {
 		super(aTopicName);
 		connector = new TopicConnector();
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic publisher connector created: [topic] %s", getName()));
+        }
 	}
 
 	/**
@@ -68,35 +72,86 @@ public class TopicPublisher extends MessagePublisher {
 	}
 	
 	public void publish(Object messageObj, Properties headers) {
-		
-		Gson gson = new Gson();
-		String message = gson.toJson(messageObj);
-		try {
-			doPublish(message, headers);
-			
-		} catch (Exception e) {
-			log.error("Error while publishing to the topic: " + getName(), e);
-			// TODO would it be worth to throw this exception?
-		}
+        synchronized (TopicPublisher.class) {
+            Gson gson = new Gson();
+            String message = gson.toJson(messageObj);
+            boolean published = false;
+            while(!published) {
+
+                try {
+                    doPublish(message, headers);
+                    published = true;
+                } catch (Exception e) {
+                    initialized = false;
+                    if(log.isErrorEnabled()) {
+                        log.error("Error while publishing to the topic: " + getName(), e);
+                    }
+                    if(log.isInfoEnabled()) {
+                        log.info("Will try to re-publish in 60 sec");
+                    }
+                    try {
+                        Thread.sleep(60000);
+                    } catch (InterruptedException ignore) {
+                    }
+                }
+            }
+        }
 	}
 
 	public void close() {
-
-		// closes all sessions/connections
-		try {
-			topicPublisher.close();
-			topicSession.close();
-			connector.close();
-		} catch (JMSException ignore) {
-		}
+        synchronized (TopicPublisher.class) {
+            // closes all sessions/connections
+            try {
+                if(topicPublisher != null) {
+                    topicPublisher.close();
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Topic publisher closed: [topic] %s", getName()));
+                    }
+                }
+                if(topicSession != null) {
+                    topicSession.close();
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Topic publisher session closed: [topic] %s", getName()));
+                    }
+                }
+                if(connector != null) {
+                    connector.close();
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Topic publisher connector closed: [topic] %s", getName()));
+                    }
+                }
+            } catch (JMSException ignore) {
+            }
+        }
 	}
 
 	private void doPublish(String message, Properties headers) throws Exception, JMSException {
-		setPublisher();
+        if(!initialized) {
+            // Initialize a topic connection to the message broker
+            connector.init(getName());
+            initialized = true;
+            if(log.isDebugEnabled()) {
+                log.debug(String.format("Topic publisher connector initialized: [topic] %s", getName()));
+            }
+        }
 
-		TextMessage textMessage = topicSession.createTextMessage(message);
+        try {
+        // Create a new session
+        topicSession = createSession(connector);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic publisher session created: [topic] %s", getName()));
+        }
+        // Create a publisher from session
+        topicPublisher = createPublisher(topicSession);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic publisher created: [topic] %s", getName()));
+        }
+
+        // Create text message
+        TextMessage textMessage = topicSession.createTextMessage(message);
 		
 		if (headers != null) {
+            // Add header properties
 			@SuppressWarnings("rawtypes")
 			Enumeration e = headers.propertyNames();
 
@@ -110,26 +165,34 @@ public class TopicPublisher extends MessagePublisher {
         if (log.isDebugEnabled()) {
             log.debug(String.format("Message published: [topic] %s [header] %s [body] %s", getName(), (headers != null) ? headers.toString() : "null", message));
         }
-	}
+        }
+        finally {
+            if(topicPublisher != null) {
+                topicPublisher.close();
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Topic publisher closed: [topic] %s", getName()));
+                }
+            }
+            if(topicSession != null) {
+                topicSession.close();
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Topic publisher session closed: [topic] %s", getName()));
+                }
+            }
+        }
+    }
 
-	private void setPublisher() throws Exception, JMSException {
-		if (topicSession != null && topicPublisher != null) {
-			return;
-		}
-		
-		if (topicSession == null) {
-			// initialize a TopicConnector
-			connector.init(getName());
-			// get a session
-			topicSession = connector.newSession();
-		}
-		
-		Topic topic = connector.getTopic();
+    private TopicSession createSession(TopicConnector topicConnector) throws Exception {
+        // Create a new session
+        return topicConnector.newSession();
+    }
+
+	private javax.jms.TopicPublisher createPublisher(TopicSession topicSession) throws Exception, JMSException {
+        Topic topic = connector.getTopic();
 		if (topic == null) {
 			// if the topic doesn't exist, create it.
 			topic = topicSession.createTopic(getName());
 		}
-		topicPublisher = topicSession.createPublisher(topic);
+		return topicSession.createPublisher(topic);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index f6fa587..64ce136 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -53,30 +53,45 @@ public class TopicSubscriber implements Runnable {
 	public TopicSubscriber(String aTopicName) {
 		topicName = aTopicName;
 		connector = new TopicConnector();
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic subscriber connector created: [topic] %s", topicName));
+        }
 	}
 
 	private void doSubscribe() throws Exception, JMSException {
-		if (topicSession != null && topicSubscriber != null) {
-			return;
-		}
-		
-		if (topicSession == null) {
-			// initialize a TopicConnector
-			connector.init(topicName);
-			// get a session
-			topicSession = connector.newSession();
-		}
-		
-		Topic topic = connector.getTopic();
-		if (topic == null) {
-			// if topic doesn't exist, create it.
-			topic = topicSession.createTopic(topicName);
-		}
-		topicSubscriber = topicSession.createSubscriber(topic);
-		topicSubscriber.setMessageListener(messageListener);
+		// Initialize a topic connection
+		connector.init(topicName);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic subscriber connector initialized: [topic] %s", topicName));
+        }
+        // Create new session
+        topicSession = createSession(connector);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic subscriber session created: [topic] %s", topicName));
+        }
+        // Create a new subscriber
+        createSubscriber(topicSession);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic subscriber created: [topic] %s", topicName));
+        }
         subscribed = true;
 	}
 
+    private void createSubscriber(TopicSession topicSession) throws JMSException {
+        Topic topic = connector.getTopic();
+        if (topic == null) {
+            // if topic doesn't exist, create it.
+            topic = topicSession.createTopic(topicName);
+        }
+        topicSubscriber = topicSession.createSubscriber(topic);
+        topicSubscriber.setMessageListener(messageListener);
+    }
+
+    private TopicSession createSession(TopicConnector topicConnector) throws Exception {
+        // Create a new session
+        return topicConnector.newSession();
+    }
+
 	/**
 	 * @param messageListener
 	 *            this MessageListener will get triggered each time this
@@ -100,32 +115,52 @@ public class TopicSubscriber implements Runnable {
 			try {
 				doSubscribe();
 			} catch (Exception e) {
+                subscribed = false;
 				log.error("Error while subscribing to the topic: " + topicName, e);
 			} finally {
-				// start the health checker
-                healthChecker = new TopicHealthChecker(topicName);
-			    Thread healthCheckerThread = new Thread(healthChecker);
-				healthCheckerThread.start();
-				try {
-					// waits till the thread finishes.
-					healthCheckerThread.join();
-				} catch (InterruptedException ignore) {
-				}
-				// health checker failed
-				// closes all sessions/connections
-				try {
-                    subscribed = false;
-					if (topicSubscriber != null) {
-						topicSubscriber.close();
-					}
-					if (topicSession != null) {
-						topicSession.close();
-					}
-					if (connector != null) {
-						connector.close();
-					}
-				} catch (JMSException ignore) {
-				}
+                if(subscribed) {
+                    // start the health checker if subscribed
+                    healthChecker = new TopicHealthChecker(topicName);
+                    Thread healthCheckerThread = new Thread(healthChecker);
+                    healthCheckerThread.start();
+                    try {
+                        // waits till the thread finishes.
+                        healthCheckerThread.join();
+                    } catch (InterruptedException ignore) {
+                    }
+                }
+                else {
+				    // subscription failed
+                    if(log.isInfoEnabled()) {
+                        log.info("Will try to subscribe again in 30 sec");
+                    }
+                    try {
+                        Thread.sleep(30000);
+                    } catch (InterruptedException ignore) {
+                    }
+                }
+                // closes all sessions/connections
+                try {
+                    if (topicSubscriber != null) {
+                        topicSubscriber.close();
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Topic subscriber closed: [topic] %s", topicName));
+                        }
+                    }
+                    if (topicSession != null) {
+                        topicSession.close();
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Topic subscriber session closed: [topic] %s", topicName));
+                        }
+                    }
+                    if (connector != null) {
+                        connector.close();
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Topic subscriber connector closed: [topic] %s", topicName));
+                        }
+                    }
+                } catch (JMSException ignore) {
+                }
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java
new file mode 100644
index 0000000..0fcb6a5
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.ping;
+
+import org.apache.stratos.messaging.event.instance.notifier.InstanceNotifierEvent;
+
+import java.io.Serializable;
+
+/**
+ * Ping event.
+ */
+public class PingEvent extends InstanceNotifierEvent implements Serializable {
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
index 397a468..8b3c814 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
@@ -24,6 +24,7 @@ public class Constants {
 	public static final String HEALTH_STAT_TOPIC = "summarized-health-stats";
     public static final String INSTANCE_STATUS_TOPIC = "instance-status";
     public static final String INSTANCE_NOTIFIER_TOPIC = "instance-notifier";
+    public static final String PING_TOPIC = "ping";
     public static final String TENANT_TOPIC = "tenant";
     public static final String TENANT_RANGE_ALL = "*";