You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/04/24 14:12:46 UTC
[1/3] git commit: Changed load balancer endpoint timeout to 60 sec
Repository: incubator-stratos
Updated Branches:
refs/heads/master 6095199b0 -> 5612e5f74
Changed load balancer endpoint timeout to 60 sec
Signed-off-by: Imesh Gunaratne <im...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/5f808a02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/5f808a02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/5f808a02
Branch: refs/heads/master
Commit: 5f808a02cc8f656e86116def9cc0e4bdd48178cd
Parents: 6095199
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Apr 24 12:50:38 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Apr 24 17:41:57 2014 +0530
----------------------------------------------------------------------
.../modules/distribution/src/main/conf/loadbalancer.conf | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5f808a02/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
----------------------------------------------------------------------
diff --git a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
index ba3e8d0..8060030 100644
--- a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
+++ b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
@@ -32,7 +32,7 @@ loadbalancer {
session-affinity: true;
# Endpoint timeout in milli-seconds
- endpoint-timeout: 30000;
+ endpoint-timeout: 60000;
# Session timeout in milli-seconds
session-timeout: 90000;
[3/3] git commit: Removed topic publisher retry from health checker
ping event
Posted by im...@apache.org.
Removed topic publisher retry from health checker ping event
Signed-off-by: Imesh Gunaratne <im...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/5612e5f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/5612e5f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/5612e5f7
Branch: refs/heads/master
Commit: 5612e5f74670bb4a6c07f7844e863f11bd6407d9
Parents: 89832ae
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Apr 24 13:29:10 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Apr 24 17:42:23 2014 +0530
----------------------------------------------------------------------
.../messaging/broker/heartbeat/TopicHealthChecker.java | 2 +-
.../messaging/broker/publish/EventPublisher.java | 6 +++++-
.../messaging/broker/publish/TopicPublisher.java | 13 ++++++++++---
.../stratos/messaging/publish/MessagePublisher.java | 4 ++--
4 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5612e5f7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
index d36b9cb..72466aa 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
@@ -57,7 +57,7 @@ public class TopicHealthChecker implements Runnable {
Thread.sleep(1000);
testConnector.init(topicName);
// A ping event is published to detect a session timeout
- EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent());
+ EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent(), false);
} catch (Exception e) {
// Implies connection is not established
// sleep for 30 sec and retry
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5612e5f7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
index 8db203c..2150494 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
@@ -43,10 +43,14 @@ public class EventPublisher extends TopicPublisher {
* @param event event to be published
*/
public void publish(Event event) {
+ publish(event, true);
+ }
+
+ public void publish(Event event, boolean retry) {
synchronized (EventPublisher.class) {
Properties headers = new Properties();
headers.put(Constants.EVENT_CLASS_NAME, event.getClass().getName());
- super.publish(event, headers);
+ super.publish(event, headers, retry);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5612e5f7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index 7394d5b..c0074f5 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -67,11 +67,11 @@ public class TopicPublisher extends MessagePublisher {
* lost, this will perform re-subscription periodically, until a connection
* obtained.
*/
- public void publish(Object messageObj) {
- publish(messageObj, null);
+ public void publish(Object messageObj, boolean retry) {
+ publish(messageObj, null, retry);
}
- public void publish(Object messageObj, Properties headers) {
+ public void publish(Object messageObj, Properties headers, boolean retry) {
synchronized (TopicPublisher.class) {
Gson gson = new Gson();
String message = gson.toJson(messageObj);
@@ -86,6 +86,13 @@ public class TopicPublisher extends MessagePublisher {
if(log.isErrorEnabled()) {
log.error("Error while publishing to the topic: " + getName(), e);
}
+ if(!retry) {
+ if(log.isDebugEnabled()) {
+ log.debug("Retry disabled for topic " + getName());
+ }
+ throw new RuntimeException(e);
+ }
+
if(log.isInfoEnabled()) {
log.info("Will try to re-publish in 60 sec");
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/5612e5f7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
index ca3399f..758089f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
@@ -54,7 +54,7 @@ public abstract class MessagePublisher {
* @param messageObj
* POJO to be published.
*/
- public abstract void publish(Object messageObj);
+ public abstract void publish(Object messageObj, boolean retry);
/**
* This operation get triggered when a message is ready to be published.
@@ -67,5 +67,5 @@ public abstract class MessagePublisher {
* @param headers
* properties to be set as message headers.
*/
- public abstract void publish(Object messageObj, Properties headers);
+ public abstract void publish(Object messageObj, Properties headers, boolean retry);
}
[2/3] git commit: Fixed messaging model session timeout handling,
subscriber re-connection and publisher retry logic
Posted by im...@apache.org.
Fixed messaging model session timeout handling, subscriber re-connection and publisher retry logic
Signed-off-by: Imesh Gunaratne <im...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/89832ae0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/89832ae0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/89832ae0
Branch: refs/heads/master
Commit: 89832ae06defd09a98417d953a175f3289cab58e
Parents: 5f808a0
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Apr 24 12:49:35 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Apr 24 17:42:13 2014 +0530
----------------------------------------------------------------------
.../broker/connect/TopicConnector.java | 1 -
.../broker/heartbeat/TopicHealthChecker.java | 23 ++--
.../broker/publish/EventPublisher.java | 8 +-
.../broker/publish/TopicPublisher.java | 135 ++++++++++++++-----
.../broker/subscribe/TopicSubscriber.java | 119 ++++++++++------
.../stratos/messaging/event/ping/PingEvent.java | 30 +++++
.../stratos/messaging/util/Constants.java | 1 +
7 files changed, 226 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
index a4fc5c4..ebb339d 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
@@ -67,7 +67,6 @@ public class TopicConnector {
}
topicConnection = connFactory.createTopicConnection();
topicConnection.start();
-
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
index 1fbd25e..d36b9cb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
@@ -21,6 +21,9 @@ package org.apache.stratos.messaging.broker.heartbeat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.broker.connect.TopicConnector;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.event.ping.PingEvent;
+import org.apache.stratos.messaging.util.Constants;
import javax.jms.JMSException;
@@ -43,22 +46,24 @@ public class TopicHealthChecker implements Runnable {
@Override
public void run() {
if(log.isDebugEnabled()){
- log.debug(topicName + " topic Health Checker is running... " );
+ log.debug(topicName + " topic health checker is running... " );
}
TopicConnector testConnector = new TopicConnector();
while (!terminated) {
try {
- // health checker runs in every 30s
- Thread.sleep(30000);
-
+ // Health checker needs to run with the smallest possible time interval
+ // to detect a connection drop. Otherwise the subscriber will not
+ // get reconnected after a connection drop.
+ Thread.sleep(1000);
testConnector.init(topicName);
-
+ // A ping event is published to detect a session timeout
+ EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent());
} catch (Exception e) {
- // implies connection is not established
- // sleep for 5s and retry
+ // Implies connection is not established
+ // sleep for 30 sec and retry
try {
- log.error(topicName + " topic health checker is failed and will retry to establish a connection after 5s.");
- Thread.sleep(5000);
+ log.error(topicName + " topic health checker is failed and will try to subscribe again in 30 sec");
+ Thread.sleep(30000);
break;
} catch (InterruptedException ignore) {
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
index 5d39956..8db203c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
@@ -43,8 +43,10 @@ public class EventPublisher extends TopicPublisher {
* @param event event to be published
*/
public void publish(Event event) {
- Properties headers = new Properties();
- headers.put(Constants.EVENT_CLASS_NAME, event.getClass().getName());
- super.publish(event, headers);
+ synchronized (EventPublisher.class) {
+ Properties headers = new Properties();
+ headers.put(Constants.EVENT_CLASS_NAME, event.getClass().getName());
+ super.publish(event, headers);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index 004be13..7394d5b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -48,6 +48,7 @@ public class TopicPublisher extends MessagePublisher {
private TopicSession topicSession;
private TopicConnector connector;
private javax.jms.TopicPublisher topicPublisher = null;
+ private boolean initialized;
/**
* @param aTopicName
@@ -56,6 +57,9 @@ public class TopicPublisher extends MessagePublisher {
TopicPublisher(String aTopicName) {
super(aTopicName);
connector = new TopicConnector();
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic publisher connector created: [topic] %s", getName()));
+ }
}
/**
@@ -68,35 +72,86 @@ public class TopicPublisher extends MessagePublisher {
}
public void publish(Object messageObj, Properties headers) {
-
- Gson gson = new Gson();
- String message = gson.toJson(messageObj);
- try {
- doPublish(message, headers);
-
- } catch (Exception e) {
- log.error("Error while publishing to the topic: " + getName(), e);
- // TODO would it be worth to throw this exception?
- }
+ synchronized (TopicPublisher.class) {
+ Gson gson = new Gson();
+ String message = gson.toJson(messageObj);
+ boolean published = false;
+ while(!published) {
+
+ try {
+ doPublish(message, headers);
+ published = true;
+ } catch (Exception e) {
+ initialized = false;
+ if(log.isErrorEnabled()) {
+ log.error("Error while publishing to the topic: " + getName(), e);
+ }
+ if(log.isInfoEnabled()) {
+ log.info("Will try to re-publish in 60 sec");
+ }
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+ }
}
public void close() {
-
- // closes all sessions/connections
- try {
- topicPublisher.close();
- topicSession.close();
- connector.close();
- } catch (JMSException ignore) {
- }
+ synchronized (TopicPublisher.class) {
+ // closes all sessions/connections
+ try {
+ if(topicPublisher != null) {
+ topicPublisher.close();
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic publisher closed: [topic] %s", getName()));
+ }
+ }
+ if(topicSession != null) {
+ topicSession.close();
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic publisher session closed: [topic] %s", getName()));
+ }
+ }
+ if(connector != null) {
+ connector.close();
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic publisher connector closed: [topic] %s", getName()));
+ }
+ }
+ } catch (JMSException ignore) {
+ }
+ }
}
private void doPublish(String message, Properties headers) throws Exception, JMSException {
- setPublisher();
+ if(!initialized) {
+ // Initialize a topic connection to the message broker
+ connector.init(getName());
+ initialized = true;
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic publisher connector initialized: [topic] %s", getName()));
+ }
+ }
- TextMessage textMessage = topicSession.createTextMessage(message);
+ try {
+ // Create a new session
+ topicSession = createSession(connector);
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic publisher session created: [topic] %s", getName()));
+ }
+ // Create a publisher from session
+ topicPublisher = createPublisher(topicSession);
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic publisher created: [topic] %s", getName()));
+ }
+
+ // Create text message
+ TextMessage textMessage = topicSession.createTextMessage(message);
if (headers != null) {
+ // Add header properties
@SuppressWarnings("rawtypes")
Enumeration e = headers.propertyNames();
@@ -110,26 +165,34 @@ public class TopicPublisher extends MessagePublisher {
if (log.isDebugEnabled()) {
log.debug(String.format("Message published: [topic] %s [header] %s [body] %s", getName(), (headers != null) ? headers.toString() : "null", message));
}
- }
+ }
+ finally {
+ if(topicPublisher != null) {
+ topicPublisher.close();
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic publisher closed: [topic] %s", getName()));
+ }
+ }
+ if(topicSession != null) {
+ topicSession.close();
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic publisher session closed: [topic] %s", getName()));
+ }
+ }
+ }
+ }
- private void setPublisher() throws Exception, JMSException {
- if (topicSession != null && topicPublisher != null) {
- return;
- }
-
- if (topicSession == null) {
- // initialize a TopicConnector
- connector.init(getName());
- // get a session
- topicSession = connector.newSession();
- }
-
- Topic topic = connector.getTopic();
+ private TopicSession createSession(TopicConnector topicConnector) throws Exception {
+ // Create a new session
+ return topicConnector.newSession();
+ }
+
+ private javax.jms.TopicPublisher createPublisher(TopicSession topicSession) throws Exception, JMSException {
+ Topic topic = connector.getTopic();
if (topic == null) {
// if the topic doesn't exist, create it.
topic = topicSession.createTopic(getName());
}
- topicPublisher = topicSession.createPublisher(topic);
+ return topicSession.createPublisher(topic);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index f6fa587..64ce136 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -53,30 +53,45 @@ public class TopicSubscriber implements Runnable {
public TopicSubscriber(String aTopicName) {
topicName = aTopicName;
connector = new TopicConnector();
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic subscriber connector created: [topic] %s", topicName));
+ }
}
private void doSubscribe() throws Exception, JMSException {
- if (topicSession != null && topicSubscriber != null) {
- return;
- }
-
- if (topicSession == null) {
- // initialize a TopicConnector
- connector.init(topicName);
- // get a session
- topicSession = connector.newSession();
- }
-
- Topic topic = connector.getTopic();
- if (topic == null) {
- // if topic doesn't exist, create it.
- topic = topicSession.createTopic(topicName);
- }
- topicSubscriber = topicSession.createSubscriber(topic);
- topicSubscriber.setMessageListener(messageListener);
+ // Initialize a topic connection
+ connector.init(topicName);
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic subscriber connector initialized: [topic] %s", topicName));
+ }
+ // Create new session
+ topicSession = createSession(connector);
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic subscriber session created: [topic] %s", topicName));
+ }
+ // Create a new subscriber
+ createSubscriber(topicSession);
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic subscriber created: [topic] %s", topicName));
+ }
subscribed = true;
}
+ private void createSubscriber(TopicSession topicSession) throws JMSException {
+ Topic topic = connector.getTopic();
+ if (topic == null) {
+ // if topic doesn't exist, create it.
+ topic = topicSession.createTopic(topicName);
+ }
+ topicSubscriber = topicSession.createSubscriber(topic);
+ topicSubscriber.setMessageListener(messageListener);
+ }
+
+ private TopicSession createSession(TopicConnector topicConnector) throws Exception {
+ // Create a new session
+ return topicConnector.newSession();
+ }
+
/**
* @param messageListener
* this MessageListener will get triggered each time this
@@ -100,32 +115,52 @@ public class TopicSubscriber implements Runnable {
try {
doSubscribe();
} catch (Exception e) {
+ subscribed = false;
log.error("Error while subscribing to the topic: " + topicName, e);
} finally {
- // start the health checker
- healthChecker = new TopicHealthChecker(topicName);
- Thread healthCheckerThread = new Thread(healthChecker);
- healthCheckerThread.start();
- try {
- // waits till the thread finishes.
- healthCheckerThread.join();
- } catch (InterruptedException ignore) {
- }
- // health checker failed
- // closes all sessions/connections
- try {
- subscribed = false;
- if (topicSubscriber != null) {
- topicSubscriber.close();
- }
- if (topicSession != null) {
- topicSession.close();
- }
- if (connector != null) {
- connector.close();
- }
- } catch (JMSException ignore) {
- }
+ if(subscribed) {
+ // start the health checker if subscribed
+ healthChecker = new TopicHealthChecker(topicName);
+ Thread healthCheckerThread = new Thread(healthChecker);
+ healthCheckerThread.start();
+ try {
+ // waits till the thread finishes.
+ healthCheckerThread.join();
+ } catch (InterruptedException ignore) {
+ }
+ }
+ else {
+ // subscription failed
+ if(log.isInfoEnabled()) {
+ log.info("Will try to subscribe again in 30 sec");
+ }
+ try {
+ Thread.sleep(30000);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ // closes all sessions/connections
+ try {
+ if (topicSubscriber != null) {
+ topicSubscriber.close();
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic subscriber closed: [topic] %s", topicName));
+ }
+ }
+ if (topicSession != null) {
+ topicSession.close();
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic subscriber session closed: [topic] %s", topicName));
+ }
+ }
+ if (connector != null) {
+ connector.close();
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Topic subscriber connector closed: [topic] %s", topicName));
+ }
+ }
+ } catch (JMSException ignore) {
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java
new file mode 100644
index 0000000..0fcb6a5
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.ping;
+
+import org.apache.stratos.messaging.event.instance.notifier.InstanceNotifierEvent;
+
+import java.io.Serializable;
+
+/**
+ * Ping event.
+ */
+public class PingEvent extends InstanceNotifierEvent implements Serializable {
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/89832ae0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
index 397a468..8b3c814 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
@@ -24,6 +24,7 @@ public class Constants {
public static final String HEALTH_STAT_TOPIC = "summarized-health-stats";
public static final String INSTANCE_STATUS_TOPIC = "instance-status";
public static final String INSTANCE_NOTIFIER_TOPIC = "instance-notifier";
+ public static final String PING_TOPIC = "ping";
public static final String TENANT_TOPIC = "tenant";
public static final String TENANT_RANGE_ALL = "*";