You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by bi...@apache.org on 2010/09/08 06:41:56 UTC

svn commit: r993600 - /incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java

Author: billgraham
Date: Wed Sep  8 04:41:55 2010
New Revision: 993600

URL: http://svn.apache.org/viewvc?rev=993600&view=rev
Log:
CHUKWA-517. Adding support for queues to JMSAdaptor

Modified:
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java?rev=993600&r1=993599&r2=993600&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java Wed Sep  8 04:41:55 2010
@@ -29,28 +29,32 @@ import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
 import javax.jms.Session;
 import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
 import javax.jms.Connection;
 import javax.jms.MessageListener;
 import javax.jms.Message;
 import javax.jms.JMSException;
 import javax.jms.ConnectionFactory;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
 
 /**
- * Adaptor that is able to listen to a JMS topic for messages, receive the
- * message, and transform it to a Chukwa chunk. Transformation is handled by a
- * JMSMessageTransformer. The default JMSMessageTransformer used is the
+ * Adaptor that is able to listen to a JMS topic or queue for messages, receive
+ * the message, and transform it to a Chukwa chunk. Transformation is handled by
+ * a JMSMessageTransformer. The default JMSMessageTransformer used is the
  * JMSTextMessageTransformer.
  * <P>
  * This adaptor is added to an Agent like so:
  * <code>
- * add JMSAdaptor <dataType> <brokerURL> -t <topicName> [-s <JMSSelector>]
+ * add JMSAdaptor <dataType> <brokerURL> <-t <topicName>|-q <queueName>> [-s <JMSSelector>]
  *  [-x <transformerName>] [-p <transformerConfigs>] <offset>
  * </code>
  * <ul>
  * <li><code>dataType</code> - The chukwa data type.</li>
  * <li><code>brokerURL</code> - The JMS broker URL to bind to.</li>
  * <li><code>topicName</code> - The JMS topic to listen on.</li>
+ * <li><code>queueName</code> - The JMS queue to listen on.</li>
  * <li><code>JMSSelector</code> - The JMS selector to filter with. Surround
  * with quotes if selector contains multiple words.</li>
  * <li><code>transformerName</code> - Class name of the JMSMessageTransformer to
@@ -71,6 +75,7 @@ public class JMSAdaptor extends Abstract
   Connection connection;
   String brokerURL;
   String topic;
+  String queue;
   String selector = null;
   JMSMessageTransformer transformer;
 
@@ -112,7 +117,7 @@ public class JMSAdaptor extends Abstract
 
   /**
    * This adaptor received configuration like this:
-   * <brokerURL> -t <topicName> [-s <JMSSelector>] [-x <transformerName>]
+   * <brokerURL> <-t <topicName>|-q <queueName>> [-s <JMSSelector>] [-x <transformerName>]
    * [-p <transformerProperties>]
    *
    * @param s
@@ -142,6 +147,9 @@ public class JMSAdaptor extends Abstract
       if ("-t".equals(value)) {
         topic = tokens[++i];
       }
+      else if ("-q".equals(value)) {
+        queue = tokens[++i];
+      }
       else if ("-s".equals(value) && i <= tokens.length - 1) {
         selector = tokens[++i];
 
@@ -175,8 +183,13 @@ public class JMSAdaptor extends Abstract
       }
     }
 
-    if (topic == null) {
-      log.error("topicName must be set");
+    if (topic == null && queue == null) {
+      log.error("topicName or queueName must be set");
+      return null;
+    }
+
+    if (topic != null && queue != null) {
+      log.error("Either topicName or queueName must be set, but not both");
       return null;
     }
 
@@ -206,7 +219,14 @@ public class JMSAdaptor extends Abstract
     }
 
     status = s;
-    source = "jms:"+brokerURL + ",topic:" + topic;
+
+    if(topic != null) {
+      source = "jms:"+brokerURL + ",topic:" + topic;
+    }
+    else if(queue != null) {
+      source = "jms:"+brokerURL + ",queue:" + queue;
+    }
+
     return s;
   }
 
@@ -224,7 +244,12 @@ public class JMSAdaptor extends Abstract
               ", offset =" + bytesReceived);
 
       // this is where different initialization could be used for a queue
-      initializeTopic(connection, topic, selector, new JMSListener());
+      if(topic != null) {
+        initializeTopic(connection, topic, selector, new JMSListener());
+      }
+      else if(queue != null) {
+        initializeQueue(connection, queue, selector, new JMSListener());
+      }
       connection.start();
 
     } catch(Exception e) {
@@ -278,9 +303,19 @@ public class JMSAdaptor extends Abstract
     TopicSession session = ((TopicConnection)connection).
             createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
     Topic jmsTopic = session.createTopic(topic);
-    TopicSubscriber subscriber = session.
-            createSubscriber(jmsTopic, selector, true);
-    subscriber.setMessageListener(listener);
+    MessageConsumer consumer = session.createConsumer(jmsTopic, selector, true);
+    consumer.setMessageListener(listener);
+  }
+
+  /** Attaches listener to a consumer on the named queue, filtered by selector. */
+  private void initializeQueue(Connection connection, String queueName,
+                               String selector, JMSListener listener)
+          throws JMSException {
+    // non-transacted session; the cast requires a QueueConnection
+    QueueSession session = ((QueueConnection)connection).
+            createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+    Queue jmsQueue = session.createQueue(queueName);
+    session.createConsumer(jmsQueue, selector, true).setMessageListener(listener);
   }
 
   private static String trimQuotes(String value) {