You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2010/04/24 01:56:23 UTC
svn commit: r937557 - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/
src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/
src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/
Author: asrabkin
Date: Fri Apr 23 23:56:22 2010
New Revision: 937557
URL: http://svn.apache.org/viewvc?rev=937557&view=rev
Log:
CHUKWA-468. JMSAdaptor. Contributed by Bill Graham.
Added:
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessageTransformer.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSAdaptor.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSMessagePropertyTransformer.java
Modified:
hadoop/chukwa/trunk/CHANGES.txt
hadoop/chukwa/trunk/ivy.xml
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=937557&r1=937556&r2=937557&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Fri Apr 23 23:56:22 2010
@@ -4,6 +4,13 @@ Trunk (unreleased changes)
NEW FEATURES
+
+ BUGS
+
+Chukwa 0.4
+
+ NEW FEATURES
+
CHUKWA-445. Realtime display at collector. (asrabkin)
CHUKWA-454. DirTailingAdaptor can filter files. (Gerrit Jansen van Vuuren via asrabkin)
Modified: hadoop/chukwa/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/ivy.xml?rev=937557&r1=937556&r2=937557&view=diff
==============================================================================
--- hadoop/chukwa/trunk/ivy.xml (original)
+++ hadoop/chukwa/trunk/ivy.xml Fri Apr 23 23:56:22 2010
@@ -61,6 +61,10 @@
name="xerces"
rev="${xerces.version}"
conf="jdiff->default"/>
+ <dependency org="org.apache.activemq"
+ name="activemq-core"
+ rev="${activemq.version}"
+ conf="common->default"/>
<dependency org="commons-fileupload"
name="commons-fileupload"
rev="${commons-fileupload.version}"
@@ -88,7 +92,7 @@
<dependency org="commons-net"
name="commons-net"
rev="${commons-net.version}"
- conf="common->master"/>
+ conf="common->master"/>
<dependency org="org.mortbay.jetty"
name="jetty"
rev="${jetty.version}"
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java?rev=937557&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java Fri Apr 23 23:56:22 2010
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+
+import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.log4j.Logger;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.jms.Connection;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.JMSException;
+import javax.jms.ConnectionFactory;
+
+/**
+ * Adaptor that is able to listen to a JMS topic for messages, receive the
+ * message, and transform it to a Chukwa chunk. Transformation is handled by a
+ * JMSMessageTransformer. The default JMSMessageTransformer used is the
+ * JMSTextMessageTransformer.
+ * <P>
+ * This adaptor is added to an Agent like so:
+ * <code>
+ * add JMSAdaptor <dataType> <brokerURL> -t <topicName> [-s <JMSSelector>]
+ * [-x <transformerName>] [-p <transformerConfigs>] <offset>
+ * </code>
+ * <ul>
+ * <li><code>dataType</code> - The chukwa data type.</li>
+ * <li><code>brokerURL</code> - The JMS broker URL to bind to.</li>
+ * <li><code>topicName</code> - The JMS topic to listen on.</li>
+ * <li><code>JMSSelector</code> - The JMS selector to filter with. Surround
+ * with quotes if selector contains multiple words.</li>
+ * <li><code>transformerName</code> - Class name of the JMSMessageTransformer to
+ * use.</li>
+ * <li><code>transformerConfigs</code> - Properties to be passed to the
+ * JMSMessageTransformer to use. Surround with quotes if configs contain
+ * multiple words.</li>
+ * </ul>
+ *
+ * @see JMSMessageTransformer
+ * @see JMSTextMessageTransformer
+ */
+public class JMSAdaptor extends AbstractAdaptor {
+
+ static Logger log = Logger.getLogger(JMSAdaptor.class);
+
+ ConnectionFactory connectionFactory = null;
+ Connection connection;
+ String brokerURL;
+ String topic;
+ String selector = null;
+ JMSMessageTransformer transformer;
+
+ volatile long bytesReceived = 0;
+ String status; // used to write checkpoint info. See getStatus() below
+ String source; // added to the chunk to identify the stream
+
+ class JMSListener implements MessageListener {
+
+ public void onMessage(Message message) {
+ if (log.isDebugEnabled()) {
+ log.debug("got a JMS message");
+ }
+
+ try {
+
+ byte[] bytes = transformer.transform(message);
+ if (bytes == null) {
+ return;
+ }
+
+ bytesReceived += bytes.length;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Adding Chunk from JMS message: " + new String(bytes));
+ }
+
+ Chunk c = new ChunkImpl(type, source, bytesReceived, bytes, JMSAdaptor.this);
+ dest.add(c);
+
+ } catch (JMSException e) {
+ log.error("can't read JMS messages in " + adaptorID, e);
+ }
+ catch (InterruptedException e) {
+ log.error("can't add JMS messages in " + adaptorID, e);
+ }
+ }
+ }
+
+ /**
+ * This adaptor received configuration like this:
+ * <brokerURL> -t <topicName> [-s <JMSSelector>] [-x <transformerName>]
+ * [-p <transformerProperties>]
+ *
+ * @param s
+ * @return
+ */
+ @Override
+ public String parseArgs(String s) {
+ if (log.isDebugEnabled()) {
+ log.debug("Parsing args to initialize adaptor: " + s);
+ }
+
+ String[] tokens = s.split(" ");
+ if (tokens.length < 1) {
+ throw new IllegalArgumentException("Configuration must include brokerURL.");
+ }
+
+ brokerURL = tokens[0];
+
+ if (brokerURL.length() < 6 ||
+ (!brokerURL.startsWith("vm://") && !brokerURL.startsWith("tcp://"))) {
+ throw new IllegalArgumentException("Invalid brokerURL: " + brokerURL);
+ }
+
+ String transformerName = null;
+ String transformerConfs = null;
+ for (int i = 1; i < tokens.length; i++) {
+ String value = tokens[i];
+ if ("-t".equals(value)) {
+ topic = tokens[++i];
+ }
+ else if ("-s".equals(value) && i <= tokens.length - 2) {
+ selector = tokens[++i];
+
+ // selector can have multiple words
+ if (selector.startsWith("\"")) {
+ for(int j = i + 1; j < tokens.length - 1; j++) {
+ selector = selector + " " + tokens[++i];
+ if(tokens[j].endsWith("\"")) {
+ break;
+ }
+ }
+ selector = trimQuotes(selector);
+ }
+ }
+ else if ("-x".equals(value)) {
+ transformerName = tokens[++i];
+ }
+ else if ("-p".equals(value)) {
+ transformerConfs = tokens[++i];
+
+ // transformerConfs can have multiple words
+ if (transformerConfs.startsWith("\"")) {
+ for(int j = i + 1; j < tokens.length - 1; j++) {
+ transformerConfs = transformerConfs + " " + tokens[++i];
+ if(tokens[j].endsWith("\"")) {
+ break;
+ }
+ }
+ transformerConfs = trimQuotes(transformerConfs);
+ }
+ }
+ }
+
+ if (topic == null) {
+ log.error("topicName must be set");
+ return null;
+ }
+
+ // create transformer
+ if (transformerName != null) {
+ try {
+ Class classDefinition = Class.forName(transformerName);
+ Object object = classDefinition.newInstance();
+ transformer = (JMSMessageTransformer)object;
+ } catch (Exception e) {
+ log.error("Couldn't find class for transformerName=" + transformerName, e);
+ return null;
+ }
+ }
+ else {
+ transformer = new JMSTextMessageTransformer();
+ }
+
+ // configure transformer
+ if (transformerConfs != null) {
+ String result = transformer.parseArgs(transformerConfs);
+ if (result == null) {
+ log.error("JMSMessageTransformer couldn't parse transformer configs: " +
+ transformerConfs);
+ return null;
+ }
+ }
+
+ status = s;
+ source = "jms:"+brokerURL + ",topic:" + topic;
+ return s;
+ }
+
+ @Override
+ public void start(long offset) throws AdaptorException {
+
+ try {
+ bytesReceived = offset;
+
+ connectionFactory = initializeConnectionFactory(brokerURL);
+ connection = connectionFactory.createConnection();
+
+ log.info("Starting JMS adaptor: " + adaptorID + " started on brokerURL=" + brokerURL +
+ ", topic=" + topic + ", selector=" + selector +
+ ", offset =" + bytesReceived);
+
+ // this is where different initialization could be used for a queue
+ initializeTopic(connection, topic, selector, new JMSListener());
+ connection.start();
+
+ } catch(Exception e) {
+ throw new AdaptorException(e);
+ }
+ }
+
+ /**
+ * Override this to initialize with a different connection factory.
+ * @param brokerURL
+ * @return
+ */
+ protected ConnectionFactory initializeConnectionFactory(String brokerURL) {
+ return new ActiveMQConnectionFactory(brokerURL);
+ }
+
+ /**
+ * Status is used to write checkpoints. Checkpoints are written as:
+ * ADD <adaptorKey> = <adaptorClass> <currentStatus> <offset>
+ *
+ * Once they're reloaded, adaptors are re-initialized with
+ * <adaptorClass> <currentStatus> <offset>
+ *
+ * While doing so, this gets passed by to the parseArgs method:
+ * <currentStatus>
+ *
+ * Without the first token in <currentStatus>, which is expected to be <dataType>.
+ *
+ * @return
+ */
+ @Override
+ public String getCurrentStatus() {
+ return type + " " + status;
+ }
+
+ @Override
+ public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+ throws AdaptorException {
+ try {
+ connection.stop();
+
+ } catch(Exception e) {}
+
+ return bytesReceived;
+ }
+
+ private void initializeTopic(Connection connection,
+ String topic,
+ String selector,
+ JMSListener listener) throws JMSException {
+ TopicSession session = ((TopicConnection)connection).
+ createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic jmsTopic = session.createTopic(topic);
+ TopicSubscriber subscriber = session.
+ createSubscriber(jmsTopic, selector, true);
+ subscriber.setMessageListener(listener);
+ }
+
+ private static String trimQuotes(String value) {
+ // trim leading and trailing quotes
+ if (value.charAt(0) == '"') {
+ value = value.substring(1);
+ }
+ if (value.charAt(value.length() - 1) == '"') {
+ value = value.substring(0, value.length() - 1);
+ }
+ return value;
+ }
+
+}
\ No newline at end of file
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java?rev=937557&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java Fri Apr 23 23:56:22 2010
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Message;
+import javax.jms.JMSException;
+import java.util.ArrayList;
+
+/**
+ * JMSMessageTransformer that uses the properties of a JMS Message to build a
+ * Chukwa record payload. The value for each property configured will be used
+ * to create the record, with the delimiter value between each. The default
+ * delimiter is a tab (i.e., '\t').
+ * <P>
+ * To configure this transformer, set the -p field of the adaptor to the
+ * following (surrounded with double quotes):
+ * <code>
+ * <propertyNames> [-d <delimiter>] [-r <requiredPropertyNames>]
+ * </code>
+ * <ul>
+ * <li><code>propertyNames</code> - Comma-separated list of JMS properties.</li>
+ * <li><code>delimiter</code> - Delimiter to use, in single quotes.</li>
+ * <li><code>requiredPropertyNames</code> - Comma-separated list of required
+ * JMS properties. Default behavior is that all properties are required.</li>
+ * </ul>
+ *
+ */
+public class JMSMessagePropertyTransformer implements JMSMessageTransformer {
+ protected Log log = LogFactory.getLog(getClass());
+
+ private static final String DEFAULT_DELIMITER = "\t";
+
+ ArrayList<String> propertyNames = null;
+ ArrayList<String> requiredPropertyNames = null;
+ String delimiter = DEFAULT_DELIMITER;
+
+ public String parseArgs(String args) {
+ if (args == null || args.length() == 0) {
+ log.error("propertyNames must be set for this transformer");
+ return null;
+ }
+
+ propertyNames = new ArrayList<String>();
+
+ String[] tokens = args.split(" ");
+ for (String propertyName : tokens[0].split(",")) {
+ propertyNames.add(propertyName);
+ }
+
+ for(int i = 1; i < tokens.length; i++) {
+ String token = tokens[i];
+
+ if ("-d".equals(token) && i <= tokens.length - 2) {
+ String value = tokens[++i];
+
+ // we lost all spaces with the split, so we have to put them back, yuck.
+ while (i <= tokens.length - 2 && !tokens[i + 1].startsWith("-")) {
+ value = value + " " + tokens[++i];
+ }
+
+ delimiter = trimSingleQuotes(value);
+ }
+ else if ("-r".equals(token) && i <= tokens.length - 2) {
+ // requiredPropertyNames = null means all are required.
+ requiredPropertyNames = new ArrayList<String>();
+
+ String[] required = tokens[++i].split(",");
+ for (String r : required) {
+ requiredPropertyNames.add(r);
+ }
+ }
+ }
+
+ log.info("Initialized JMSMessagePropertyTransformer: delimiter='" +
+ delimiter + "', propertyNames=" + propertyNames +
+ ", requiredProperties=" +
+ (requiredPropertyNames == null ? "ALL" : requiredPropertyNames));
+ return args;
+ }
+
+ /**
+ * Transforms message propertes into a byte array delimtied by delimiter. If
+ * all of the configured message properties are not found, returns null.
+ * <P>
+ * The could be enhanced to support the concept of optional/required properties.
+ * @param message
+ * @return
+ * @throws JMSException
+ */
+ public byte[] transform(Message message) throws JMSException {
+
+ if (propertyNames == null || propertyNames.size() == 0) {
+ log.error("No message properties configured for this JMS transformer.");
+ return null;
+ }
+
+ int valuesFound = 0;
+ StringBuilder sb = new StringBuilder();
+ for (String propertyName : propertyNames) {
+ Object propertyValue = message.getObjectProperty(propertyName);
+ String value = transformValue(propertyName, propertyValue);
+
+ // is a required value not found?
+ if (value == null) {
+ if (requiredPropertyNames == null ||
+ requiredPropertyNames.contains(propertyName)) {
+ return null;
+ }
+ }
+
+ if (valuesFound > 0) {
+ sb.append(delimiter);
+ }
+
+ sb.append(value);
+ valuesFound++;
+ }
+
+ if (sb.length() == 0 || valuesFound != propertyNames.size()) {
+ return null;
+ }
+
+ return sb.toString().getBytes();
+ }
+
+ /**
+ * Transforms the propertyValue found into the string that should be used for
+ * the message. Can handle String values and Number values. Override this method
+ * to handle other Java types, or to apply other value transformation logic.
+ *
+ * @param propertyName The name of the JMS property
+ * @param propertyValue The value of the property, which might be null.
+ * @return
+ */
+ protected String transformValue(String propertyName, Object propertyValue) {
+
+ if (propertyValue == null) {
+ return null;
+ }
+ else if (propertyValue instanceof String) {
+ return (String)propertyValue;
+ }
+ else if (propertyValue instanceof Number) {
+ return propertyValue.toString();
+ }
+
+ return null;
+ }
+
+ private static String trimSingleQuotes(String value) {
+ if (value.length() == 0) {
+ return value;
+ }
+
+ // trim leading and trailing quotes
+ if (value.charAt(0) == '\'') {
+ value = value.substring(1);
+ }
+ if (value.length() > 0 && value.charAt(value.length() - 1) == '\'') {
+ value = value.substring(0, value.length() - 1);
+ }
+
+ return value;
+ }
+
+}
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessageTransformer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessageTransformer.java?rev=937557&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessageTransformer.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessageTransformer.java Fri Apr 23 23:56:22 2010
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+
+import javax.jms.Message;
+import javax.jms.JMSException;
+
+/**
+ * Class that knows how to transform a JMS Message to a byte array. The byte
+ * array will become the bytes bound to the Chukwa chunk.
+ */
+public interface JMSMessageTransformer {
+
+ /**
+ * Parse any transformer-specific args to initialize the transformer. Return
+ * a null if the arguments could not be parsed. This method will always be
+ * invoked before transform is called only if transformer arguments were
+ * passed. If they weren't, this method will never be called.
+ *
+ * @param args Arguments needed to configur the transformer.
+ * @return
+ */
+ public String parseArgs(String args);
+
+ /**
+ * Transform a Message to an array of bytes. Return null for a message that
+ * should be ignored.
+ *
+ * @param message JMS message received by a JMS Adaptor.
+ * @return the bytes that should be bound to the Chukwa chunk.
+ * @throws JMSException
+ */
+ public byte[] transform(Message message) throws JMSException;
+}
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java?rev=937557&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java Fri Apr 23 23:56:22 2010
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
+
+/**
+ * Basic JMSMessageTransformer that uses the payload message of a JMS
+ * TextMessage as the Chukwa record payload. If the message is not an instance
+ * of TextMessage, or it is, but the payload is null or empty, returns null.
+ */
+public class JMSTextMessageTransformer implements JMSMessageTransformer {
+ protected Log log = LogFactory.getLog(getClass());
+
+ public String parseArgs(String s) {
+ return s;
+ }
+
+ public byte[] transform(Message message) throws JMSException {
+ if (!(message instanceof TextMessage)) {
+ log.warn("Invalid message type received: " + message);
+ return null;
+ }
+
+ String text = ((TextMessage)message).getText();
+ if (text != null && text.length() > 0) {
+ return text.getBytes();
+ }
+
+ return null;
+ }
+}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java?rev=937557&r1=937556&r2=937557&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java Fri Apr 23 23:56:22 2010
@@ -35,6 +35,15 @@ public class TsProcessor extends Abstrac
public TsProcessor() {
// TODO move that to config
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+
+// StringBuilder format = new StringBuilder();
+// format.append("TsProcessor.time.format");
+// format.append(chunk.getDataType());
+// if(conf.get(format.toString())!=null) {
+// sdf = new SimpleDateFormat(conf.get(format.toString()));
+// } else {
+// sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+// }
}
@Override
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSAdaptor.java?rev=937557&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSAdaptor.java Fri Apr 23 23:56:22 2010
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+import org.apache.hadoop.chukwa.Chunk;
+
+import org.apache.activemq.ActiveMQConnection;
+
+import javax.jms.Message;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+/**
+ * Tests the functionality of JMSAdapter and JMSTextMessageTransformer
+ */
+public class TestJMSAdaptor extends TestCase implements ChunkReceiver {
+ String DATA_TYPE = "Test";
+ String MESSAGE_PAYLOAD = "Some JMS message payload";
+
+ TopicConnection connection = null;
+ TopicSession session = null;
+ TopicPublisher publisher = null;
+ int bytesReceived = 0;
+ int messagesReceived = 0;
+
+ protected void setUp() throws Exception {
+ connection = ActiveMQConnection.makeConnection("vm://localhost");
+ session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("test.topic");
+ publisher = session.createPublisher(topic);
+ messagesReceived = 0;
+ bytesReceived = 0;
+ }
+
+ protected void tearDown() throws Exception {
+ session.close();
+ connection.close();
+ }
+
+ public void testJMSTextMessage() throws Exception {
+
+ JMSAdaptor adaptor = new JMSAdaptor();
+ adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic 0",
+ AdaptorManager.NULL);
+ adaptor.start("id", DATA_TYPE, 0, this);
+
+ Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+ publisher.publish(message);
+
+ synchronized(this) {
+ wait(1000);
+ }
+ assertEquals("Message not received", 1, messagesReceived);
+ }
+
+ public void testJMSTextMessageWithTransformer() throws Exception {
+
+ JMSAdaptor adaptor = new JMSAdaptor();
+ adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic -x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSTextMessageTransformer 0",
+ AdaptorManager.NULL);
+ adaptor.start("id", DATA_TYPE, 0, this);
+
+ Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+ publisher.publish(message);
+
+ synchronized(this) {
+ wait(1000);
+ }
+ assertEquals("Message not received", 1, messagesReceived);
+ }
+
+ public void testJMSTextMessageWithSelector() throws Exception {
+
+ JMSAdaptor adaptor = new JMSAdaptor();
+ adaptor.parseArgs(DATA_TYPE,
+ "vm://localhost -t test.topic -s \"foo='bar'\" 0",
+ AdaptorManager.NULL);
+ adaptor.start("id", DATA_TYPE, 0, this);
+
+ Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+ publisher.publish(message);
+
+ message = session.createTextMessage(MESSAGE_PAYLOAD);
+ message.setStringProperty("foo", "bar");
+ publisher.publish(message);
+
+ synchronized(this) {
+ wait(1000);
+ }
+
+ assertEquals("Message not received", 1, messagesReceived);
+ }
+
+ public void testJMSTextMessageWithMultiWordSelector() throws Exception {
+
+ JMSAdaptor adaptor = new JMSAdaptor();
+ adaptor.parseArgs(DATA_TYPE,
+ "vm://localhost -t test.topic -s \"foo='bar' and bar='foo'\" 0",
+ AdaptorManager.NULL);
+ adaptor.start("id", DATA_TYPE, 0, this);
+
+ Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+ publisher.publish(message);
+
+ message = session.createTextMessage(MESSAGE_PAYLOAD);
+ message.setStringProperty("foo", "bar");
+ publisher.publish(message);
+
+ message = session.createTextMessage(MESSAGE_PAYLOAD);
+ message.setStringProperty("foo", "bar");
+ message.setStringProperty("bar", "foo");
+ publisher.publish(message);
+
+ synchronized(this) {
+ wait(1000);
+ }
+
+ assertEquals("Message not received", 1, messagesReceived);
+ }
+
+ public void add(Chunk c) {
+ bytesReceived += c.getData().length;
+ assertEquals("Unexpected data length",
+ MESSAGE_PAYLOAD.length(), c.getData().length);
+ assertEquals("Unexpected data type", DATA_TYPE, c.getDataType());
+ assertEquals("Chunk sequenceId should be total bytes received.",
+ bytesReceived, c.getSeqID());
+ assertEquals("Unexpected message payload",
+ MESSAGE_PAYLOAD, new String(c.getData()));
+ messagesReceived++;
+ }
+}
\ No newline at end of file
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSMessagePropertyTransformer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSMessagePropertyTransformer.java?rev=937557&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSMessagePropertyTransformer.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/jms/TestJMSMessagePropertyTransformer.java Fri Apr 23 23:56:22 2010
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+import org.apache.hadoop.chukwa.Chunk;
+
+import org.apache.activemq.ActiveMQConnection;
+
+import javax.jms.Message;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import java.util.ArrayList;
+
+/**
+ * Tests the functionality JMSMessagePropertyTransformer.
+ */
+public class TestJMSMessagePropertyTransformer extends TestCase implements ChunkReceiver {
+ String DATA_TYPE = "Test";
+ String MESSAGE_PAYLOAD = "Some JMS message payload";
+
+ TopicConnection connection = null;
+ TopicSession session = null;
+ TopicPublisher publisher = null;
+ ArrayList<String> chunkPayloads;
+ int bytesReceived = 0;
+
+ protected void setUp() throws Exception {
+ connection = ActiveMQConnection.makeConnection("vm://localhost");
+ session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("test.topic");
+ publisher = session.createPublisher(topic);
+ chunkPayloads = new ArrayList<String>();
+ bytesReceived = 0;
+ }
+
+ protected void tearDown() throws Exception {
+ session.close();
+ connection.close();
+ }
+
+ public void testJMSMessageProperties() throws Exception {
+
+ JMSAdaptor adaptor = new JMSAdaptor();
+ adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+ "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+ "-p \"foo,bar,num\" 0",
+ AdaptorManager.NULL);
+ adaptor.start("id", DATA_TYPE, 0, this);
+
+ Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+ message.setStringProperty("bar", "bar_value");
+ message.setStringProperty("bat", "bat_value");
+ message.setStringProperty("foo", "foo_value");
+ message.setIntProperty("num", 1);
+ publisher.publish(message);
+
+ synchronized(this) {
+ wait(1000);
+ }
+ assertEquals("Message not received", 1, chunkPayloads.size());
+ assertEquals("Incorrect chunk payload found",
+ "foo_value\tbar_value\t1", chunkPayloads.get(0));
+ }
+
+ public void testJMSMessagePropertiesNoQuotes() throws Exception {
+
+ JMSAdaptor adaptor = new JMSAdaptor();
+ adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+ "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+ "-p foo,bar,num 0",
+ AdaptorManager.NULL);
+ adaptor.start("id", DATA_TYPE, 0, this);
+
+ Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+ message.setStringProperty("bar", "bar_value");
+ message.setStringProperty("bat", "bat_value");
+ message.setStringProperty("foo", "foo_value");
+ message.setIntProperty("num", 1);
+ publisher.publish(message);
+
+ synchronized(this) {
+ wait(1000);
+ }
+ assertEquals("Message not received", 1, chunkPayloads.size());
+ assertEquals("Incorrect chunk payload found",
+ "foo_value\tbar_value\t1", chunkPayloads.get(0));
+ }
+
+ public void testJMSMessagePropertiesWithDelimiter() throws Exception {
+
+ JMSAdaptor adaptor = new JMSAdaptor();
+ adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+ "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+ "-p \"foo,bar,num -d ' '\" 0",
+ AdaptorManager.NULL);
+ adaptor.start("id", DATA_TYPE, 0, this);
+
+ Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+ message.setStringProperty("bar", "bar_value");
+ message.setStringProperty("bat", "bat_value");
+ message.setStringProperty("foo", "foo_value");
+ message.setIntProperty("num", 1);
+ publisher.publish(message);
+
+ synchronized(this) {
+ wait(1000);
+ }
+ assertEquals("Message not received", 1, chunkPayloads.size());
+ assertEquals("Incorrect chunk payload found", "foo_value bar_value 1", chunkPayloads.get(0));
+ }
+
+ public void testJMSMessagePropertiesWithNoQuotesDelimiter() throws Exception {
+
+ JMSAdaptor adaptor = new JMSAdaptor();
+ adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+ "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+ "-p \"foo,bar,num -d ^^^\" 0",
+ AdaptorManager.NULL);
+ adaptor.start("id", DATA_TYPE, 0, this);
+
+ Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+ message.setStringProperty("bar", "bar_value");
+ message.setStringProperty("bat", "bat_value");
+ message.setStringProperty("foo", "foo_value");
+ message.setIntProperty("num", 1);
+ publisher.publish(message);
+
+ synchronized(this) {
+ wait(1000);
+ }
+ assertEquals("Message not received", 1, chunkPayloads.size());
+ assertEquals("Incorrect chunk payload found", "foo_value^^^bar_value^^^1", chunkPayloads.get(0));
+ }
+
+ public void testJMSMessagePropertiesWithMultiWordDelimiter() throws Exception {
+
+ JMSAdaptor adaptor = new JMSAdaptor();
+ adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+ "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+ "-p \"foo,bar,num -d '[ insert between values ]'\" 0",
+ AdaptorManager.NULL);
+ adaptor.start("id", DATA_TYPE, 0, this);
+
+ Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+ message.setStringProperty("bar", "bar_value");
+ message.setStringProperty("bat", "bat_value");
+ message.setStringProperty("foo", "foo_value");
+ message.setIntProperty("num", 1);
+ publisher.publish(message);
+
+ synchronized(this) {
+ wait(1000);
+ }
+ assertEquals("Message not received", 1, chunkPayloads.size());
+ assertEquals("Incorrect chunk payload found",
+ "foo_value[ insert between values ]bar_value[ insert between values ]1",
+ chunkPayloads.get(0));
+ }
+
+ public void testJMSPropMissingWithAllRequired() throws Exception {
+
+ JMSAdaptor adaptor = new JMSAdaptor();
+ adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+ "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+ "-p \"foo,bar,num\" 0",
+ AdaptorManager.NULL);
+ adaptor.start("id", DATA_TYPE, 0, this);
+
+ Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+ message.setStringProperty("bar", "bar_value");
+ message.setStringProperty("bat", "bat_value");
+ message.setIntProperty("num", 1);
+ publisher.publish(message);
+
+ synchronized(this) {
+ wait(1000);
+ }
+ assertEquals("Message should not have been received", 0, chunkPayloads.size());
+ }
+
+ public void testJMSPropMissingWithSomeRequired() throws Exception {
+
+ JMSAdaptor adaptor = new JMSAdaptor();
+ adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+ "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+ "-p \"foo,bar,num -r foo\" 0",
+ AdaptorManager.NULL);
+ adaptor.start("id", DATA_TYPE, 0, this);
+
+ Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+ message.setStringProperty("bar", "bar_value");
+ message.setStringProperty("bat", "bat_value");
+ message.setIntProperty("num", 1);
+ publisher.publish(message);
+
+ synchronized(this) {
+ wait(1000);
+ }
+ assertEquals("Message should not have been received", 0, chunkPayloads.size());
+ }
+
+ public void testJMSPropfoundWithSomeRequired() throws Exception {
+
+ JMSAdaptor adaptor = new JMSAdaptor();
+ adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
+ "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
+ "-p \"foo,bar,num -r foo\" 0",
+ AdaptorManager.NULL);
+ adaptor.start("id", DATA_TYPE, 0, this);
+
+ Message message = session.createTextMessage(MESSAGE_PAYLOAD);
+ message.setStringProperty("bar", "bar_value");
+ message.setStringProperty("bat", "bat_value");
+ message.setStringProperty("foo", "foo_value");
+ message.setIntProperty("num", 1);
+ publisher.publish(message);
+
+ synchronized(this) {
+ wait(1000);
+ }
+ assertEquals("Message not received", 1, chunkPayloads.size());
+ assertEquals("Incorrect chunk payload found", "foo_value\tbar_value\t1", chunkPayloads.get(0));
+ }
+
+
+
+ public void add(Chunk c) {
+ bytesReceived += c.getData().length;
+ assertEquals("Unexpected data type", DATA_TYPE, c.getDataType());
+ assertEquals("Chunk sequenceId should be total bytes received.",
+ bytesReceived, c.getSeqID());
+ chunkPayloads.add(new String(c.getData()));
+ }
+}
\ No newline at end of file