You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by bi...@apache.org on 2010/09/08 06:41:56 UTC
svn commit: r993600 -
/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
Author: billgraham
Date: Wed Sep 8 04:41:55 2010
New Revision: 993600
URL: http://svn.apache.org/viewvc?rev=993600&view=rev
Log:
CHUKWA-517. Adding support for queues to JMSAdaptor
Modified:
incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java?rev=993600&r1=993599&r2=993600&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java Wed Sep 8 04:41:55 2010
@@ -29,28 +29,32 @@ import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.Session;
import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
import javax.jms.Connection;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.JMSException;
import javax.jms.ConnectionFactory;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
/**
- * Adaptor that is able to listen to a JMS topic for messages, receive the
- * message, and transform it to a Chukwa chunk. Transformation is handled by a
- * JMSMessageTransformer. The default JMSMessageTransformer used is the
+ * Adaptor that is able to listen to a JMS topic or queue for messages, receive
+ * the message, and transform it to a Chukwa chunk. Transformation is handled by
+ * a JMSMessageTransformer. The default JMSMessageTransformer used is the
* JMSTextMessageTransformer.
* <P>
* This adaptor is added to an Agent like so:
* <code>
- * add JMSAdaptor <dataType> <brokerURL> -t <topicName> [-s <JMSSelector>]
+ * add JMSAdaptor <dataType> <brokerURL> <-t <topicName>|-q <queueName>> [-s <JMSSelector>]
* [-x <transformerName>] [-p <transformerConfigs>] <offset>
* </code>
* <ul>
* <li><code>dataType</code> - The chukwa data type.</li>
* <li><code>brokerURL</code> - The JMS broker URL to bind to.</li>
* <li><code>topicName</code> - The JMS topic to listen on.</li>
+ * <li><code>queueName</code> - The JMS queue to listen on.</li>
* <li><code>JMSSelector</code> - The JMS selector to filter with. Surround
* with quotes if selector contains multiple words.</li>
* <li><code>transformerName</code> - Class name of the JMSMessageTransformer to
@@ -71,6 +75,7 @@ public class JMSAdaptor extends Abstract
Connection connection;
String brokerURL;
String topic;
+ String queue;
String selector = null;
JMSMessageTransformer transformer;
@@ -112,7 +117,7 @@ public class JMSAdaptor extends Abstract
/**
* This adaptor received configuration like this:
- * <brokerURL> -t <topicName> [-s <JMSSelector>] [-x <transformerName>]
+ * <brokerURL> <-t <topicName>|-q <queueName>> [-s <JMSSelector>] [-x <transformerName>]
* [-p <transformerProperties>]
*
* @param s
@@ -142,6 +147,9 @@ public class JMSAdaptor extends Abstract
if ("-t".equals(value)) {
topic = tokens[++i];
}
+ else if ("-q".equals(value)) {
+ queue = tokens[++i];
+ }
else if ("-s".equals(value) && i <= tokens.length - 1) {
selector = tokens[++i];
@@ -175,8 +183,13 @@ public class JMSAdaptor extends Abstract
}
}
- if (topic == null) {
- log.error("topicName must be set");
+ if (topic == null && queue == null) {
+ log.error("topicName or queueName must be set");
+ return null;
+ }
+
+ if (topic != null && queue != null) {
+ log.error("Either topicName or queueName must be set, but not both");
return null;
}
@@ -206,7 +219,14 @@ public class JMSAdaptor extends Abstract
}
status = s;
- source = "jms:"+brokerURL + ",topic:" + topic;
+
+ if(topic != null) {
+ source = "jms:"+brokerURL + ",topic:" + topic;
+ }
+ else if(queue != null) {
+ source = "jms:"+brokerURL + ",queue:" + queue;
+ }
+
return s;
}
@@ -224,7 +244,12 @@ public class JMSAdaptor extends Abstract
", offset =" + bytesReceived);
// this is where different initialization could be used for a queue
- initializeTopic(connection, topic, selector, new JMSListener());
+ if(topic != null) {
+ initializeTopic(connection, topic, selector, new JMSListener());
+ }
+ else if(queue != null) {
+ initializeQueue(connection, queue, selector, new JMSListener());
+ }
connection.start();
} catch(Exception e) {
@@ -278,9 +303,19 @@ public class JMSAdaptor extends Abstract
TopicSession session = ((TopicConnection)connection).
createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic jmsTopic = session.createTopic(topic);
- TopicSubscriber subscriber = session.
- createSubscriber(jmsTopic, selector, true);
- subscriber.setMessageListener(listener);
+ MessageConsumer consumer = session.createConsumer(jmsTopic, selector, true);
+ consumer.setMessageListener(listener);
+ }
+
+  /** Registers listener as the asynchronous consumer of the named JMS queue, filtered by selector. */
+  private void initializeQueue(Connection connection, String queueName,
+      String selector, JMSListener listener) throws JMSException {
+    QueueSession session = ((QueueConnection)connection).
+            createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+    Queue jmsQueue = session.createQueue(queueName);
+    // NOTE(review): JMS leaves noLocal unspecified for queue consumers; confirm 'true' is intended.
+    MessageConsumer consumer = session.createConsumer(jmsQueue, selector, true);
+    consumer.setMessageListener(listener);
   }
private static String trimQuotes(String value) {