You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ra...@apache.org on 2014/10/12 14:40:35 UTC
git commit: applying changes from PR#87
Repository: stratos
Updated Branches:
refs/heads/master 50f890f65 -> ec1da2c38
applying changes from PR#87
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/ec1da2c3
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/ec1da2c3
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/ec1da2c3
Branch: refs/heads/master
Commit: ec1da2c38b6061b452e1f4bcf75d299f30e79632
Parents: 50f890f
Author: R-Rajkumar <rr...@gmail.com>
Authored: Sun Oct 12 18:10:10 2014 +0530
Committer: R-Rajkumar <rr...@gmail.com>
Committed: Sun Oct 12 18:10:10 2014 +0530
----------------------------------------------------------------------
.../broker/publish/EventPublisherPool.java | 1 -
.../broker/publish/TopicPublisher.java | 92 +++++++++-----------
.../broker/subscribe/TopicSubscriber.java | 54 +++++-------
.../stat/HealthStatEventMessageListener.java | 45 +++++-----
.../InstanceNotifierEventMessageListener.java | 46 +++++-----
5 files changed, 108 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
index 175d09b..257c56c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
@@ -52,7 +52,6 @@ public class EventPublisherPool {
public static void close(String topicName) {
synchronized (EventPublisherPool.class) {
if(topicNameEventPublisherMap.containsKey(topicName)) {
- topicNameEventPublisherMap.get(topicName).close();
topicNameEventPublisherMap.remove(topicName);
if(log.isDebugEnabled()) {
log.debug(String.format("Event publisher closed and removed from pool: [topic] %s", topicName));
http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index b3e6843..15fc98e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.broker.connect.MQTTConnector;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.google.gson.Gson;
@@ -69,56 +70,45 @@ public class TopicPublisher {
public void publish(Object messageObj, boolean retry) {
synchronized (TopicPublisher.class) {
- Gson gson = new Gson();
- String message = gson.toJson(messageObj);
- boolean published = false;
- while (!published)
- try {
- mqttClient = MQTTConnector.getMQTTConClient();
-
- MqttMessage mqttMSG = new MqttMessage(message.getBytes());
-
- mqttMSG.setQos(QOS);
- MqttConnectOptions connOpts = new MqttConnectOptions();
- connOpts.setCleanSession(true);
- mqttClient.connect(connOpts);
- mqttClient.publish(topic, mqttMSG);
- mqttClient.disconnect();
- published = true;
- } catch (Exception e) {
- initialized = false;
- if (log.isErrorEnabled()) {
- log.error("Error while publishing to the topic: " + topic, e);
- }
- if (!retry) {
- if (log.isDebugEnabled()) {
- log.debug("Retry disabled for topic " + topic);
- }
- throw new RuntimeException(e);
- }
-
- if (log.isInfoEnabled()) {
- log.info("Will try to re-publish in 60 sec");
- }
- try {
- Thread.sleep(60000);
- } catch (InterruptedException ignore) {
- }
- }
- finally {
-
- }
- }
+ Gson gson = new Gson();
+ String message = gson.toJson(messageObj);
+ boolean published = false;
+ while (!published) {
+ mqttClient = MQTTConnector.getMQTTConClient();
+ MqttMessage mqttMSG = new MqttMessage(message.getBytes());
+
+ mqttMSG.setQos(QOS);
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ try {
+ mqttClient.connect(connOpts);
+ mqttClient.publish(topic, mqttMSG);
+
+ published = true;
+ } catch (MqttException e) {
+ initialized = false;
+ if (!retry) {
+ if (log.isDebugEnabled()) {
+ log.debug("Retry disabled for topic " + topic);
+ }
+ throw new RuntimeException(e);
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Will try to re-publish in 60 sec");
+ }
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException ignore) {
+ }
+ } finally {
+ try {
+ mqttClient.disconnect();
+ } catch (MqttException ignore) {
+
+ }
+ }
+ }
+ }
}
-
- public void close() {
- synchronized (TopicPublisher.class) {
- // closes all sessions/connections
- try {
-
- } catch (Exception ignore) {
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index b9d0905..a76cc9e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -46,13 +46,11 @@ public class TopicSubscriber implements Runnable {
private boolean terminated = false;
private MqttCallback messageListener;
- private TopicSession topicSession;
private final String topicName;
private TopicHealthChecker healthChecker;
private final javax.jms.TopicSubscriber topicSubscriber = null;
private boolean subscribed;
- private final MessageProcessorChain processorChain;
/**
* @param aTopicName topic name of this subscriber instance.
@@ -63,7 +61,6 @@ public class TopicSubscriber implements Runnable {
if (log.isDebugEnabled()) {
log.debug(String.format("Topic subscriber connector created: [topic] %s", topicName));
}
- this.processorChain = new InstanceNotifierMessageProcessorChain();
}
private void doSubscribe() throws MqttException {
@@ -74,26 +71,31 @@ public class TopicSubscriber implements Runnable {
log.debug("Subscribing to topic '" + topicName + "' from " +
mqttClient.getServerURI());
}
- // Subscribing to specific topic
- try {
-
- MqttConnectOptions connOpts = new MqttConnectOptions();
- connOpts.setCleanSession(true);
- mqttClient.connect(connOpts);
- // Continue waiting for messages
- mqttClient.subscribe(topicName);
- mqttClient.setCallback(messageListener);
- subscribed = true;
- while (true) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- }
- } finally {
- mqttClient.disconnect();
- }
+ /* Subscribing to specific topic */
+ while(true) {
+ try {
+
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ mqttClient.connect(connOpts);
+
+ mqttClient.subscribe(topicName);
+ mqttClient.setCallback(messageListener);
+ subscribed = true;
+ // Continue waiting for messages
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
+ }
+ }
+
+ } finally {
+ mqttClient.disconnect();
+ }
+ }
+
}
/**
@@ -151,14 +153,6 @@ public class TopicSubscriber implements Runnable {
topicName));
}
}
- if (topicSession != null) {
- topicSession.close();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Topic subscriber session closed: [topic] %s",
- topicName));
- }
- }
-
} catch (JMSException ignore) {
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
index bfaf622..aabc432 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
@@ -47,42 +47,37 @@ public class HealthStatEventMessageListener implements MqttCallback {
}
@Override
- public void connectionLost(Throwable arg0) {
- // TODO Auto-generated method stub
-
+ public void connectionLost(Throwable err) {
+ log.debug("MQTT connection lost", err);
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
- // TODO Auto-generated method stub
-
+ log.debug("Message delivery completed");
}
@Override
- public void messageArrived(String topicName, MqttMessage message) throws Exception {
- if (message instanceof MqttMessage) {
+ public void messageArrived(String topicName, MqttMessage message)
+ throws Exception {
+ TextMessage receivedMessage = new ActiveMQTextMessage();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Health stat event messege received...."));
+ }
+ receivedMessage.setText(new String(message.getPayload()));
+ receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+ Util.getEventNameForTopic(topicName));
- TextMessage receivedMessage = new ActiveMQTextMessage();
+ try {
if (log.isDebugEnabled()) {
- log.debug(String.format("Health stat event messege received...."));
-
+ log.debug(String.format(
+ "Health stat event message received: %s",
+ ((TextMessage) message).getText()));
}
- receivedMessage.setText(new String(message.getPayload()));
- receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
- Util.getEventNameForTopic(topicName));
+ // Add received message to the queue
+ messageQueue.add(receivedMessage);
- try {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Health stat event message received: %s",
- ((TextMessage) message).getText()));
- }
- // Add received message to the queue
- messageQueue.add(receivedMessage);
-
- } catch (JMSException e) {
- log.error(e.getMessage(), e);
- }
+ } catch (JMSException e) {
+ log.error(e.getMessage(), e);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
index c7d1e98..5203ce4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
@@ -47,43 +47,43 @@ class InstanceNotifierEventMessageListener implements MqttCallback {
}
@Override
- public void connectionLost(Throwable arg0) {
+ public void connectionLost(Throwable err) {
if (log.isDebugEnabled()) {
- log.debug("MQTT connection lost");
+ log.debug("MQTT connection lost" , err);
}
}
@Override
- public void deliveryComplete(IMqttDeliveryToken arg0) {
-
+ public void deliveryComplete(IMqttDeliveryToken err) {
+ log.debug("Message delivery completed");
}
@Override
- public void messageArrived(String topicName, MqttMessage message) throws Exception {
- if (message instanceof MqttMessage) {
-
- TextMessage receivedMessage = new ActiveMQTextMessage();
- if (log.isDebugEnabled()) {
- log.debug(String.format("instance notifier messege received...."));
+ public void messageArrived(String topicName, MqttMessage message)
+ throws Exception {
- }
+ TextMessage receivedMessage = new ActiveMQTextMessage();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("instance notifier messege received...."));
- receivedMessage.setText(new String(message.getPayload()));
- receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
- Util.getEventNameForTopic(topicName));
+ }
- try {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Instance notifier message received: %s",
- ((TextMessage) message).getText()));
- }
- // Add received message to the queue
- messageQueue.add(receivedMessage);
+ receivedMessage.setText(new String(message.getPayload()));
+ receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+ Util.getEventNameForTopic(topicName));
- } catch (JMSException e) {
- log.error(e.getMessage(), e);
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Instance notifier message received: %s",
+ ((TextMessage) message).getText()));
}
+ // Add received message to the queue
+ messageQueue.add(receivedMessage);
+
+ } catch (JMSException e) {
+ log.error(e.getMessage(), e);
}
}