You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2010/04/24 01:56:23 UTC

svn commit: r937557 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/ src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/

Author: asrabkin
Date: Fri Apr 23 23:56:22 2010
New Revision: 937557

URL: http://svn.apache.org/viewvc?rev=937557&view=rev
Log:
CHUKWA-468. JMSAdaptor. Contributed by Bill Graham.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessageTransformer.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSMessagePropertyTransformer.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/ivy.xml
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=937557&r1=937556&r2=937557&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Fri Apr 23 23:56:22 2010
@@ -4,6 +4,13 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+
+  BUGS
+
+Chukwa 0.4 
+
+  NEW FEATURES
+
     CHUKWA-445. Realtime display at collector. (asrabkin)
 
     CHUKWA-454. DirTailingAdaptor can filter files. (Gerrit Jansen van Vuuren via asrabkin)

Modified: hadoop/chukwa/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/ivy.xml?rev=937557&r1=937556&r2=937557&view=diff
==============================================================================
--- hadoop/chukwa/trunk/ivy.xml (original)
+++ hadoop/chukwa/trunk/ivy.xml Fri Apr 23 23:56:22 2010
@@ -61,6 +61,10 @@
       name="xerces"
       rev="${xerces.version}"
       conf="jdiff->default"/>
+    <dependency org="org.apache.activemq"
+       name="activemq-core"
+       rev="${activemq.version}"
+       conf="common->default"/>
    <dependency org="commons-fileupload"
       name="commons-fileupload"
       rev="${commons-fileupload.version}"
@@ -88,7 +92,7 @@
    <dependency org="commons-net"
       name="commons-net"
       rev="${commons-net.version}"
-      conf="common->master"/> 
+      conf="common->master"/>
     <dependency org="org.mortbay.jetty"
       name="jetty"
       rev="${jetty.version}"

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java?rev=937557&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java Fri Apr 23 23:56:22 2010
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+
+import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.log4j.Logger;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.jms.Connection;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.JMSException;
+import javax.jms.ConnectionFactory;
+
+/**
+ * Adaptor that is able to listen to a JMS topic for messages, receive the
+ * message, and transform it to a Chukwa chunk. Transformation is handled by a
+ * JMSMessageTransformer. The default JMSMessageTransformer used is the
+ * JMSTextMessageTransformer.
+ * <P>
+ * This adaptor is added to an Agent like so:
+ * <code>
+ * add JMSAdaptor <dataType> <brokerURL> -t <topicName> [-s <JMSSelector>]
+ *  [-x <transformerName>] [-p <transformerConfigs>] <offset>
+ * </code>
+ * <ul>
+ * <li><code>dataType</code> - The chukwa data type.</li>
+ * <li><code>brokerURL</code> - The JMS broker URL to bind to.</li>
+ * <li><code>topicName</code> - The JMS topic to listen on.</li>
+ * <li><code>JMSSelector</code> - The JMS selector to filter with. Surround
+ * with quotes if selector contains multiple words.</li>
+ * <li><code>transformerName</code> - Class name of the JMSMessageTransformer to
+ * use.</li>
+ * <li><code>transformerConfigs</code> - Properties to be passed to the
+ * JMSMessageTransformer to use.  Surround with quotes if configs contain
+ * multiple words.</li>
+ * </ul>
+ *
+ * @see JMSMessageTransformer
+ * @see JMSTextMessageTransformer
+ */
+public class JMSAdaptor extends AbstractAdaptor {
+
+  static Logger log = Logger.getLogger(JMSAdaptor.class);
+
+  ConnectionFactory connectionFactory = null;
+  Connection connection;
+  String brokerURL;
+  String topic;
+  String selector = null;
+  JMSMessageTransformer transformer;
+
+  volatile long bytesReceived = 0;
+  String status; // used to write checkpoint info. See getStatus() below
+  String source; // added to the chunk to identify the stream
+
+  /**
+   * Listener registered on the topic subscriber. Every message is passed to
+   * the configured transformer; a non-null result is wrapped in a chunk and
+   * handed to <code>dest</code>.
+   */
+  class JMSListener implements MessageListener {
+
+    /**
+     * Converts one JMS message into a chunk. Messages for which the
+     * transformer returns null are ignored. Failures are logged instead of
+     * being rethrown.
+     *
+     * @param message the message delivered on the subscribed topic
+     */
+    public void onMessage(Message message) {
+      if (log.isDebugEnabled()) {
+        log.debug("got a JMS message");
+      }
+
+      try {
+
+        byte[] bytes = transformer.transform(message);
+        if (bytes == null) {
+          return;
+        }
+
+        // the running byte total is also used as the chunk's sequence id
+        bytesReceived += bytes.length;
+
+        if (log.isDebugEnabled()) {
+          log.debug("Adding Chunk from JMS message: " + new String(bytes));
+        }
+
+        Chunk c = new ChunkImpl(type, source, bytesReceived, bytes, JMSAdaptor.this);
+        dest.add(c);
+
+      } catch (JMSException e) {
+        log.error("can't read JMS messages in " + adaptorID, e);
+      }
+      catch (InterruptedException e) {
+        log.error("can't add JMS messages in " + adaptorID, e);
+        // restore the interrupt flag so the delivering thread can still see
+        // that it was interrupted
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Parses the adaptor parameters, which have the form
+   * <code>brokerURL -t topicName [-s JMSSelector] [-x transformerName]
+   * [-p transformerConfigs]</code>. The selector and the transformer configs
+   * may span several words when wrapped in double quotes. Tokens that are
+   * neither a recognized option nor the value of one are skipped, and an
+   * option that is the very last token (no value) is ignored with a warning.
+   *
+   * @param s the parameter string, tokens separated by single spaces
+   * @return <code>s</code> on success; null if no topic was given, the
+   * transformer could not be instantiated, or the transformer rejected its
+   * configs
+   * @throws IllegalArgumentException if the brokerURL is missing or does not
+   * start with <code>vm://</code> or <code>tcp://</code>
+   */
+  @Override
+  public String parseArgs(String s) {
+    if (log.isDebugEnabled()) {
+      log.debug("Parsing args to initialize adaptor: " + s);
+    }
+
+    if (s == null) {
+      throw new IllegalArgumentException("Configuration must include brokerURL.");
+    }
+
+    // split() drops trailing empty strings, so blank-only input has no tokens
+    String[] tokens = s.split(" ");
+    if (tokens.length < 1) {
+      throw new IllegalArgumentException("Configuration must include brokerURL.");
+    }
+
+    brokerURL = tokens[0];
+
+    if (brokerURL.length() < 6 ||
+      (!brokerURL.startsWith("vm://") && !brokerURL.startsWith("tcp://"))) {
+      throw new IllegalArgumentException("Invalid brokerURL: " + brokerURL);
+    }
+
+    String transformerName = null;
+    String transformerConfs = null;
+    for (int i = 1; i < tokens.length; i++) {
+      String option = tokens[i];
+      boolean singleWord = "-t".equals(option) || "-x".equals(option);
+      boolean multiWord = "-s".equals(option) || "-p".equals(option);
+      if (!singleWord && !multiWord) {
+        continue;
+      }
+
+      if (i == tokens.length - 1) {
+        // an option needs a value; reading past the end used to blow up
+        log.warn("Ignoring option " + option + " because no value follows it");
+        break;
+      }
+
+      int first = i + 1;
+      if ("-t".equals(option)) {
+        topic = tokens[first];
+        i = first;
+      }
+      else if ("-x".equals(option)) {
+        transformerName = tokens[first];
+        i = first;
+      }
+      else {
+        // selector and transformer configs can have multiple words
+        int last = lastTokenOfValue(tokens, first);
+        String value = joinValue(tokens, first, last);
+        if ("-s".equals(option)) {
+          selector = value;
+        }
+        else {
+          transformerConfs = value;
+        }
+        i = last;
+      }
+    }
+
+    if (topic == null) {
+      log.error("topicName must be set");
+      return null;
+    }
+
+    // create transformer
+    if (transformerName != null) {
+      try {
+        Class<?> classDefinition = Class.forName(transformerName);
+        transformer = (JMSMessageTransformer) classDefinition.newInstance();
+      } catch (Exception e) {
+        log.error("Couldn't find class for transformerName=" + transformerName, e);
+        return null;
+      }
+    }
+    else {
+      transformer = new JMSTextMessageTransformer();
+    }
+
+    // configure transformer
+    if (transformerConfs != null) {
+      String result = transformer.parseArgs(transformerConfs);
+      if (result == null) {
+        log.error("JMSMessageTransformer couldn't parse transformer configs: " +
+                transformerConfs);
+        return null;
+      }
+    }
+
+    status = s;
+    source = "jms:"+brokerURL + ",topic:" + topic;
+    return s;
+  }
+
+  /**
+   * Finds the index of the last token belonging to the value that starts at
+   * <code>first</code>. An unquoted value, or a quoted value that opens and
+   * closes within one token, is a single token. Otherwise the value runs up to
+   * the next token ending with a double quote, or to the end of the tokens if
+   * the quote is never closed.
+   */
+  private static int lastTokenOfValue(String[] tokens, int first) {
+    String head = tokens[first];
+    if (!head.startsWith("\"")) {
+      return first;
+    }
+    // a lone quote character opens a value, it does not also close it
+    if (head.length() > 1 && head.endsWith("\"")) {
+      return first;
+    }
+    for (int j = first + 1; j < tokens.length; j++) {
+      if (tokens[j].endsWith("\"")) {
+        return j;
+      }
+    }
+    return tokens.length - 1;
+  }
+
+  /**
+   * Re-joins tokens <code>first</code> through <code>last</code> with single
+   * spaces and strips the surrounding double quotes from a quoted value.
+   */
+  private static String joinValue(String[] tokens, int first, int last) {
+    StringBuilder joined = new StringBuilder(tokens[first]);
+    for (int j = first + 1; j <= last; j++) {
+      joined.append(' ').append(tokens[j]);
+    }
+
+    String value = joined.toString();
+    if (!value.startsWith("\"")) {
+      return value;
+    }
+    // a value consisting of just the opening quote has nothing left to trim
+    return value.length() > 1 ? trimQuotes(value) : "";
+  }
+
+  @Override
+  public void start(long offset) throws AdaptorException {
+
+    try {
+      bytesReceived = offset;
+
+      connectionFactory = initializeConnectionFactory(brokerURL);
+      connection = connectionFactory.createConnection();
+
+      log.info("Starting JMS adaptor: " + adaptorID + " started on brokerURL=" + brokerURL +
+              ", topic=" + topic + ", selector=" + selector +
+              ", offset =" + bytesReceived);
+
+      // this is where different initialization could be used for a queue
+      initializeTopic(connection, topic, selector, new JMSListener());
+      connection.start();
+
+    } catch(Exception e) {
+      throw new AdaptorException(e);
+    }
+  }
+
+  /**
+   * Builds the connection factory used by <code>start</code>. The default
+   * is an ActiveMQ factory; override this to plug in a different one.
+   *
+   * @param brokerURL URL of the broker to connect to
+   * @return a connection factory for that broker
+   */
+  protected ConnectionFactory initializeConnectionFactory(String brokerURL) {
+    ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
+    return factory;
+  }
+
+  /**
+   * Status is used to write checkpoints. Checkpoints are written as:
+   * ADD <adaptorKey> = <adaptorClass> <currentStatus> <offset>
+   *
+   * Once they're reloaded, adaptors are re-initialized with
+   * <adaptorClass> <currentStatus> <offset>
+   *
+   * While doing so, this gets passed by to the parseArgs method:
+   * <currentStatus>
+   *
+   * Without the first token in <currentStatus>, which is expected to be <dataType>.
+   *
+   * @return
+   */
+  @Override
+  public String getCurrentStatus() {
+    return type + " " + status;
+  }
+
+  /**
+   * Stops message delivery and releases the JMS connection. Problems while
+   * closing are logged and otherwise ignored so that shutdown always
+   * completes.
+   *
+   * @param shutdownPolicy the requested policy; not consulted by this adaptor
+   * @return the received-bytes counter at the time of shutdown
+   * @throws AdaptorException declared by the overridden method; not thrown here
+   */
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+      throws AdaptorException {
+    // connection is still null if start() was never called or failed early
+    if (connection != null) {
+      try {
+        // close() stops delivery as well and frees the resources held for the
+        // connection, which stop() alone leaves allocated
+        connection.close();
+      } catch(Exception e) {
+        log.warn("problem closing JMS connection in " + adaptorID, e);
+      } finally {
+        connection = null;
+      }
+    }
+
+    return bytesReceived;
+  }
+
+  /**
+   * Subscribes the listener to the named topic on the given connection, using
+   * a non-transacted, auto-acknowledging session.
+   *
+   * @param connection connection to subscribe on; must be a TopicConnection
+   * @param topic name of the topic
+   * @param selector JMS selector used to filter messages, may be null
+   * @param listener receiver of the messages
+   * @throws JMSException if the session or the subscriber cannot be created
+   */
+  private void initializeTopic(Connection connection,
+                                      String topic,
+                                      String selector,
+                                      JMSListener listener) throws JMSException {
+    TopicConnection topicConnection = (TopicConnection)connection;
+    TopicSession session =
+            topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+    Topic jmsTopic = session.createTopic(topic);
+
+    // the third argument is the subscriber's noLocal flag
+    TopicSubscriber subscriber =
+            session.createSubscriber(jmsTopic, selector, true);
+    subscriber.setMessageListener(listener);
+  }
+
+  /**
+   * Removes one leading and one trailing double quote, if present. Empty
+   * input, and input that becomes empty once the leading quote is removed, is
+   * returned without further indexing.
+   *
+   * @param value the string to trim, must not be null
+   * @return the value without its surrounding double quotes
+   */
+  private static String trimQuotes(String value) {
+    if (value.length() == 0) {
+      return value;
+    }
+
+    // trim leading and trailing quotes
+    if (value.charAt(0) == '"') {
+      value = value.substring(1);
+    }
+    // a lone quote leaves nothing behind, so re-check the length
+    if (value.length() > 0 && value.charAt(value.length() - 1) == '"') {
+      value = value.substring(0, value.length() - 1);
+    }
+    return value;
+  }
+
+}
\ No newline at end of file

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java?rev=937557&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java Fri Apr 23 23:56:22 2010
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Message;
+import javax.jms.JMSException;
+import java.util.ArrayList;
+
+/**
+ * JMSMessageTransformer that uses the properties of a JMS Message to build a
+ * Chukwa record payload. The value for each property configured will be used
+ * to create the record, with the delimiter value between each. The default
+ * delimiter is a tab (i.e., '\t').
+ * <P>
+ * To configure this transformer, set the -p field of the adaptor to the
+ * following (surrounded with double quotes):
+ * <code>
+ * <propertyNames> [-d <delimiter>] [-r <requiredPropertyNames>]
+ * </code>
+ * <ul>
+ * <li><code>propertyNames</code> - Comma-separated list of JMS properties.</li>
+ * <li><code>delimiter</code> - Delimiter to use, in single quotes.</li>
+ * <li><code>requiredPropertyNames</code> - Comma-separated list of required
+ * JMS properties. Default behavior is that all properties are required.</li>
+ * </ul>
+ *
+ */
+public class JMSMessagePropertyTransformer implements JMSMessageTransformer {
+  protected Log log = LogFactory.getLog(getClass());
+
+  private static final String DEFAULT_DELIMITER = "\t";
+
+  ArrayList<String> propertyNames = null;
+  ArrayList<String> requiredPropertyNames = null;
+  String delimiter = DEFAULT_DELIMITER;
+
+  /**
+   * Configures the transformer from
+   * <code>propertyNames [-d delimiter] [-r requiredPropertyNames]</code>.
+   * The first token is the comma-separated list of property names. A
+   * delimiter may contain spaces and is given in single quotes.
+   *
+   * @param args the transformer configuration
+   * @return <code>args</code> on success, or null if no property names were
+   * supplied
+   */
+  public String parseArgs(String args) {
+    if (args == null || args.length() == 0) {
+      log.error("propertyNames must be set for this transformer");
+      return null;
+    }
+
+    propertyNames = new ArrayList<String>();
+
+    String[] tokens = args.split(" ");
+    // split() drops trailing empty strings, so args made only of spaces
+    // produce no tokens at all
+    if (tokens.length == 0) {
+      log.error("propertyNames must be set for this transformer");
+      return null;
+    }
+
+    for (String propertyName : tokens[0].split(",")) {
+      propertyNames.add(propertyName);
+    }
+
+    for(int i = 1; i < tokens.length; i++) {
+      String token = tokens[i];
+
+      if ("-d".equals(token) && i <= tokens.length - 2) {
+        String value = tokens[++i];
+
+        // the split removed the spaces inside the delimiter, so re-join every
+        // following token until the next option starts
+        while (i <= tokens.length - 2 && !tokens[i + 1].startsWith("-")) {
+          value = value + " " + tokens[++i];
+        }
+
+        delimiter = trimSingleQuotes(value);
+      }
+      else if ("-r".equals(token) && i <= tokens.length - 2) {
+        // requiredPropertyNames = null means all are required.
+        requiredPropertyNames = new ArrayList<String>();
+
+        String[] required = tokens[++i].split(",");
+        for (String r : required) {
+          requiredPropertyNames.add(r);
+        }
+      }
+    }
+
+    log.info("Initialized JMSMessagePropertyTransformer: delimiter='" +
+            delimiter + "', propertyNames=" + propertyNames +
+            ", requiredProperties=" +
+            (requiredPropertyNames == null ? "ALL" : requiredPropertyNames));
+    return args;
+  }
+
+  /**
+   * Transforms message properties into a byte array, with the values joined
+   * by the delimiter in the order the property names were configured.
+   * Returns null if no property names are configured, if a required property
+   * has no usable value, or if the resulting text is empty.
+   * <P>
+   * NOTE(review): a missing property that is not required is written as the
+   * text "null"; confirm that this is the intended placeholder.
+   *
+   * @param message the JMS message whose properties are read
+   * @return the delimited property values as bytes, or null if the message
+   * should be ignored
+   * @throws JMSException if a property cannot be read from the message
+   */
+  public byte[] transform(Message message) throws JMSException {
+
+    if (propertyNames == null || propertyNames.size() == 0) {
+      log.error("No message properties configured for this JMS transformer.");
+      return null;
+    }
+
+    StringBuilder payload = new StringBuilder();
+    for (int idx = 0; idx < propertyNames.size(); idx++) {
+      String name = propertyNames.get(idx);
+      String value = transformValue(name, message.getObjectProperty(name));
+
+      // give up as soon as a required value is missing
+      if (value == null && (requiredPropertyNames == null ||
+          requiredPropertyNames.contains(name))) {
+        return null;
+      }
+
+      if (idx > 0) {
+        payload.append(delimiter);
+      }
+      payload.append(value);
+    }
+
+    if (payload.length() == 0) {
+      return null;
+    }
+
+    return payload.toString().getBytes();
+  }
+
+  /**
+   * Converts a property value into the text used in the record. Strings and
+   * Numbers are supported; every other type, and null, yields null. Override
+   * this method to handle other Java types or to apply other value
+   * transformation logic.
+   *
+   * @param propertyName The name of the JMS property
+   * @param propertyValue The value of the property, which might be null.
+   * @return the text for the value, or null if there is no usable value
+   */
+  protected String transformValue(String propertyName, Object propertyValue) {
+    boolean supported = propertyValue instanceof String
+        || propertyValue instanceof Number;
+
+    // instanceof is false for null, so a null value falls through as well
+    if (!supported) {
+      return null;
+    }
+
+    return propertyValue.toString();
+  }
+
+  /**
+   * Removes one leading and one trailing single quote, if present.
+   *
+   * @param value the string to trim, must not be null
+   * @return the value without its surrounding single quotes
+   */
+  private static String trimSingleQuotes(String value) {
+    // drop the leading quote first, then look at what is left
+    String trimmed = value.startsWith("'") ? value.substring(1) : value;
+
+    if (trimmed.endsWith("'")) {
+      trimmed = trimmed.substring(0, trimmed.length() - 1);
+    }
+
+    return trimmed;
+  }
+
+}

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessageTransformer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessageTransformer.java?rev=937557&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessageTransformer.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessageTransformer.java Fri Apr 23 23:56:22 2010
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+
+import javax.jms.Message;
+import javax.jms.JMSException;
+
+/**
+ * Class that knows how to transform a JMS Message to a byte array. The byte
+ * array will become the bytes bound to the Chukwa chunk.
+ */
+public interface JMSMessageTransformer {
+
+  /**
+   * Parse any transformer-specific args to initialize the transformer. Return
+   * null if the arguments could not be parsed. This method is invoked before
+   * transform is called, but only if transformer arguments were passed. If
+   * they weren't, this method will never be called.
+   *
+   * @param args Arguments needed to configure the transformer.
+   * @return null if the arguments could not be parsed; any non-null value
+   * signals success.
+   */
+  public String parseArgs(String args);
+
+  /**
+   * Transform a Message to an array of bytes. Return null for a message that
+   * should be ignored.
+   *
+   * @param message JMS message received by a JMS Adaptor.
+   * @return the bytes that should be bound to the Chukwa chunk, or null if
+   * the message should be ignored.
+   * @throws JMSException if the message cannot be read.
+   */
+  public byte[] transform(Message message) throws JMSException;
+}

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java?rev=937557&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java Fri Apr 23 23:56:22 2010
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
+
+/**
+ * Basic JMSMessageTransformer that uses the payload message of a JMS
+ * TextMessage as the Chukwa record payload. If the message is not an instance
+ * of TextMessage, or it is, but the payload is null or empty, returns null.
+ */
+public class JMSTextMessageTransformer implements JMSMessageTransformer {
+  protected Log log = LogFactory.getLog(getClass());
+
+  public String parseArgs(String s) {
+    return s;      
+  }
+
+  public byte[] transform(Message message) throws JMSException {
+    if (!(message instanceof TextMessage)) {
+      log.warn("Invalid message type received: " + message);
+      return null;
+    }
+
+    String text = ((TextMessage)message).getText();
+    if (text != null && text.length() > 0) {
+      return text.getBytes();
+    }
+
+    return null;
+  }
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java?rev=937557&r1=937556&r2=937557&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java Fri Apr 23 23:56:22 2010
@@ -35,6 +35,15 @@ public class TsProcessor extends Abstrac
   public TsProcessor() {
     // TODO move that to config
     sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+
+    // Disabled draft of the config-driven variant from the TODO above: it
+    // looks up a date format under the key "TsProcessor.time.format" followed
+    // directly by the chunk's data type, and falls back to the default
+    // pattern when no such entry exists.
+//    StringBuilder format = new StringBuilder();
+//    format.append("TsProcessor.time.format");
+//    format.append(chunk.getDataType());
+//    if(conf.get(format.toString())!=null) {
+//     sdf = new SimpleDateFormat(conf.get(format.toString()));
+//    } else {
+//     sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+//    }
   }
 
   @Override

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSAdaptor.java?rev=937557&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSAdaptor.java Fri Apr 23 23:56:22 2010
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+import org.apache.hadoop.chukwa.Chunk;
+
+import org.apache.activemq.ActiveMQConnection;
+
+import javax.jms.Message;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+/**
+ * Tests the functionality of JMSAdapter and JMSTextMessageTransformer
+ */
+public class TestJMSAdaptor extends TestCase implements ChunkReceiver {
+  String DATA_TYPE = "Test";
+  String MESSAGE_PAYLOAD = "Some JMS message payload";
+
+  TopicConnection connection = null;
+  TopicSession session = null;
+  TopicPublisher publisher = null;
+  int bytesReceived = 0;
+  int messagesReceived = 0;
+
+  protected void setUp() throws Exception {
+    connection =  ActiveMQConnection.makeConnection("vm://localhost");
+    session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+    Topic topic = session.createTopic("test.topic");
+    publisher = session.createPublisher(topic);
+    messagesReceived = 0;
+    bytesReceived = 0;
+  }
+
+  protected void tearDown() throws Exception {
+    session.close();
+    connection.close();
+  }
+
+  public void testJMSTextMessage() throws Exception {
+
+    JMSAdaptor adaptor = new JMSAdaptor();
+    adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic 0",
+            AdaptorManager.NULL);
+    adaptor.start("id", DATA_TYPE, 0, this);
+
+    Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+    publisher.publish(message);
+
+    synchronized(this) {
+      wait(1000);
+    }
+    assertEquals("Message not received", 1, messagesReceived);
+  }
+
+  public void testJMSTextMessageWithTransformer() throws Exception {
+
+    JMSAdaptor adaptor = new JMSAdaptor();
+    adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic -x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSTextMessageTransformer 0",
+            AdaptorManager.NULL);
+    adaptor.start("id", DATA_TYPE, 0, this);
+
+    Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+    publisher.publish(message);
+
+    synchronized(this) {
+      wait(1000);
+    }
+    assertEquals("Message not received", 1, messagesReceived);
+  }
+
+  public void testJMSTextMessageWithSelector() throws Exception {
+
+    JMSAdaptor adaptor = new JMSAdaptor();
+    adaptor.parseArgs(DATA_TYPE,
+            "vm://localhost -t test.topic -s \"foo='bar'\" 0",
+            AdaptorManager.NULL);
+    adaptor.start("id", DATA_TYPE, 0, this);
+
+    Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+    publisher.publish(message);
+    
+    message = session.createTextMessage(MESSAGE_PAYLOAD);
+    message.setStringProperty("foo", "bar");
+    publisher.publish(message);
+
+    synchronized(this) {
+      wait(1000);
+    }
+
+    assertEquals("Message not received", 1, messagesReceived);
+  }
+
+  public void testJMSTextMessageWithMultiWordSelector() throws Exception {
+
+    JMSAdaptor adaptor = new JMSAdaptor();
+    adaptor.parseArgs(DATA_TYPE,
+            "vm://localhost -t test.topic -s \"foo='bar' and bar='foo'\" 0",
+            AdaptorManager.NULL);    
+    adaptor.start("id", DATA_TYPE, 0, this);
+
+    Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+    publisher.publish(message);
+
+    message = session.createTextMessage(MESSAGE_PAYLOAD);
+    message.setStringProperty("foo", "bar");
+    publisher.publish(message);
+
+    message = session.createTextMessage(MESSAGE_PAYLOAD);
+    message.setStringProperty("foo", "bar");
+    message.setStringProperty("bar", "foo");
+    publisher.publish(message);
+
+    synchronized(this) {
+      wait(1000);
+    }
+
+    assertEquals("Message not received", 1, messagesReceived);
+  }
+
+  public void add(Chunk c) {
+    bytesReceived += c.getData().length;
+    assertEquals("Unexpected data length",
+            MESSAGE_PAYLOAD.length(), c.getData().length);
+    assertEquals("Unexpected data type", DATA_TYPE, c.getDataType());
+    assertEquals("Chunk sequenceId should be total bytes received.",
+            bytesReceived, c.getSeqID());
+    assertEquals("Unexpected message payload",
+            MESSAGE_PAYLOAD, new String(c.getData()));
+    messagesReceived++;
+  }
+}
\ No newline at end of file

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSMessagePropertyTransformer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSMessagePropertyTransformer.java?rev=937557&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSMessagePropertyTransformer.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSMessagePropertyTransformer.java Fri Apr 23 23:56:22 2010
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+import org.apache.hadoop.chukwa.Chunk;
+
+import org.apache.activemq.ActiveMQConnection;
+
+import javax.jms.Message;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import java.util.ArrayList;
+
+/**
+ * Tests the functionality of JMSMessagePropertyTransformer.
+ */
+public class TestJMSMessagePropertyTransformer extends TestCase implements ChunkReceiver {
+  String DATA_TYPE = "Test"; // data type given to the adaptor; add() asserts it on every chunk
+  String MESSAGE_PAYLOAD = "Some JMS message payload"; // text body of each published message
+  // JMS objects created in setUp() and used to publish to test.topic.
+  TopicConnection connection = null;
+  TopicSession session = null;
+  TopicPublisher publisher = null;
+  ArrayList<String> chunkPayloads; // chunk payloads recorded by add(), in arrival order
+  int bytesReceived = 0; // running byte total, compared with each chunk's seqID
+  // Connects to vm://localhost, creates a publisher on test.topic and resets the receive state.
+  protected void setUp() throws Exception {
+    connection =  ActiveMQConnection.makeConnection("vm://localhost");
+    session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+    Topic topic = session.createTopic("test.topic");
+    publisher = session.createPublisher(topic);
+    chunkPayloads = new ArrayList<String>();
+    bytesReceived = 0;
+  }
+  // Closes the session, then the connection, that setUp() opened.
+  protected void tearDown() throws Exception {
+    session.close();
+    connection.close();
+  }
+
+  public void testJMSMessageProperties() throws Exception {
+    // Quoted -p list foo,bar,num: expects those property values, in that order, tab-separated.
+    JMSAdaptor adaptor = new JMSAdaptor();
+    adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+                    "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+                    "-p \"foo,bar,num\" 0",
+            AdaptorManager.NULL);
+    adaptor.start("id", DATA_TYPE, 0, this);
+    // "bat" is not in the -p list and is not expected in the chunk; "num" is an int property.
+    Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+    message.setStringProperty("bar", "bar_value");
+    message.setStringProperty("bat", "bat_value");
+    message.setStringProperty("foo", "foo_value");
+    message.setIntProperty("num", 1);
+    publisher.publish(message);
+    // Nothing in this class calls notify(), so this is simply a pause of up to one second.
+    synchronized(this) {
+      wait(1000);
+    }
+    assertEquals("Message not received", 1, chunkPayloads.size());
+    assertEquals("Incorrect chunk payload found",
+            "foo_value\tbar_value\t1", chunkPayloads.get(0));
+  }
+
+  public void testJMSMessagePropertiesNoQuotes() throws Exception {
+    // Same as testJMSMessageProperties, but the -p value is passed without surrounding quotes.
+    JMSAdaptor adaptor = new JMSAdaptor();
+    adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+                    "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+                    "-p foo,bar,num 0",
+            AdaptorManager.NULL);
+    adaptor.start("id", DATA_TYPE, 0, this);
+
+    Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+    message.setStringProperty("bar", "bar_value");
+    message.setStringProperty("bat", "bat_value");
+    message.setStringProperty("foo", "foo_value");
+    message.setIntProperty("num", 1);
+    publisher.publish(message);
+
+    synchronized(this) {
+      wait(1000);
+    }
+    assertEquals("Message not received", 1, chunkPayloads.size());
+    assertEquals("Incorrect chunk payload found",
+            "foo_value\tbar_value\t1", chunkPayloads.get(0));
+  }
+
+  public void testJMSMessagePropertiesWithDelimiter() throws Exception {
+    // -d ' ' inside the quoted -p value: expects a single space between values instead of a tab.
+    JMSAdaptor adaptor = new JMSAdaptor();
+    adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+                    "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+                    "-p \"foo,bar,num -d ' '\" 0",
+            AdaptorManager.NULL);
+    adaptor.start("id", DATA_TYPE, 0, this);
+
+    Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+    message.setStringProperty("bar", "bar_value");
+    message.setStringProperty("bat", "bat_value");
+    message.setStringProperty("foo", "foo_value");
+    message.setIntProperty("num", 1);
+    publisher.publish(message);
+
+    synchronized(this) {
+      wait(1000);
+    }
+    assertEquals("Message not received", 1, chunkPayloads.size());
+    assertEquals("Incorrect chunk payload found", "foo_value bar_value 1", chunkPayloads.get(0));
+  }
+
+  public void testJMSMessagePropertiesWithNoQuotesDelimiter() throws Exception {
+    // Unquoted -d ^^^: expects ^^^ between the property values.
+    JMSAdaptor adaptor = new JMSAdaptor();
+    adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+                    "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+                    "-p \"foo,bar,num -d ^^^\" 0",
+            AdaptorManager.NULL);
+    adaptor.start("id", DATA_TYPE, 0, this);
+
+    Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+    message.setStringProperty("bar", "bar_value");
+    message.setStringProperty("bat", "bat_value");
+    message.setStringProperty("foo", "foo_value");
+    message.setIntProperty("num", 1);
+    publisher.publish(message);
+
+    synchronized(this) {
+      wait(1000);
+    }
+    assertEquals("Message not received", 1, chunkPayloads.size());
+    assertEquals("Incorrect chunk payload found", "foo_value^^^bar_value^^^1", chunkPayloads.get(0));
+  }
+
+  public void testJMSMessagePropertiesWithMultiWordDelimiter() throws Exception {
+    // Quoted delimiter containing spaces: expects the whole bracketed phrase between values.
+    JMSAdaptor adaptor = new JMSAdaptor();
+    adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+                    "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+                    "-p \"foo,bar,num -d '[ insert between values ]'\" 0",
+            AdaptorManager.NULL);
+    adaptor.start("id", DATA_TYPE, 0, this);
+
+    Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+    message.setStringProperty("bar", "bar_value");
+    message.setStringProperty("bat", "bat_value");
+    message.setStringProperty("foo", "foo_value");
+    message.setIntProperty("num", 1);
+    publisher.publish(message);
+
+    synchronized(this) {
+      wait(1000);
+    }
+    assertEquals("Message not received", 1, chunkPayloads.size());
+    assertEquals("Incorrect chunk payload found",
+            "foo_value[ insert between values ]bar_value[ insert between values ]1",
+            chunkPayloads.get(0));
+  }
+
+  public void testJMSPropMissingWithAllRequired() throws Exception {
+    // No -r given; per the test name every listed property is then required. Expects no chunk.
+    JMSAdaptor adaptor = new JMSAdaptor();
+    adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+                    "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+                    "-p \"foo,bar,num\" 0",
+            AdaptorManager.NULL);
+    adaptor.start("id", DATA_TYPE, 0, this);
+    // "foo" is deliberately not set on this message.
+    Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+    message.setStringProperty("bar", "bar_value");
+    message.setStringProperty("bat", "bat_value");
+    message.setIntProperty("num", 1);
+    publisher.publish(message);
+
+    synchronized(this) {
+      wait(1000);
+    }
+    assertEquals("Message should not have been received", 0, chunkPayloads.size());
+  }
+
+  public void testJMSPropMissingWithSomeRequired() throws Exception {
+    // -r foo names foo as required (per the test name); foo is not set, so no chunk is expected.
+    JMSAdaptor adaptor = new JMSAdaptor();
+    adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+                    "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+                    "-p \"foo,bar,num -r foo\" 0",
+            AdaptorManager.NULL);
+    adaptor.start("id", DATA_TYPE, 0, this);
+    // "foo" is deliberately not set on this message.
+    Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+    message.setStringProperty("bar", "bar_value");
+    message.setStringProperty("bat", "bat_value");
+    message.setIntProperty("num", 1);
+    publisher.publish(message);
+
+    synchronized(this) {
+      wait(1000);
+    }
+    assertEquals("Message should not have been received", 0, chunkPayloads.size());
+  }
+
+  public void testJMSPropfoundWithSomeRequired() throws Exception {
+    // -r foo with foo present on the message: expects the usual tab-separated chunk.
+    JMSAdaptor adaptor = new JMSAdaptor();
+    adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+                    "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+                    "-p \"foo,bar,num -r foo\" 0",
+            AdaptorManager.NULL);
+    adaptor.start("id", DATA_TYPE, 0, this);
+
+    Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+    message.setStringProperty("bar", "bar_value");
+    message.setStringProperty("bat", "bat_value");
+    message.setStringProperty("foo", "foo_value");
+    message.setIntProperty("num", 1);
+    publisher.publish(message);
+
+    synchronized(this) {
+      wait(1000);
+    }
+    assertEquals("Message not received", 1, chunkPayloads.size());
+    assertEquals("Incorrect chunk payload found", "foo_value\tbar_value\t1", chunkPayloads.get(0));
+  }
+
+
+  // Called with each chunk (this object is the receiver given to start()); records its payload.
+  public void add(Chunk c) {
+    bytesReceived += c.getData().length;
+    assertEquals("Unexpected data type", DATA_TYPE, c.getDataType());
+    assertEquals("Chunk sequenceId should be total bytes received.",
+            bytesReceived, c.getSeqID());
+    chunkPayloads.add(new String(c.getData()));
+  }
+}
\ No newline at end of file