You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ra...@apache.org on 2014/10/12 14:40:35 UTC

git commit: applying changes from PR#87

Repository: stratos
Updated Branches:
  refs/heads/master 50f890f65 -> ec1da2c38


applying changes from PR#87


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/ec1da2c3
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/ec1da2c3
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/ec1da2c3

Branch: refs/heads/master
Commit: ec1da2c38b6061b452e1f4bcf75d299f30e79632
Parents: 50f890f
Author: R-Rajkumar <rr...@gmail.com>
Authored: Sun Oct 12 18:10:10 2014 +0530
Committer: R-Rajkumar <rr...@gmail.com>
Committed: Sun Oct 12 18:10:10 2014 +0530

----------------------------------------------------------------------
 .../broker/publish/EventPublisherPool.java      |  1 -
 .../broker/publish/TopicPublisher.java          | 92 +++++++++-----------
 .../broker/subscribe/TopicSubscriber.java       | 54 +++++-------
 .../stat/HealthStatEventMessageListener.java    | 45 +++++-----
 .../InstanceNotifierEventMessageListener.java   | 46 +++++-----
 5 files changed, 108 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
index 175d09b..257c56c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
@@ -52,7 +52,6 @@ public class EventPublisherPool {
     public static void close(String topicName) {
         synchronized (EventPublisherPool.class) {
             if(topicNameEventPublisherMap.containsKey(topicName)) {
-                topicNameEventPublisherMap.get(topicName).close();
                 topicNameEventPublisherMap.remove(topicName);
                 if(log.isDebugEnabled()) {
                     log.debug(String.format("Event publisher closed and removed from pool: [topic] %s", topicName));

http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index b3e6843..15fc98e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.broker.connect.MQTTConnector;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 
 import com.google.gson.Gson;
@@ -69,56 +70,45 @@ public class TopicPublisher {
 	public void publish(Object messageObj, boolean retry) {
 
 		synchronized (TopicPublisher.class) {
-			Gson gson = new Gson();
-			String message = gson.toJson(messageObj);
-			boolean published = false;
-			while (!published)
-				try {
-					mqttClient = MQTTConnector.getMQTTConClient();
-
-					MqttMessage mqttMSG = new MqttMessage(message.getBytes());
-
-					mqttMSG.setQos(QOS);
-					MqttConnectOptions connOpts = new MqttConnectOptions();
-					connOpts.setCleanSession(true);
-					mqttClient.connect(connOpts);
-					mqttClient.publish(topic, mqttMSG);
-					mqttClient.disconnect();
-					published = true;
-				} catch (Exception e) {
-					initialized = false;
-					if (log.isErrorEnabled()) {
-						log.error("Error while publishing to the topic: " + topic, e);
-					}
-					if (!retry) {
-						if (log.isDebugEnabled()) {
-							log.debug("Retry disabled for topic " + topic);
-						}
-						throw new RuntimeException(e);
-					}
-
-					if (log.isInfoEnabled()) {
-						log.info("Will try to re-publish in 60 sec");
-					}
-					try {
-						Thread.sleep(60000);
-					} catch (InterruptedException ignore) {
-					}
-				}
-				finally {
-
-				}
-		}
+            Gson gson = new Gson();
+            String message = gson.toJson(messageObj);
+            boolean published = false;
+            while (!published) {
+                mqttClient = MQTTConnector.getMQTTConClient();
+                MqttMessage mqttMSG = new MqttMessage(message.getBytes());
+
+                mqttMSG.setQos(QOS);
+                MqttConnectOptions connOpts = new MqttConnectOptions();
+                connOpts.setCleanSession(true);
+                try {
+                    mqttClient.connect(connOpts);
+                    mqttClient.publish(topic, mqttMSG);
+
+                    published = true;
+                } catch (MqttException e) {
+                    initialized = false;
+                    if (!retry) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Retry disabled for topic " + topic);
+                        }
+                        throw new RuntimeException(e);
+                    }
+
+                    if (log.isInfoEnabled()) {
+                        log.info("Will try to re-publish in 60 sec");
+                    }
+                    try {
+                        Thread.sleep(60000);
+                    } catch (InterruptedException ignore) {
+                    }
+                } finally {
+                    try {
+                        mqttClient.disconnect();
+                    } catch (MqttException ignore) {
+
+                    }
+                }
+            }
+        }
 	}
-
-	public void close() {
-		synchronized (TopicPublisher.class) {
-			// closes all sessions/connections
-			try {
-
-			} catch (Exception ignore) {
-			}
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index b9d0905..a76cc9e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -46,13 +46,11 @@ public class TopicSubscriber implements Runnable {
 
 	private boolean terminated = false;
 	private MqttCallback messageListener;
-	private TopicSession topicSession;
 	private final String topicName;
 
 	private TopicHealthChecker healthChecker;
 	private final javax.jms.TopicSubscriber topicSubscriber = null;
 	private boolean subscribed;
-	private final MessageProcessorChain processorChain;
 
 	/**
 	 * @param aTopicName topic name of this subscriber instance.
@@ -63,7 +61,6 @@ public class TopicSubscriber implements Runnable {
 		if (log.isDebugEnabled()) {
 			log.debug(String.format("Topic subscriber connector created: [topic] %s", topicName));
 		}
-		this.processorChain = new InstanceNotifierMessageProcessorChain();
 	}
 
 	private void doSubscribe() throws MqttException {
@@ -74,26 +71,31 @@ public class TopicSubscriber implements Runnable {
 			log.debug("Subscribing to topic '" + topicName + "' from " +
 			          mqttClient.getServerURI());
 		}
-		// Subscribing to specific topic
-		try {
-
-			MqttConnectOptions connOpts = new MqttConnectOptions();
-			connOpts.setCleanSession(true);
-			mqttClient.connect(connOpts);
-			// Continue waiting for messages
-			mqttClient.subscribe(topicName);
-			mqttClient.setCallback(messageListener);
-			subscribed = true;
-			while (true) {
-				try {
-					Thread.sleep(1000);
-				} catch (InterruptedException e) {
-				}
-			}
 
-		} finally {
-			mqttClient.disconnect();
-		}
+		/* Subscribing to specific topic */
+        while(true) {
+            try {
+
+                MqttConnectOptions connOpts = new MqttConnectOptions();
+                connOpts.setCleanSession(true);
+                mqttClient.connect(connOpts);
+
+                mqttClient.subscribe(topicName);
+                mqttClient.setCallback(messageListener);
+                subscribed = true;
+                // Continue waiting for messages
+                while (true) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException ignored) {
+                    }
+                }
+
+            } finally {
+                mqttClient.disconnect();
+            }
+        }
+		
 	}
 
 	/**
@@ -151,14 +153,6 @@ public class TopicSubscriber implements Runnable {
 							                        topicName));
 						}
 					}
-					if (topicSession != null) {
-						topicSession.close();
-						if (log.isDebugEnabled()) {
-							log.debug(String.format("Topic subscriber session closed: [topic] %s",
-							                        topicName));
-						}
-					}
-
 				} catch (JMSException ignore) {
 				}
 			}

http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
index bfaf622..aabc432 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
@@ -47,42 +47,37 @@ public class HealthStatEventMessageListener implements MqttCallback {
 	}
 
 	@Override
-	public void connectionLost(Throwable arg0) {
-		// TODO Auto-generated method stub
-
+	public void connectionLost(Throwable err) {
+		log.debug("MQTT connection lost", err);
 	}
 
 	@Override
 	public void deliveryComplete(IMqttDeliveryToken arg0) {
-		// TODO Auto-generated method stub
-
+		log.debug("Message delivery completed");
 	}
 
 	@Override
-	public void messageArrived(String topicName, MqttMessage message) throws Exception {
-		if (message instanceof MqttMessage) {
+	public void messageArrived(String topicName, MqttMessage message)
+			throws Exception {
+		TextMessage receivedMessage = new ActiveMQTextMessage();
+		if (log.isDebugEnabled()) {
+			log.debug(String.format("Health stat event messege received...."));
+		}
+		receivedMessage.setText(new String(message.getPayload()));
+		receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+				Util.getEventNameForTopic(topicName));
 
-			TextMessage receivedMessage = new ActiveMQTextMessage();
+		try {
 			if (log.isDebugEnabled()) {
-				log.debug(String.format("Health stat event messege received...."));
-
+				log.debug(String.format(
+						"Health stat event message received: %s",
+						((TextMessage) message).getText()));
 			}
-			receivedMessage.setText(new String(message.getPayload()));
-			receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
-			                                  Util.getEventNameForTopic(topicName));
+			// Add received message to the queue
+			messageQueue.add(receivedMessage);
 
-			try {
-				if (log.isDebugEnabled()) {
-					log.debug(String.format("Health stat event message received: %s",
-					                        ((TextMessage) message).getText()));
-				}
-				// Add received message to the queue
-				messageQueue.add(receivedMessage);
-
-			} catch (JMSException e) {
-				log.error(e.getMessage(), e);
-			}
+		} catch (JMSException e) {
+			log.error(e.getMessage(), e);
 		}
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
index c7d1e98..5203ce4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
@@ -47,43 +47,43 @@ class InstanceNotifierEventMessageListener implements MqttCallback {
 	}
 
 	@Override
-	public void connectionLost(Throwable arg0) {
+	public void connectionLost(Throwable err) {
 		if (log.isDebugEnabled()) {
-			log.debug("MQTT connection lost");
+			log.debug("MQTT connection lost" , err);
 		}
 
 	}
 
 	@Override
-	public void deliveryComplete(IMqttDeliveryToken arg0) {
-
+	public void deliveryComplete(IMqttDeliveryToken err) {
+		log.debug("Message delivery completed");
 	}
 
 	@Override
-	public void messageArrived(String topicName, MqttMessage message) throws Exception {
-		if (message instanceof MqttMessage) {
-
-			TextMessage receivedMessage = new ActiveMQTextMessage();
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("instance notifier messege received...."));
+	public void messageArrived(String topicName, MqttMessage message)
+			throws Exception {
 
-			}
+		TextMessage receivedMessage = new ActiveMQTextMessage();
+		if (log.isDebugEnabled()) {
+			log.debug(String.format("instance notifier messege received...."));
 
-			receivedMessage.setText(new String(message.getPayload()));
-			receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
-			                                  Util.getEventNameForTopic(topicName));
+		}
 
-			try {
-				if (log.isDebugEnabled()) {
-					log.debug(String.format("Instance notifier message received: %s",
-					                        ((TextMessage) message).getText()));
-				}
-				// Add received message to the queue
-				messageQueue.add(receivedMessage);
+		receivedMessage.setText(new String(message.getPayload()));
+		receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+				Util.getEventNameForTopic(topicName));
 
-			} catch (JMSException e) {
-				log.error(e.getMessage(), e);
+		try {
+			if (log.isDebugEnabled()) {
+				log.debug(String.format(
+						"Instance notifier message received: %s",
+						((TextMessage) message).getText()));
 			}
+			// Add received message to the queue
+			messageQueue.add(receivedMessage);
+
+		} catch (JMSException e) {
+			log.error(e.getMessage(), e);
 		}
 
 	}