You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 22:34:02 UTC
[06/14] storm git commit: STORM-2416 Release Packaging Improvements
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/examples/src/main/resources/jms-activemq.xml
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/resources/jms-activemq.xml b/external/storm-jms/examples/src/main/resources/jms-activemq.xml
deleted file mode 100644
index 1a845b8..0000000
--- a/external/storm-jms/examples/src/main/resources/jms-activemq.xml
+++ /dev/null
@@ -1,53 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <!-- ActiveMQ -->
-
- <!-- embedded ActiveMQ Broker -->
- <!-- <amq:broker useJmx="false" persistent="false">
- <amq:transportConnectors>
- <amq:transportConnector uri="tcp://localhost:61616" />
- </amq:transportConnectors>
- </amq:broker> -->
-
- <amq:queue id="notificationQueue" physicalName="backtype.storm.contrib.example.queue" />
-
- <amq:topic id="notificationTopic" physicalName="backtype.storm.contrib.example.topic" />
-
- <amq:connectionFactory id="jmsConnectionFactory"
- brokerURL="tcp://localhost:61616" />
-
- <!-- <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
- <property name="connectionFactory">
- <ref bean="jmsConnectionFactory" />
- </property>
- <property name="pubSubDomain" value="false" />
- </bean> -->
-
-</beans>
-
-
-
-
-
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/examples/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/resources/log4j.properties b/external/storm-jms/examples/src/main/resources/log4j.properties
deleted file mode 100644
index 079b195..0000000
--- a/external/storm-jms/examples/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-
-log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n
-
-
-log4j.logger.backtype.storm.contrib=DEBUG
-log4j.logger.clojure.contrib=WARN
-log4j.logger.org.springframework=WARN
-log4j.logger.org.apache.zookeeper=WARN
-
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-jms/pom.xml b/external/storm-jms/pom.xml
index 6a6e60a..0e0feb4 100644
--- a/external/storm-jms/pom.xml
+++ b/external/storm-jms/pom.xml
@@ -27,8 +27,9 @@
- <artifactId>storm-jms-parent</artifactId>
- <packaging>pom</packaging>
+ <artifactId>storm-jms</artifactId>
+
+
<developers>
<developer>
@@ -38,10 +39,45 @@
</developer>
</developers>
- <modules>
- <module>core</module>
- <module>examples</module>
- </modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <!-- keep storm out of the jar-with-dependencies -->
+ <scope>${provided.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Active MQ -->
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <version>5.5.1</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
new file mode 100644
index 0000000..4932929
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.storm.tuple.ITuple;
+
+/**
+ * JmsMessageProducer implementations are responsible for translating
+ * a <code>org.apache.storm.tuple.Values</code> instance into a
+ * <code>javax.jms.Message</code> object.
+ * <p>
+ */
+public interface JmsMessageProducer extends Serializable {
+
+ /**
+ * Translate a <code>org.apache.storm.tuple.Tuple</code> object
+ * to a <code>javax.jms.Message</code object.
+ *
+ * @param session
+ * @param input
+ * @return
+ * @throws JMSException
+ */
+ public Message toMessage(Session session, ITuple input) throws JMSException;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java
new file mode 100644
index 0000000..d976326
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms;
+
+import java.io.Serializable;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+
+/**
+ * A <code>JmsProvider</code> object encapsulates the <code>ConnectionFactory</code>
+ * and <code>Destination</code> JMS objects the <code>JmsSpout</code> needs to manage
+ * a topic/queue connection over the course of it's lifecycle.
+ *
+ */
+public interface JmsProvider extends Serializable {
+ /**
+ * Provides the JMS <code>ConnectionFactory</code>
+ *
+ * @return the connection factory
+ * @throws Exception
+ */
+ public ConnectionFactory connectionFactory() throws Exception;
+
+ /**
+ * Provides the <code>Destination</code> (topic or queue) from which the
+ * <code>JmsSpout</code> will receive messages.
+ *
+ * @return
+ * @throws Exception
+ */
+ public Destination destination() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
new file mode 100644
index 0000000..0bbb3a0
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Interface to define classes that can produce a Storm <code>Values</code> objects
+ * from a <code>javax.jms.Message</code> object>.
+ * <p>
+ * Implementations are also responsible for declaring the output
+ * fields they produce.
+ * <p>
+ * If for some reason the implementation can't process a message
+ * (for example if it received a <code>javax.jms.ObjectMessage</code>
+ * when it was expecting a <code>javax.jms.TextMessage</code> it should
+ * return <code>null</code> to indicate to the <code>JmsSpout</code> that
+ * the message could not be processed.
+ *
+ */
+public interface JmsTupleProducer extends Serializable {
+ /**
+ * Process a JMS message object to create a Values object.
+ *
+ * @param msg - the JMS message
+ * @return the Values tuple, or null if the message couldn't be processed.
+ * @throws JMSException
+ */
+ Values toTuple(Message msg) throws JMSException;
+
+ /**
+ * Declare the output fields produced by this JmsTupleProducer.
+ *
+ * @param declarer The OuputFieldsDeclarer for the spout.
+ */
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
new file mode 100644
index 0000000..d691e75
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.bolt;
+
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.jms.JmsMessageProducer;
+import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * A JmsBolt receives <code>org.apache.storm.tuple.Tuple</code> objects from a Storm
+ * topology and publishes JMS Messages to a destination (topic or queue).
+ * <p>
+ * To use a JmsBolt in a topology, the following must be supplied:
+ * <ol>
+ * <li>A <code>JmsProvider</code> implementation.</li>
+ * <li>A <code>JmsMessageProducer</code> implementation.</li>
+ * </ol>
+ * The <code>JmsProvider</code> provides the JMS <code>javax.jms.ConnectionFactory</code>
+ * and <code>javax.jms.Destination</code> objects requied to publish JMS messages.
+ * <p>
+ * The JmsBolt uses a <code>JmsMessageProducer</code> to translate
+ * <code>org.apache.storm.tuple.Tuple</code> objects into
+ * <code>javax.jms.Message</code> objects for publishing.
+ * <p>
+ * Both JmsProvider and JmsMessageProducer must be set, or the bolt will
+ * fail upon deployment to a cluster.
+ * <p>
+ * The JmsBolt is typically an endpoint in a topology -- in other words
+ * it does not emit any tuples.
+ */
+public class JmsBolt extends BaseTickTupleAwareRichBolt {
+ private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class);
+
+ private boolean autoAck = true;
+
+ // javax.jms objects
+ private Connection connection;
+ private Session session;
+ private MessageProducer messageProducer;
+
+ // JMS options
+ private boolean jmsTransactional = false;
+ private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
+
+ private JmsProvider jmsProvider;
+ private JmsMessageProducer producer;
+
+
+ private OutputCollector collector;
+
+ /**
+ * Set the JmsProvider used to connect to the JMS destination topic/queue
+ *
+ * @param provider
+ */
+ public void setJmsProvider(JmsProvider provider) {
+ this.jmsProvider = provider;
+ }
+
+ /**
+ * Set the JmsMessageProducer used to convert tuples
+ * into JMS messages.
+ *
+ * @param producer
+ */
+ public void setJmsMessageProducer(JmsMessageProducer producer) {
+ this.producer = producer;
+ }
+
+ /**
+ * Sets the JMS acknowledgement mode for JMS messages sent
+ * by this bolt.
+ * <p>
+ * Possible values:
+ * <ul>
+ * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+ * </ul>
+ *
+ * @param acknowledgeMode (constant defined in javax.jms.Session)
+ */
+ public void setJmsAcknowledgeMode(int acknowledgeMode) {
+ this.jmsAcknowledgeMode = acknowledgeMode;
+ }
+
+ /**
+ * Set the JMS transactional setting for the JMS session.
+ *
+ * @param transactional
+ */
+// public void setJmsTransactional(boolean transactional){
+// this.jmsTransactional = transactional;
+// }
+
+ /**
+ * Sets whether or not tuples should be acknowledged by this
+ * bolt.
+ * <p>
+ *
+ * @param autoAck
+ */
+ public void setAutoAck(boolean autoAck) {
+ this.autoAck = autoAck;
+ }
+
+
+ /**
+ * Consumes a tuple and sends a JMS message.
+ * <p>
+ * If autoAck is true, the tuple will be acknowledged
+ * after the message is sent.
+ * <p>
+ * If JMS sending fails, the tuple will be failed.
+ */
+ @Override
+ protected void process(Tuple input) {
+ // write the tuple to a JMS destination...
+ LOG.debug("Tuple received. Sending JMS message.");
+
+ try {
+ Message msg = this.producer.toMessage(this.session, input);
+ if (msg != null) {
+ if (msg.getJMSDestination() != null) {
+ this.messageProducer.send(msg.getJMSDestination(), msg);
+ } else {
+ this.messageProducer.send(msg);
+ }
+ }
+ if (this.autoAck) {
+ LOG.debug("ACKing tuple: " + input);
+ this.collector.ack(input);
+ }
+ } catch (JMSException e) {
+ // failed to send the JMS message, fail the tuple fast
+ LOG.warn("Failing tuple: " + input);
+ LOG.warn("Exception: ", e);
+ this.collector.fail(input);
+ }
+ }
+
+ /**
+ * Releases JMS resources.
+ */
+ @Override
+ public void cleanup() {
+ try {
+ LOG.debug("Closing JMS connection.");
+ this.session.close();
+ this.connection.close();
+ } catch (JMSException e) {
+ LOG.warn("Error closing JMS connection.", e);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+
+ /**
+ * Initializes JMS resources.
+ */
+ @Override
+ public void prepare(Map stormConf, TopologyContext context,
+ OutputCollector collector) {
+ if (this.jmsProvider == null || this.producer == null) {
+ throw new IllegalStateException("JMS Provider and MessageProducer not set.");
+ }
+ this.collector = collector;
+ LOG.debug("Connecting JMS..");
+ try {
+ ConnectionFactory cf = this.jmsProvider.connectionFactory();
+ Destination dest = this.jmsProvider.destination();
+ this.connection = cf.createConnection();
+ this.session = connection.createSession(this.jmsTransactional,
+ this.jmsAcknowledgeMode);
+ this.messageProducer = session.createProducer(dest);
+
+ connection.start();
+ } catch (Exception e) {
+ LOG.warn("Error creating JMS connection.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
new file mode 100644
index 0000000..b78a41e
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.spout;
+
+import java.io.Serializable;
+
+public class JmsMessageID implements Comparable<JmsMessageID>, Serializable {
+
+ private String jmsID;
+
+ private Long sequence;
+
+ public JmsMessageID(long sequence, String jmsID){
+ this.jmsID = jmsID;
+ this.sequence = sequence;
+ }
+
+
+ public String getJmsID(){
+ return this.jmsID;
+ }
+
+ @Override
+ public int compareTo(JmsMessageID jmsMessageID) {
+ return (int)(this.sequence - jmsMessageID.sequence);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.sequence.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if(o instanceof JmsMessageID){
+ JmsMessageID id = (JmsMessageID)o;
+ return this.jmsID.equals(id.jmsID);
+ } else {
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
new file mode 100644
index 0000000..6aaa7c9
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.spout;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.jms.JmsTupleProducer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+/**
+ * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue
+ * and outputs tuples based on the messages it receives.
+ * <p>
+ * <code>JmsSpout</code> instances rely on <code>JmsProducer</code> implementations
+ * to obtain the JMS <code>ConnectionFactory</code> and <code>Destination</code> objects
+ * necessary to connect to a JMS topic/queue.
+ * <p>
+ * When a <code>JmsSpout</code> receives a JMS message, it delegates to an
+ * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the
+ * incoming message.
+ * <p>
+ * Typically, developers will supply a custom <code>JmsTupleProducer</code> implementation
+ * appropriate for the expected message content.
+ */
+@SuppressWarnings("serial")
+public class JmsSpout extends BaseRichSpout implements MessageListener {
+ private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
+
+ // JMS options
+ private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
+ private boolean distributed = true;
+
+ private JmsTupleProducer tupleProducer;
+
+ private JmsProvider jmsProvider;
+
+ private LinkedBlockingQueue<Message> queue;
+ private TreeSet<JmsMessageID> toCommit;
+ private HashMap<JmsMessageID, Message> pendingMessages;
+ private long messageSequence = 0;
+
+ private SpoutOutputCollector collector;
+
+ private transient Connection connection;
+ private transient Session session;
+
+ private boolean hasFailures = false;
+ public final Serializable recoveryMutex = "RECOVERY_MUTEX";
+ private Timer recoveryTimer = null;
+ private long recoveryPeriod = -1; // default to disabled
+
+ /**
+ * Sets the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
+ * <p>
+ * Possible values:
+ * <ul>
+ * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+ * </ul>
+ *
+ * @param mode JMS Session Acknowledgement mode
+ * @throws IllegalArgumentException if the mode is not recognized.
+ */
+ public void setJmsAcknowledgeMode(int mode) {
+ switch (mode) {
+ case Session.AUTO_ACKNOWLEDGE:
+ case Session.CLIENT_ACKNOWLEDGE:
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)");
+
+ }
+ this.jmsAcknowledgeMode = mode;
+ }
+
+ /**
+ * Returns the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
+ *
+ * @return
+ */
+ public int getJmsAcknowledgeMode() {
+ return this.jmsAcknowledgeMode;
+ }
+
+ /**
+ * Set the <code>JmsProvider</code>
+ * implementation that this Spout will use to connect to
+ * a JMS <code>javax.jms.Desination</code>
+ *
+ * @param provider
+ */
+ public void setJmsProvider(JmsProvider provider) {
+ this.jmsProvider = provider;
+ }
+
+ /**
+ * Set the <code>JmsTupleProducer</code>
+ * implementation that will convert <code>javax.jms.Message</code>
+ * object to <code>org.apache.storm.tuple.Values</code> objects
+ * to be emitted.
+ *
+ * @param producer
+ */
+ public void setJmsTupleProducer(JmsTupleProducer producer) {
+ this.tupleProducer = producer;
+ }
+
+ /**
+ * <code>javax.jms.MessageListener</code> implementation.
+ * <p>
+ * Stored the JMS message in an internal queue for processing
+ * by the <code>nextTuple()</code> method.
+ */
+ public void onMessage(Message msg) {
+ try {
+ LOG.debug("Queuing msg [" + msg.getJMSMessageID() + "]");
+ } catch (JMSException e) {
+ }
+ this.queue.offer(msg);
+ }
+
+ /**
+ * <code>ISpout</code> implementation.
+ * <p>
+ * Connects the JMS spout to the configured JMS destination
+ * topic/queue.
+ */
+ @SuppressWarnings("rawtypes")
+ public void open(Map conf, TopologyContext context,
+ SpoutOutputCollector collector) {
+ if (this.jmsProvider == null) {
+ throw new IllegalStateException("JMS provider has not been set.");
+ }
+ if (this.tupleProducer == null) {
+ throw new IllegalStateException("JMS Tuple Producer has not been set.");
+ }
+ Integer topologyTimeout = (Integer) conf.get("topology.message.timeout.secs");
+ // TODO fine a way to get the default timeout from storm, so we're not hard-coding to 30 seconds (it could change)
+ topologyTimeout = topologyTimeout == null ? 30 : topologyTimeout;
+ if ((topologyTimeout.intValue() * 1000) > this.recoveryPeriod) {
+ LOG.warn("*** WARNING *** : " +
+ "Recovery period (" + this.recoveryPeriod + " ms.) is less then the configured " +
+ "'topology.message.timeout.secs' of " + topologyTimeout +
+ " secs. This could lead to a message replay flood!");
+ }
+ this.queue = new LinkedBlockingQueue<Message>();
+ this.toCommit = new TreeSet<JmsMessageID>();
+ this.pendingMessages = new HashMap<JmsMessageID, Message>();
+ this.collector = collector;
+ try {
+ ConnectionFactory cf = this.jmsProvider.connectionFactory();
+ Destination dest = this.jmsProvider.destination();
+ this.connection = cf.createConnection();
+ this.session = connection.createSession(false,
+ this.jmsAcknowledgeMode);
+ MessageConsumer consumer = session.createConsumer(dest);
+ consumer.setMessageListener(this);
+ this.connection.start();
+ if (this.isDurableSubscription() && this.recoveryPeriod > 0) {
+ this.recoveryTimer = new Timer();
+ this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 10, this.recoveryPeriod);
+ }
+
+ } catch (Exception e) {
+ LOG.warn("Error creating JMS connection.", e);
+ }
+
+ }
+
+ public void close() {
+ try {
+ LOG.debug("Closing JMS connection.");
+ this.session.close();
+ this.connection.close();
+ } catch (JMSException e) {
+ LOG.warn("Error closing JMS connection.", e);
+ }
+
+ }
+
+ public void nextTuple() {
+ Message msg = this.queue.poll();
+ if (msg == null) {
+ Utils.sleep(50);
+ } else {
+
+ LOG.debug("sending tuple: " + msg);
+ // get the tuple from the handler
+ try {
+ Values vals = this.tupleProducer.toTuple(msg);
+ // ack if we're not in AUTO_ACKNOWLEDGE mode, or the message requests ACKNOWLEDGE
+ LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode()));
+ LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode));
+ if (this.isDurableSubscription()) {
+ LOG.debug("Requesting acks.");
+ JmsMessageID messageId = new JmsMessageID(this.messageSequence++, msg.getJMSMessageID());
+ this.collector.emit(vals, messageId);
+
+ // at this point we successfully emitted. Store
+ // the message and message ID so we can do a
+ // JMS acknowledge later
+ this.pendingMessages.put(messageId, msg);
+ this.toCommit.add(messageId);
+ } else {
+ this.collector.emit(vals);
+ }
+ } catch (JMSException e) {
+ LOG.warn("Unable to convert JMS message: " + msg);
+ }
+
+ }
+
+ }
+
+ /*
+ * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
+ */
+ public void ack(Object msgId) {
+
+ Message msg = this.pendingMessages.remove(msgId);
+ JmsMessageID oldest = this.toCommit.first();
+ if (msgId.equals(oldest)) {
+ if (msg != null) {
+ try {
+ LOG.debug("Committing...");
+ msg.acknowledge();
+ LOG.debug("JMS Message acked: " + msgId);
+ this.toCommit.remove(msgId);
+ } catch (JMSException e) {
+ LOG.warn("Error acknowldging JMS message: " + msgId, e);
+ }
+ } else {
+ LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId);
+ }
+ } else {
+ this.toCommit.remove(msgId);
+ }
+
+ }
+
+ /*
+ * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
+ */
+ public void fail(Object msgId) {
+ LOG.warn("Message failed: " + msgId);
+ this.pendingMessages.clear();
+ this.toCommit.clear();
+ synchronized (this.recoveryMutex) {
+ this.hasFailures = true;
+ }
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ this.tupleProducer.declareOutputFields(declarer);
+
+ }
+
+ /**
+ * Returns <code>true</code> if the spout has received failures
+ * from which it has not yet recovered.
+ */
+ public boolean hasFailures() {
+ return this.hasFailures;
+ }
+
+ protected void recovered() {
+ this.hasFailures = false;
+ }
+
+ /**
+ * Sets the periodicity of the timer task that
+ * checks for failures and recovers the JMS session.
+ *
+ * @param period
+ */
+ public void setRecoveryPeriod(long period) {
+ this.recoveryPeriod = period;
+ }
+
+ public boolean isDistributed() {
+ return this.distributed;
+ }
+
+ /**
+ * Sets the "distributed" mode of this spout.
+ * <p>
+ * If <code>true</code> multiple instances of this spout <i>may</i> be
+ * created across the cluster (depending on the "parallelism_hint" in the topology configuration).
+ * <p>
+ * Setting this value to <code>false</code> essentially means this spout will run as a singleton
+ * within the cluster ("parallelism_hint" will be ignored).
+ * <p>
+ * In general, this should be set to <code>false</code> if the underlying JMS destination is a
+ * topic, and <code>true</code> if it is a JMS queue.
+ *
+ * @param distributed
+ */
+ public void setDistributed(boolean distributed) {
+ this.distributed = distributed;
+ }
+
+
+ private static final String toDeliveryModeString(int deliveryMode) {
+ switch (deliveryMode) {
+ case Session.AUTO_ACKNOWLEDGE:
+ return "AUTO_ACKNOWLEDGE";
+ case Session.CLIENT_ACKNOWLEDGE:
+ return "CLIENT_ACKNOWLEDGE";
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ return "DUPS_OK_ACKNOWLEDGE";
+ default:
+ return "UNKNOWN";
+
+ }
+ }
+
+ protected Session getSession() {
+ return this.session;
+ }
+
+ private boolean isDurableSubscription() {
+ return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE);
+ }
+
+
+ private class RecoveryTask extends TimerTask {
+ private final Logger LOG = LoggerFactory.getLogger(RecoveryTask.class);
+
+ public void run() {
+ synchronized (JmsSpout.this.recoveryMutex) {
+ if (JmsSpout.this.hasFailures()) {
+ try {
+ LOG.info("Recovering from a message failure.");
+ JmsSpout.this.getSession().recover();
+ JmsSpout.this.recovered();
+ } catch (JMSException e) {
+ LOG.warn("Could not recover jms session.", e);
+ }
+ }
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
new file mode 100644
index 0000000..c990058
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.trident;
+
+/**
+ * Batch coordination metadata object for the TridentJmsSpout.
+ * This implementation does not use batch metadata, so the object is empty.
+ *
+ */
+public class JmsBatch {
+ // Empty class
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java
new file mode 100644
index 0000000..bfb78b5
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.trident;
+
+import org.apache.storm.jms.JmsMessageProducer;
+import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.topology.FailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import javax.jms.*;
+import java.io.Serializable;
+import java.lang.IllegalStateException;
+import java.util.List;
+
+public class JmsState implements State {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JmsState.class);
+
+ private Options options;
+ private Connection connection;
+ private Session session;
+ private MessageProducer messageProducer;
+
+ protected JmsState(Options options) {
+ this.options = options;
+ }
+
+ public static class Options implements Serializable {
+ private JmsProvider jmsProvider;
+ private JmsMessageProducer msgProducer;
+ private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+ private boolean jmsTransactional = true;
+
+ public Options withJmsProvider(JmsProvider provider) {
+ this.jmsProvider = provider;
+ return this;
+ }
+
+ public Options withMessageProducer(JmsMessageProducer msgProducer) {
+ this.msgProducer = msgProducer;
+ return this;
+ }
+
+ public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) {
+ this.jmsAcknowledgeMode = jmsAcknowledgeMode;
+ return this;
+ }
+
+ public Options withJmsTransactional(boolean jmsTransactional) {
+ this.jmsTransactional = jmsTransactional;
+ return this;
+ }
+ }
+
+ protected void prepare() {
+ if(this.options.jmsProvider == null || this.options.msgProducer == null){
+ throw new IllegalStateException("JMS Provider and MessageProducer not set.");
+ }
+ LOG.debug("Connecting JMS..");
+ try {
+ ConnectionFactory cf = this.options.jmsProvider.connectionFactory();
+ Destination dest = this.options.jmsProvider.destination();
+ this.connection = cf.createConnection();
+ this.session = connection.createSession(this.options.jmsTransactional,
+ this.options.jmsAcknowledgeMode);
+ this.messageProducer = session.createProducer(dest);
+
+ connection.start();
+ } catch (Exception e) {
+ LOG.warn("Error creating JMS connection.", e);
+ }
+ }
+
+ @Override
+ public void beginCommit(Long aLong) {
+ }
+
+ @Override
+ public void commit(Long aLong) {
+ LOG.debug("Committing JMS transaction.");
+ if(this.options.jmsTransactional) {
+ try {
+ session.commit();
+ } catch(JMSException e){
+ LOG.error("JMS Session commit failed.", e);
+ }
+ }
+ }
+
+ public void updateState(List<TridentTuple> tuples, TridentCollector collector) throws JMSException {
+ try {
+ for(TridentTuple tuple : tuples) {
+ Message msg = this.options.msgProducer.toMessage(this.session, tuple);
+ if (msg != null) {
+ if (msg.getJMSDestination() != null) {
+ this.messageProducer.send(msg.getJMSDestination(), msg);
+ } else {
+ this.messageProducer.send(msg);
+ }
+ }
+ }
+ } catch (JMSException e) {
+ LOG.warn("Failed to send jmd message for a trident batch ", e);
+ if(this.options.jmsTransactional) {
+ session.rollback();
+ }
+ throw new FailedException("Failed to write tuples", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
new file mode 100644
index 0000000..9a02ba9
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.trident;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+public class JmsStateFactory implements StateFactory {
+
+ private JmsState.Options options;
+
+ public JmsStateFactory(JmsState.Options options) {
+ this.options = options;
+ }
+
+ @Override
+ public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
+ JmsState state = new JmsState(options);
+ state.prepare();
+ return state;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
new file mode 100644
index 0000000..a2709a4
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.trident;
+
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import javax.jms.JMSException;
+import java.util.List;
+
+public class JmsUpdater extends BaseStateUpdater<JmsState> {
+
+ @Override
+ public void updateState(JmsState jmsState, List<TridentTuple> tuples, TridentCollector collector) {
+ try {
+ jmsState.updateState(tuples, collector);
+ } catch (JMSException e) {
+ throw new FailedException("failed JMS opetation", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
new file mode 100644
index 0000000..55e29bc
--- /dev/null
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.trident;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.jms.JmsTupleProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.ITridentSpout;
+import org.apache.storm.trident.topology.TransactionAttempt;
+import org.apache.storm.Config;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsGetter;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Trident implementation of the JmsSpout
+ * <p>
+ *
+ */
+public class TridentJmsSpout implements ITridentSpout<JmsBatch> {
+
+ public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
+
+ public static final int DEFAULT_BATCH_SIZE = 1000;
+
+ private static final long serialVersionUID = -3469351154693356655L;
+
+ private JmsTupleProducer tupleProducer;
+
+ private JmsProvider jmsProvider;
+
+ private int jmsAcknowledgeMode;
+
+ private String name;
+
+ private static int nameIndex = 1;
+
+ /**
+ * Create a TridentJmsSpout with a default name and acknowledge mode of AUTO_ACKNOWLEDGE
+ */
+ public TridentJmsSpout() {
+ this.name = "JmsSpout_"+(nameIndex++);
+ this.jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+ }
+
+ /**
+ * Set the name for this spout, to improve log identification
+ * @param name The name to be used in log messages
+ * @return This spout
+ */
+ public TridentJmsSpout named(String name) {
+ this.name = name;
+ return this;
+ }
+
+ /**
+ * Set the <code>JmsProvider</code>
+ * implementation that this Spout will use to connect to
+ * a JMS <code>javax.jms.Desination</code>
+ *
+ * @param provider
+ */
+ public TridentJmsSpout withJmsProvider(JmsProvider provider){
+ this.jmsProvider = provider;
+ return this;
+ }
+
+ /**
+ * Set the <code>JmsTupleProducer</code>
+ * implementation that will convert <code>javax.jms.Message</code>
+ * object to <code>backtype.storm.tuple.Values</code> objects
+ * to be emitted.
+ *
+ * @param tupleProducer
+ * @return This spout
+ */
+ public TridentJmsSpout withTupleProducer(JmsTupleProducer tupleProducer) {
+ this.tupleProducer = tupleProducer;
+ return this;
+ }
+
+ /**
+ * Set the JMS acknowledge mode for messages being processed by this spout.
+ * <p/>
+ * Possible values:
+ * <ul>
+ * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+ * </ul>
+ * @param jmsAcknowledgeMode The chosen acknowledge mode
+ * @return This spout
+ * @throws IllegalArgumentException if the mode is not recognized
+ */
+ public TridentJmsSpout withJmsAcknowledgeMode(int jmsAcknowledgeMode) {
+ toDeliveryModeString(jmsAcknowledgeMode);
+ this.jmsAcknowledgeMode = jmsAcknowledgeMode;
+ return this;
+ }
+
+ /**
+ * Return a friendly string for the given JMS acknowledge mode, or throw an IllegalArgumentException if
+ * the mode is not recognized.
+ * <p/>
+ * Possible values:
+ * <ul>
+ * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+ * </ul>
+ * @param acknowledgeMode A valid JMS acknowledge mode
+ * @return A friendly string describing the acknowledge mode
+ * @throws IllegalArgumentException if the mode is not recognized
+ */
+ private static final String toDeliveryModeString(int acknowledgeMode) {
+ switch (acknowledgeMode) {
+ case Session.AUTO_ACKNOWLEDGE:
+ return "AUTO_ACKNOWLEDGE";
+ case Session.CLIENT_ACKNOWLEDGE:
+ return "CLIENT_ACKNOWLEDGE";
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ return "DUPS_OK_ACKNOWLEDGE";
+ default:
+ throw new IllegalArgumentException("Unknown JMS Acknowledge mode " + acknowledgeMode + " (See javax.jms.Session for valid values)");
+ }
+ }
+
+ @Override
+ public ITridentSpout.BatchCoordinator<JmsBatch> getCoordinator(
+ String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) {
+ return new JmsBatchCoordinator(name);
+ }
+
+ @Override
+ public Emitter<JmsBatch> getEmitter(String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) {
+ return new JmsEmitter(name, jmsProvider, tupleProducer, jmsAcknowledgeMode, conf);
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ OutputFieldsGetter fieldGetter = new OutputFieldsGetter();
+ tupleProducer.declareOutputFields(fieldGetter);
+ StreamInfo streamInfo = fieldGetter.getFieldsDeclaration().get(Utils.DEFAULT_STREAM_ID);
+ if (streamInfo == null) {
+ throw new IllegalArgumentException("Jms Tuple producer has not declared output fields for the default stream");
+ }
+
+ return new Fields(streamInfo.get_output_fields());
+ }
+
+ /**
+ * The JmsEmitter class listens for incoming messages and stores them in a blocking queue. On each invocation of emit,
+ * the queued messages are emitted as a batch.
+ *
+ */
+ private class JmsEmitter implements Emitter<JmsBatch>, MessageListener {
+
+ private final LinkedBlockingQueue<Message> queue;
+ private final Connection connection;
+ private final Session session;
+
+ private final RotatingMap<Long, List<Message>> batchMessageMap; // Maps transaction Ids to JMS message ids.
+
+ private final long rotateTimeMillis;
+ private final int maxBatchSize;
+ private final String name;
+
+ private long lastRotate;
+
+ private final Logger LOG = LoggerFactory.getLogger(JmsEmitter.class);
+
+ public JmsEmitter(String name, JmsProvider jmsProvider, JmsTupleProducer tupleProducer, int jmsAcknowledgeMode, @SuppressWarnings("rawtypes") Map conf) {
+ if (jmsProvider == null) {
+ throw new IllegalStateException("JMS provider has not been set.");
+ }
+ if (tupleProducer == null) {
+ throw new IllegalStateException("JMS Tuple Producer has not been set.");
+ }
+
+ this.queue = new LinkedBlockingQueue<Message>();
+ this.name = name;
+
+ batchMessageMap = new RotatingMap<Long, List<Message>>(3);
+ rotateTimeMillis = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
+ lastRotate = System.currentTimeMillis();
+
+ Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF);
+ maxBatchSize = batchSize != null ? batchSize.intValue() : DEFAULT_BATCH_SIZE;
+
+ try {
+ ConnectionFactory cf = jmsProvider.connectionFactory();
+ Destination dest = jmsProvider.destination();
+ this.connection = cf.createConnection();
+ this.session = connection.createSession(false, jmsAcknowledgeMode);
+ MessageConsumer consumer = session.createConsumer(dest);
+ consumer.setMessageListener(this);
+ this.connection.start();
+
+ LOG.info("Created JmsEmitter with max batch size "+maxBatchSize+" rotate time "+rotateTimeMillis+"ms and destination "+dest+" for "+name);
+
+ } catch (Exception e) {
+ LOG.warn("Error creating JMS connection.", e);
+ throw new IllegalStateException("Could not create JMS connection for spout ", e);
+ }
+
+ }
+
+ @Override
+ public void success(TransactionAttempt tx) {
+
+ @SuppressWarnings("unchecked")
+ List<Message> messages = (List<Message>) batchMessageMap.remove(tx.getTransactionId());
+
+ if (messages != null) {
+ if (!messages.isEmpty()) {
+ LOG.debug("Success for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
+ }
+
+ for (Message msg: messages) {
+ String messageId = "UnknownId";
+
+ try {
+ messageId = msg.getJMSMessageID();
+ msg.acknowledge();
+ LOG.trace("Acknowledged message "+messageId);
+ } catch (JMSException e) {
+ LOG.warn("Failed to acknowledge message "+messageId, e);
+ }
+ }
+ }
+ else {
+ LOG.warn("No messages found in batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId());
+ }
+ }
+
+ /**
+ * Fail a batch with the given transaction id. This is called when a batch is timed out, or a new batch with a
+ * matching transaction id is emitted. Note that the current implementation does nothing - i.e. it discards
+ * messages that have been failed.
+ * @param transactionId The transaction id of the failed batch
+ * @param messages The list of messages to fail.
+ */
+ private void fail(Long transactionId, List<Message> messages) {
+ LOG.debug("Failure for batch with transaction id "+transactionId+" for "+name);
+ if (messages != null) {
+ for (Message msg: messages) {
+ try {
+ LOG.trace("Failed message "+msg.getJMSMessageID());
+ } catch (JMSException e) {
+ LOG.warn("Could not identify failed message ", e);
+ }
+ }
+ }
+ else {
+ LOG.warn("Failed batch has no messages with transaction id "+transactionId);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ LOG.info("Closing JMS connection.");
+ this.session.close();
+ this.connection.close();
+ } catch (JMSException e) {
+ LOG.warn("Error closing JMS connection.", e);
+ }
+ }
+
+ @Override
+ public void emitBatch(TransactionAttempt tx, JmsBatch coordinatorMeta,
+ TridentCollector collector) {
+
+ long now = System.currentTimeMillis();
+ if(now - lastRotate > rotateTimeMillis) {
+ Map<Long, List<Message>> failed = batchMessageMap.rotate();
+ for(Long id: failed.keySet()) {
+ LOG.warn("TIMED OUT batch with transaction id "+id+" for "+name);
+ fail(id, failed.get(id));
+ }
+ lastRotate = now;
+ }
+
+ if(batchMessageMap.containsKey(tx.getTransactionId())) {
+ LOG.warn("FAILED duplicate batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
+ fail(tx.getTransactionId(), batchMessageMap.get(tx.getTransactionId()));
+ }
+
+ List<Message> batchMessages = new ArrayList<Message>();
+
+ for (int index=0; index<maxBatchSize; index++) {
+ Message msg = queue.poll();
+ if (msg == null) {
+ Utils.sleep(50); // Back off
+ break;
+ }
+
+ try {
+ if (TridentJmsSpout.this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE) {
+ batchMessages.add(msg);
+ }
+ Values tuple = tupleProducer.toTuple(msg);
+ collector.emit(tuple);
+ } catch (JMSException e) {
+ LOG.warn("Failed to emit message, could not retrieve data for "+name+": "+e );
+ }
+ }
+
+ if (!batchMessages.isEmpty()) {
+ LOG.debug("Emitting batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" and size "+batchMessages.size()+" for "+name);
+ }
+ else {
+ LOG.trace("No items to acknowledge for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
+ }
+ batchMessageMap.put(tx.getTransactionId(), batchMessages);
+ }
+
+ @Override
+ public void onMessage(Message msg) {
+ try {
+ LOG.trace("Queuing msg [" + msg.getJMSMessageID() + "]");
+ } catch (JMSException e) {
+ // Nothing here, could not get message id
+ }
+ this.queue.offer(msg);
+ }
+
+ }
+
+ /**
+ * Bare implementation of a BatchCoordinator, returning a null JmsBatch object
+ *
+ */
+ private class JmsBatchCoordinator implements BatchCoordinator<JmsBatch> {
+
+ private final String name;
+
+ private final Logger LOG = LoggerFactory.getLogger(JmsBatchCoordinator.class);
+
+ public JmsBatchCoordinator(String name) {
+ this.name = name;
+ LOG.info("Created batch coordinator for "+name);
+ }
+
+ @Override
+ public JmsBatch initializeTransaction(long txid, JmsBatch prevMetadata, JmsBatch curMetadata) {
+ LOG.debug("Initialise transaction "+txid+" for "+name);
+ return null;
+ }
+
+ @Override
+ public void success(long txid) {
+ }
+
+ @Override
+ public boolean isReady(long txid) {
+ return true;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ }
+
+}
+
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
new file mode 100644
index 0000000..e80f70a
--- /dev/null
+++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.spout;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsSpoutTest {
+ private static final Logger LOG = LoggerFactory.getLogger(JmsSpoutTest.class);
+
+ @Test
+ public void testFailure() throws JMSException, Exception{
+ JmsSpout spout = new JmsSpout();
+ JmsProvider mockProvider = new MockJmsProvider();
+ MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector();
+ SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
+ spout.setJmsProvider(new MockJmsProvider());
+ spout.setJmsTupleProducer(new MockTupleProducer());
+ spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
+ spout.setRecoveryPeriod(10); // Rapid recovery for testing.
+ spout.open(new HashMap<String,String>(), null, collector);
+ Message msg = this.sendMessage(mockProvider.connectionFactory(), mockProvider.destination());
+ Thread.sleep(100);
+ spout.nextTuple(); // Pretend to be storm.
+ Assert.assertTrue(mockCollector.emitted);
+
+ mockCollector.reset();
+ spout.fail(msg.getJMSMessageID()); // Mock failure
+ Thread.sleep(5000);
+ spout.nextTuple(); // Pretend to be storm.
+ Thread.sleep(5000);
+ Assert.assertTrue(mockCollector.emitted); // Should have been re-emitted
+ }
+
+ @Test
+ public void testSerializability() throws IOException{
+ JmsSpout spout = new JmsSpout();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ oos.writeObject(spout);
+ oos.close();
+ Assert.assertTrue(out.toByteArray().length > 0);
+ }
+
+ public Message sendMessage(ConnectionFactory connectionFactory, Destination destination) throws JMSException {
+ Session mySess = connectionFactory.createConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = mySess.createProducer(destination);
+ TextMessage msg = mySess.createTextMessage();
+ msg.setText("Hello World");
+ LOG.info("Sending Message: {}", msg.getText());
+ producer.send(msg);
+ return msg;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
new file mode 100644
index 0000000..3ba0853
--- /dev/null
+++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.spout;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import org.apache.storm.jms.JmsProvider;
+
+public class MockJmsProvider implements JmsProvider {
+ private static final long serialVersionUID = 1L;
+
+ private ConnectionFactory connectionFactory = null;
+ private Destination destination = null;
+
+ public MockJmsProvider() throws NamingException{
+ this.connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ Context jndiContext = new InitialContext();
+ this.destination = (Destination) jndiContext.lookup("dynamicQueues/FOO.BAR");
+
+ }
+
+ /**
+ * Provides the JMS <code>ConnectionFactory</code>
+ * @return the connection factory
+ * @throws Exception
+ */
+ public ConnectionFactory connectionFactory() throws Exception{
+ return this.connectionFactory;
+ }
+
+ /**
+ * Provides the <code>Destination</code> (topic or queue) from which the
+ * <code>JmsSpout</code> will receive messages.
+ * @return
+ * @throws Exception
+ */
+ public Destination destination() throws Exception{
+ return this.destination;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
new file mode 100644
index 0000000..a5a6c51
--- /dev/null
+++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.spout;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.storm.spout.ISpoutOutputCollector;
+
+public class MockSpoutOutputCollector implements ISpoutOutputCollector {
+ boolean emitted = false;
+
+ @Override
+ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+ emitted = true;
+ return new ArrayList<Integer>();
+ }
+
+ @Override
+ public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+ emitted = true;
+ }
+
+ @Override
+ public void reportError(Throwable error) {
+ }
+
+ public boolean emitted(){
+ return this.emitted;
+ }
+
+ public void reset(){
+ this.emitted = false;
+ }
+
+ @Override
+ public long getPendingCount() {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
new file mode 100644
index 0000000..ea571fc
--- /dev/null
+++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.spout;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.apache.storm.jms.JmsTupleProducer;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class MockTupleProducer implements JmsTupleProducer {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Values toTuple(Message msg) throws JMSException {
+ if (msg instanceof TextMessage) {
+ String json = ((TextMessage) msg).getText();
+ return new Values(json);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("json"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/src/test/resources/jndi.properties
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/test/resources/jndi.properties b/external/storm-jms/src/test/resources/jndi.properties
new file mode 100644
index 0000000..af19521
--- /dev/null
+++ b/external/storm-jms/src/test/resources/jndi.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
+java.naming.provider.url = vm://localhost?broker.persistent=false
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 2ff98c1..02c2823 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -46,7 +46,7 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${project.version}</version>
- <scope>provided</scope>
+ <scope>${provided.scope}</scope>
</dependency>
<!--kafka libraries-->
<dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 9c69c05..6571a21 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -126,7 +126,7 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${project.version}</version>
- <scope>provided</scope>
+ <scope>${provided.scope}</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/pom.xml b/external/storm-kinesis/pom.xml
index f1872dc..763d9c2 100644
--- a/external/storm-kinesis/pom.xml
+++ b/external/storm-kinesis/pom.xml
@@ -42,7 +42,7 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${project.version}</version>
- <scope>provided</scope>
+ <scope>${provided.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-metrics/pom.xml b/external/storm-metrics/pom.xml
index 24b34e8..ef51c56 100644
--- a/external/storm-metrics/pom.xml
+++ b/external/storm-metrics/pom.xml
@@ -60,7 +60,7 @@
<artifactId>storm-core</artifactId>
<version>${project.version}</version>
<!-- keep storm out of the jar-with-dependencies -->
- <scope>provided</scope>
+ <scope>${provided.scope}</scope>
</dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/pom.xml b/external/storm-mongodb/pom.xml
index 7653ac8..47b6f63 100644
--- a/external/storm-mongodb/pom.xml
+++ b/external/storm-mongodb/pom.xml
@@ -1,74 +1,74 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <artifactId>storm-mongodb</artifactId>
-
- <developers>
- <developer>
- <id>vesense</id>
- <name>Xin Wang</name>
- <email>data.xinwang@gmail.com</email>
- </developer>
- </developers>
-
- <properties>
- <mongodb.version>3.2.0</mongodb.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.mongodb</groupId>
- <artifactId>mongo-java-driver</artifactId>
- <version>${mongodb.version}</version>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </dependency>
- <!--test dependencies -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-mongodb</artifactId>
+
+ <developers>
+ <developer>
+ <id>vesense</id>
+ <name>Xin Wang</name>
+ <email>data.xinwang@gmail.com</email>
+ </developer>
+ </developers>
+
+ <properties>
+ <mongodb.version>3.2.0</mongodb.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>${provided.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <version>${mongodb.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ <!--test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>