You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 22:34:03 UTC
[07/14] storm git commit: STORM-2416 Release Packaging Improvements
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
deleted file mode 100644
index d691e75..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.bolt;
-
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.jms.JmsMessageProducer;
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * A JmsBolt receives <code>org.apache.storm.tuple.Tuple</code> objects from a Storm
- * topology and publishes JMS Messages to a destination (topic or queue).
- * <p>
- * To use a JmsBolt in a topology, the following must be supplied:
- * <ol>
- * <li>A <code>JmsProvider</code> implementation.</li>
- * <li>A <code>JmsMessageProducer</code> implementation.</li>
- * </ol>
- * The <code>JmsProvider</code> provides the JMS <code>javax.jms.ConnectionFactory</code>
- * and <code>javax.jms.Destination</code> objects requied to publish JMS messages.
- * <p>
- * The JmsBolt uses a <code>JmsMessageProducer</code> to translate
- * <code>org.apache.storm.tuple.Tuple</code> objects into
- * <code>javax.jms.Message</code> objects for publishing.
- * <p>
- * Both JmsProvider and JmsMessageProducer must be set, or the bolt will
- * fail upon deployment to a cluster.
- * <p>
- * The JmsBolt is typically an endpoint in a topology -- in other words
- * it does not emit any tuples.
- */
-public class JmsBolt extends BaseTickTupleAwareRichBolt {
- private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class);
-
- private boolean autoAck = true;
-
- // javax.jms objects
- private Connection connection;
- private Session session;
- private MessageProducer messageProducer;
-
- // JMS options
- private boolean jmsTransactional = false;
- private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
-
- private JmsProvider jmsProvider;
- private JmsMessageProducer producer;
-
-
- private OutputCollector collector;
-
- /**
- * Set the JmsProvider used to connect to the JMS destination topic/queue
- *
- * @param provider
- */
- public void setJmsProvider(JmsProvider provider) {
- this.jmsProvider = provider;
- }
-
- /**
- * Set the JmsMessageProducer used to convert tuples
- * into JMS messages.
- *
- * @param producer
- */
- public void setJmsMessageProducer(JmsMessageProducer producer) {
- this.producer = producer;
- }
-
- /**
- * Sets the JMS acknowledgement mode for JMS messages sent
- * by this bolt.
- * <p>
- * Possible values:
- * <ul>
- * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
- * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
- * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
- * </ul>
- *
- * @param acknowledgeMode (constant defined in javax.jms.Session)
- */
- public void setJmsAcknowledgeMode(int acknowledgeMode) {
- this.jmsAcknowledgeMode = acknowledgeMode;
- }
-
- /**
- * Set the JMS transactional setting for the JMS session.
- *
- * @param transactional
- */
-// public void setJmsTransactional(boolean transactional){
-// this.jmsTransactional = transactional;
-// }
-
- /**
- * Sets whether or not tuples should be acknowledged by this
- * bolt.
- * <p>
- *
- * @param autoAck
- */
- public void setAutoAck(boolean autoAck) {
- this.autoAck = autoAck;
- }
-
-
- /**
- * Consumes a tuple and sends a JMS message.
- * <p>
- * If autoAck is true, the tuple will be acknowledged
- * after the message is sent.
- * <p>
- * If JMS sending fails, the tuple will be failed.
- */
- @Override
- protected void process(Tuple input) {
- // write the tuple to a JMS destination...
- LOG.debug("Tuple received. Sending JMS message.");
-
- try {
- Message msg = this.producer.toMessage(this.session, input);
- if (msg != null) {
- if (msg.getJMSDestination() != null) {
- this.messageProducer.send(msg.getJMSDestination(), msg);
- } else {
- this.messageProducer.send(msg);
- }
- }
- if (this.autoAck) {
- LOG.debug("ACKing tuple: " + input);
- this.collector.ack(input);
- }
- } catch (JMSException e) {
- // failed to send the JMS message, fail the tuple fast
- LOG.warn("Failing tuple: " + input);
- LOG.warn("Exception: ", e);
- this.collector.fail(input);
- }
- }
-
- /**
- * Releases JMS resources.
- */
- @Override
- public void cleanup() {
- try {
- LOG.debug("Closing JMS connection.");
- this.session.close();
- this.connection.close();
- } catch (JMSException e) {
- LOG.warn("Error closing JMS connection.", e);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- /**
- * Initializes JMS resources.
- */
- @Override
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- if (this.jmsProvider == null || this.producer == null) {
- throw new IllegalStateException("JMS Provider and MessageProducer not set.");
- }
- this.collector = collector;
- LOG.debug("Connecting JMS..");
- try {
- ConnectionFactory cf = this.jmsProvider.connectionFactory();
- Destination dest = this.jmsProvider.destination();
- this.connection = cf.createConnection();
- this.session = connection.createSession(this.jmsTransactional,
- this.jmsAcknowledgeMode);
- this.messageProducer = session.createProducer(dest);
-
- connection.start();
- } catch (Exception e) {
- LOG.warn("Error creating JMS connection.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
deleted file mode 100644
index b78a41e..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.spout;
-
-import java.io.Serializable;
-
-public class JmsMessageID implements Comparable<JmsMessageID>, Serializable {
-
- private String jmsID;
-
- private Long sequence;
-
- public JmsMessageID(long sequence, String jmsID){
- this.jmsID = jmsID;
- this.sequence = sequence;
- }
-
-
- public String getJmsID(){
- return this.jmsID;
- }
-
- @Override
- public int compareTo(JmsMessageID jmsMessageID) {
- return (int)(this.sequence - jmsMessageID.sequence);
- }
-
- @Override
- public int hashCode() {
- return this.sequence.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if(o instanceof JmsMessageID){
- JmsMessageID id = (JmsMessageID)o;
- return this.jmsID.equals(id.jmsID);
- } else {
- return false;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
deleted file mode 100644
index 6aaa7c9..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.spout;
-
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.jms.JmsTupleProducer;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-/**
- * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue
- * and outputs tuples based on the messages it receives.
- * <p>
- * <code>JmsSpout</code> instances rely on <code>JmsProducer</code> implementations
- * to obtain the JMS <code>ConnectionFactory</code> and <code>Destination</code> objects
- * necessary to connect to a JMS topic/queue.
- * <p>
- * When a <code>JmsSpout</code> receives a JMS message, it delegates to an
- * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the
- * incoming message.
- * <p>
- * Typically, developers will supply a custom <code>JmsTupleProducer</code> implementation
- * appropriate for the expected message content.
- */
-@SuppressWarnings("serial")
-public class JmsSpout extends BaseRichSpout implements MessageListener {
- private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
-
- // JMS options
- private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
- private boolean distributed = true;
-
- private JmsTupleProducer tupleProducer;
-
- private JmsProvider jmsProvider;
-
- private LinkedBlockingQueue<Message> queue;
- private TreeSet<JmsMessageID> toCommit;
- private HashMap<JmsMessageID, Message> pendingMessages;
- private long messageSequence = 0;
-
- private SpoutOutputCollector collector;
-
- private transient Connection connection;
- private transient Session session;
-
- private boolean hasFailures = false;
- public final Serializable recoveryMutex = "RECOVERY_MUTEX";
- private Timer recoveryTimer = null;
- private long recoveryPeriod = -1; // default to disabled
-
- /**
- * Sets the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
- * <p>
- * Possible values:
- * <ul>
- * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
- * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
- * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
- * </ul>
- *
- * @param mode JMS Session Acknowledgement mode
- * @throws IllegalArgumentException if the mode is not recognized.
- */
- public void setJmsAcknowledgeMode(int mode) {
- switch (mode) {
- case Session.AUTO_ACKNOWLEDGE:
- case Session.CLIENT_ACKNOWLEDGE:
- case Session.DUPS_OK_ACKNOWLEDGE:
- break;
- default:
- throw new IllegalArgumentException("Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)");
-
- }
- this.jmsAcknowledgeMode = mode;
- }
-
- /**
- * Returns the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
- *
- * @return
- */
- public int getJmsAcknowledgeMode() {
- return this.jmsAcknowledgeMode;
- }
-
- /**
- * Set the <code>JmsProvider</code>
- * implementation that this Spout will use to connect to
- * a JMS <code>javax.jms.Desination</code>
- *
- * @param provider
- */
- public void setJmsProvider(JmsProvider provider) {
- this.jmsProvider = provider;
- }
-
- /**
- * Set the <code>JmsTupleProducer</code>
- * implementation that will convert <code>javax.jms.Message</code>
- * object to <code>org.apache.storm.tuple.Values</code> objects
- * to be emitted.
- *
- * @param producer
- */
- public void setJmsTupleProducer(JmsTupleProducer producer) {
- this.tupleProducer = producer;
- }
-
- /**
- * <code>javax.jms.MessageListener</code> implementation.
- * <p>
- * Stored the JMS message in an internal queue for processing
- * by the <code>nextTuple()</code> method.
- */
- public void onMessage(Message msg) {
- try {
- LOG.debug("Queuing msg [" + msg.getJMSMessageID() + "]");
- } catch (JMSException e) {
- }
- this.queue.offer(msg);
- }
-
- /**
- * <code>ISpout</code> implementation.
- * <p>
- * Connects the JMS spout to the configured JMS destination
- * topic/queue.
- */
- @SuppressWarnings("rawtypes")
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- if (this.jmsProvider == null) {
- throw new IllegalStateException("JMS provider has not been set.");
- }
- if (this.tupleProducer == null) {
- throw new IllegalStateException("JMS Tuple Producer has not been set.");
- }
- Integer topologyTimeout = (Integer) conf.get("topology.message.timeout.secs");
- // TODO fine a way to get the default timeout from storm, so we're not hard-coding to 30 seconds (it could change)
- topologyTimeout = topologyTimeout == null ? 30 : topologyTimeout;
- if ((topologyTimeout.intValue() * 1000) > this.recoveryPeriod) {
- LOG.warn("*** WARNING *** : " +
- "Recovery period (" + this.recoveryPeriod + " ms.) is less then the configured " +
- "'topology.message.timeout.secs' of " + topologyTimeout +
- " secs. This could lead to a message replay flood!");
- }
- this.queue = new LinkedBlockingQueue<Message>();
- this.toCommit = new TreeSet<JmsMessageID>();
- this.pendingMessages = new HashMap<JmsMessageID, Message>();
- this.collector = collector;
- try {
- ConnectionFactory cf = this.jmsProvider.connectionFactory();
- Destination dest = this.jmsProvider.destination();
- this.connection = cf.createConnection();
- this.session = connection.createSession(false,
- this.jmsAcknowledgeMode);
- MessageConsumer consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
- this.connection.start();
- if (this.isDurableSubscription() && this.recoveryPeriod > 0) {
- this.recoveryTimer = new Timer();
- this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 10, this.recoveryPeriod);
- }
-
- } catch (Exception e) {
- LOG.warn("Error creating JMS connection.", e);
- }
-
- }
-
- public void close() {
- try {
- LOG.debug("Closing JMS connection.");
- this.session.close();
- this.connection.close();
- } catch (JMSException e) {
- LOG.warn("Error closing JMS connection.", e);
- }
-
- }
-
- public void nextTuple() {
- Message msg = this.queue.poll();
- if (msg == null) {
- Utils.sleep(50);
- } else {
-
- LOG.debug("sending tuple: " + msg);
- // get the tuple from the handler
- try {
- Values vals = this.tupleProducer.toTuple(msg);
- // ack if we're not in AUTO_ACKNOWLEDGE mode, or the message requests ACKNOWLEDGE
- LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode()));
- LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode));
- if (this.isDurableSubscription()) {
- LOG.debug("Requesting acks.");
- JmsMessageID messageId = new JmsMessageID(this.messageSequence++, msg.getJMSMessageID());
- this.collector.emit(vals, messageId);
-
- // at this point we successfully emitted. Store
- // the message and message ID so we can do a
- // JMS acknowledge later
- this.pendingMessages.put(messageId, msg);
- this.toCommit.add(messageId);
- } else {
- this.collector.emit(vals);
- }
- } catch (JMSException e) {
- LOG.warn("Unable to convert JMS message: " + msg);
- }
-
- }
-
- }
-
- /*
- * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
- */
- public void ack(Object msgId) {
-
- Message msg = this.pendingMessages.remove(msgId);
- JmsMessageID oldest = this.toCommit.first();
- if (msgId.equals(oldest)) {
- if (msg != null) {
- try {
- LOG.debug("Committing...");
- msg.acknowledge();
- LOG.debug("JMS Message acked: " + msgId);
- this.toCommit.remove(msgId);
- } catch (JMSException e) {
- LOG.warn("Error acknowldging JMS message: " + msgId, e);
- }
- } else {
- LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId);
- }
- } else {
- this.toCommit.remove(msgId);
- }
-
- }
-
- /*
- * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
- */
- public void fail(Object msgId) {
- LOG.warn("Message failed: " + msgId);
- this.pendingMessages.clear();
- this.toCommit.clear();
- synchronized (this.recoveryMutex) {
- this.hasFailures = true;
- }
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- this.tupleProducer.declareOutputFields(declarer);
-
- }
-
- /**
- * Returns <code>true</code> if the spout has received failures
- * from which it has not yet recovered.
- */
- public boolean hasFailures() {
- return this.hasFailures;
- }
-
- protected void recovered() {
- this.hasFailures = false;
- }
-
- /**
- * Sets the periodicity of the timer task that
- * checks for failures and recovers the JMS session.
- *
- * @param period
- */
- public void setRecoveryPeriod(long period) {
- this.recoveryPeriod = period;
- }
-
- public boolean isDistributed() {
- return this.distributed;
- }
-
- /**
- * Sets the "distributed" mode of this spout.
- * <p>
- * If <code>true</code> multiple instances of this spout <i>may</i> be
- * created across the cluster (depending on the "parallelism_hint" in the topology configuration).
- * <p>
- * Setting this value to <code>false</code> essentially means this spout will run as a singleton
- * within the cluster ("parallelism_hint" will be ignored).
- * <p>
- * In general, this should be set to <code>false</code> if the underlying JMS destination is a
- * topic, and <code>true</code> if it is a JMS queue.
- *
- * @param distributed
- */
- public void setDistributed(boolean distributed) {
- this.distributed = distributed;
- }
-
-
- private static final String toDeliveryModeString(int deliveryMode) {
- switch (deliveryMode) {
- case Session.AUTO_ACKNOWLEDGE:
- return "AUTO_ACKNOWLEDGE";
- case Session.CLIENT_ACKNOWLEDGE:
- return "CLIENT_ACKNOWLEDGE";
- case Session.DUPS_OK_ACKNOWLEDGE:
- return "DUPS_OK_ACKNOWLEDGE";
- default:
- return "UNKNOWN";
-
- }
- }
-
- protected Session getSession() {
- return this.session;
- }
-
- private boolean isDurableSubscription() {
- return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE);
- }
-
-
- private class RecoveryTask extends TimerTask {
- private final Logger LOG = LoggerFactory.getLogger(RecoveryTask.class);
-
- public void run() {
- synchronized (JmsSpout.this.recoveryMutex) {
- if (JmsSpout.this.hasFailures()) {
- try {
- LOG.info("Recovering from a message failure.");
- JmsSpout.this.getSession().recover();
- JmsSpout.this.recovered();
- } catch (JMSException e) {
- LOG.warn("Could not recover jms session.", e);
- }
- }
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsBatch.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
deleted file mode 100644
index c990058..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.trident;
-
-/**
- * Batch coordination metadata object for the TridentJmsSpout.
- * This implementation does not use batch metadata, so the object is empty.
- *
- */
-public class JmsBatch {
- // Empty class
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsState.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsState.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsState.java
deleted file mode 100644
index bfb78b5..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsState.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.trident;
-
-import org.apache.storm.jms.JmsMessageProducer;
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.topology.FailedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import javax.jms.*;
-import java.io.Serializable;
-import java.lang.IllegalStateException;
-import java.util.List;
-
-public class JmsState implements State {
-
- private static final Logger LOG = LoggerFactory.getLogger(JmsState.class);
-
- private Options options;
- private Connection connection;
- private Session session;
- private MessageProducer messageProducer;
-
- protected JmsState(Options options) {
- this.options = options;
- }
-
- public static class Options implements Serializable {
- private JmsProvider jmsProvider;
- private JmsMessageProducer msgProducer;
- private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
- private boolean jmsTransactional = true;
-
- public Options withJmsProvider(JmsProvider provider) {
- this.jmsProvider = provider;
- return this;
- }
-
- public Options withMessageProducer(JmsMessageProducer msgProducer) {
- this.msgProducer = msgProducer;
- return this;
- }
-
- public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) {
- this.jmsAcknowledgeMode = jmsAcknowledgeMode;
- return this;
- }
-
- public Options withJmsTransactional(boolean jmsTransactional) {
- this.jmsTransactional = jmsTransactional;
- return this;
- }
- }
-
- protected void prepare() {
- if(this.options.jmsProvider == null || this.options.msgProducer == null){
- throw new IllegalStateException("JMS Provider and MessageProducer not set.");
- }
- LOG.debug("Connecting JMS..");
- try {
- ConnectionFactory cf = this.options.jmsProvider.connectionFactory();
- Destination dest = this.options.jmsProvider.destination();
- this.connection = cf.createConnection();
- this.session = connection.createSession(this.options.jmsTransactional,
- this.options.jmsAcknowledgeMode);
- this.messageProducer = session.createProducer(dest);
-
- connection.start();
- } catch (Exception e) {
- LOG.warn("Error creating JMS connection.", e);
- }
- }
-
- @Override
- public void beginCommit(Long aLong) {
- }
-
- @Override
- public void commit(Long aLong) {
- LOG.debug("Committing JMS transaction.");
- if(this.options.jmsTransactional) {
- try {
- session.commit();
- } catch(JMSException e){
- LOG.error("JMS Session commit failed.", e);
- }
- }
- }
-
- public void updateState(List<TridentTuple> tuples, TridentCollector collector) throws JMSException {
- try {
- for(TridentTuple tuple : tuples) {
- Message msg = this.options.msgProducer.toMessage(this.session, tuple);
- if (msg != null) {
- if (msg.getJMSDestination() != null) {
- this.messageProducer.send(msg.getJMSDestination(), msg);
- } else {
- this.messageProducer.send(msg);
- }
- }
- }
- } catch (JMSException e) {
- LOG.warn("Failed to send jmd message for a trident batch ", e);
- if(this.options.jmsTransactional) {
- session.rollback();
- }
- throw new FailedException("Failed to write tuples", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
deleted file mode 100644
index 9a02ba9..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.trident;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-import java.util.Map;
-
-public class JmsStateFactory implements StateFactory {
-
- private JmsState.Options options;
-
- public JmsStateFactory(JmsState.Options options) {
- this.options = options;
- }
-
- @Override
- public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
- JmsState state = new JmsState(options);
- state.prepare();
- return state;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
deleted file mode 100644
index a2709a4..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.trident;
-
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import javax.jms.JMSException;
-import java.util.List;
-
-public class JmsUpdater extends BaseStateUpdater<JmsState> {
-
- @Override
- public void updateState(JmsState jmsState, List<TridentTuple> tuples, TridentCollector collector) {
- try {
- jmsState.updateState(tuples, collector);
- } catch (JMSException e) {
- throw new FailedException("failed JMS opetation", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
deleted file mode 100644
index 55e29bc..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.trident;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.jms.JmsTupleProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.ITridentSpout;
-import org.apache.storm.trident.topology.TransactionAttempt;
-import org.apache.storm.Config;
-import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsGetter;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.RotatingMap;
-import org.apache.storm.utils.Utils;
-
-/**
- * Trident implementation of the JmsSpout
- * <p>
- *
- */
-public class TridentJmsSpout implements ITridentSpout<JmsBatch> {
-
- public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
-
- public static final int DEFAULT_BATCH_SIZE = 1000;
-
- private static final long serialVersionUID = -3469351154693356655L;
-
- private JmsTupleProducer tupleProducer;
-
- private JmsProvider jmsProvider;
-
- private int jmsAcknowledgeMode;
-
- private String name;
-
- private static int nameIndex = 1;
-
- /**
- * Create a TridentJmsSpout with a default name and acknowledge mode of AUTO_ACKNOWLEDGE
- */
- public TridentJmsSpout() {
- this.name = "JmsSpout_"+(nameIndex++);
- this.jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
- }
-
- /**
- * Set the name for this spout, to improve log identification
- * @param name The name to be used in log messages
- * @return This spout
- */
- public TridentJmsSpout named(String name) {
- this.name = name;
- return this;
- }
-
- /**
- * Set the <code>JmsProvider</code>
- * implementation that this Spout will use to connect to
- * a JMS <code>javax.jms.Desination</code>
- *
- * @param provider
- */
- public TridentJmsSpout withJmsProvider(JmsProvider provider){
- this.jmsProvider = provider;
- return this;
- }
-
- /**
- * Set the <code>JmsTupleProducer</code>
- * implementation that will convert <code>javax.jms.Message</code>
- * object to <code>backtype.storm.tuple.Values</code> objects
- * to be emitted.
- *
- * @param tupleProducer
- * @return This spout
- */
- public TridentJmsSpout withTupleProducer(JmsTupleProducer tupleProducer) {
- this.tupleProducer = tupleProducer;
- return this;
- }
-
- /**
- * Set the JMS acknowledge mode for messages being processed by this spout.
- * <p/>
- * Possible values:
- * <ul>
- * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
- * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
- * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
- * </ul>
- * @param jmsAcknowledgeMode The chosen acknowledge mode
- * @return This spout
- * @throws IllegalArgumentException if the mode is not recognized
- */
- public TridentJmsSpout withJmsAcknowledgeMode(int jmsAcknowledgeMode) {
- toDeliveryModeString(jmsAcknowledgeMode);
- this.jmsAcknowledgeMode = jmsAcknowledgeMode;
- return this;
- }
-
- /**
- * Return a friendly string for the given JMS acknowledge mode, or throw an IllegalArgumentException if
- * the mode is not recognized.
- * <p/>
- * Possible values:
- * <ul>
- * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
- * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
- * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
- * </ul>
- * @param acknowledgeMode A valid JMS acknowledge mode
- * @return A friendly string describing the acknowledge mode
- * @throws IllegalArgumentException if the mode is not recognized
- */
- private static final String toDeliveryModeString(int acknowledgeMode) {
- switch (acknowledgeMode) {
- case Session.AUTO_ACKNOWLEDGE:
- return "AUTO_ACKNOWLEDGE";
- case Session.CLIENT_ACKNOWLEDGE:
- return "CLIENT_ACKNOWLEDGE";
- case Session.DUPS_OK_ACKNOWLEDGE:
- return "DUPS_OK_ACKNOWLEDGE";
- default:
- throw new IllegalArgumentException("Unknown JMS Acknowledge mode " + acknowledgeMode + " (See javax.jms.Session for valid values)");
- }
- }
-
- @Override
- public ITridentSpout.BatchCoordinator<JmsBatch> getCoordinator(
- String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) {
- return new JmsBatchCoordinator(name);
- }
-
- @Override
- public Emitter<JmsBatch> getEmitter(String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) {
- return new JmsEmitter(name, jmsProvider, tupleProducer, jmsAcknowledgeMode, conf);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- @Override
- public Fields getOutputFields() {
- OutputFieldsGetter fieldGetter = new OutputFieldsGetter();
- tupleProducer.declareOutputFields(fieldGetter);
- StreamInfo streamInfo = fieldGetter.getFieldsDeclaration().get(Utils.DEFAULT_STREAM_ID);
- if (streamInfo == null) {
- throw new IllegalArgumentException("Jms Tuple producer has not declared output fields for the default stream");
- }
-
- return new Fields(streamInfo.get_output_fields());
- }
-
- /**
- * The JmsEmitter class listens for incoming messages and stores them in a blocking queue. On each invocation of emit,
- * the queued messages are emitted as a batch.
- *
- */
- private class JmsEmitter implements Emitter<JmsBatch>, MessageListener {
-
- private final LinkedBlockingQueue<Message> queue;
- private final Connection connection;
- private final Session session;
-
- private final RotatingMap<Long, List<Message>> batchMessageMap; // Maps transaction Ids to JMS message ids.
-
- private final long rotateTimeMillis;
- private final int maxBatchSize;
- private final String name;
-
- private long lastRotate;
-
- private final Logger LOG = LoggerFactory.getLogger(JmsEmitter.class);
-
- public JmsEmitter(String name, JmsProvider jmsProvider, JmsTupleProducer tupleProducer, int jmsAcknowledgeMode, @SuppressWarnings("rawtypes") Map conf) {
- if (jmsProvider == null) {
- throw new IllegalStateException("JMS provider has not been set.");
- }
- if (tupleProducer == null) {
- throw new IllegalStateException("JMS Tuple Producer has not been set.");
- }
-
- this.queue = new LinkedBlockingQueue<Message>();
- this.name = name;
-
- batchMessageMap = new RotatingMap<Long, List<Message>>(3);
- rotateTimeMillis = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
- lastRotate = System.currentTimeMillis();
-
- Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF);
- maxBatchSize = batchSize != null ? batchSize.intValue() : DEFAULT_BATCH_SIZE;
-
- try {
- ConnectionFactory cf = jmsProvider.connectionFactory();
- Destination dest = jmsProvider.destination();
- this.connection = cf.createConnection();
- this.session = connection.createSession(false, jmsAcknowledgeMode);
- MessageConsumer consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
- this.connection.start();
-
- LOG.info("Created JmsEmitter with max batch size "+maxBatchSize+" rotate time "+rotateTimeMillis+"ms and destination "+dest+" for "+name);
-
- } catch (Exception e) {
- LOG.warn("Error creating JMS connection.", e);
- throw new IllegalStateException("Could not create JMS connection for spout ", e);
- }
-
- }
-
- @Override
- public void success(TransactionAttempt tx) {
-
- @SuppressWarnings("unchecked")
- List<Message> messages = (List<Message>) batchMessageMap.remove(tx.getTransactionId());
-
- if (messages != null) {
- if (!messages.isEmpty()) {
- LOG.debug("Success for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
- }
-
- for (Message msg: messages) {
- String messageId = "UnknownId";
-
- try {
- messageId = msg.getJMSMessageID();
- msg.acknowledge();
- LOG.trace("Acknowledged message "+messageId);
- } catch (JMSException e) {
- LOG.warn("Failed to acknowledge message "+messageId, e);
- }
- }
- }
- else {
- LOG.warn("No messages found in batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId());
- }
- }
-
- /**
- * Fail a batch with the given transaction id. This is called when a batch is timed out, or a new batch with a
- * matching transaction id is emitted. Note that the current implementation does nothing - i.e. it discards
- * messages that have been failed.
- * @param transactionId The transaction id of the failed batch
- * @param messages The list of messages to fail.
- */
- private void fail(Long transactionId, List<Message> messages) {
- LOG.debug("Failure for batch with transaction id "+transactionId+" for "+name);
- if (messages != null) {
- for (Message msg: messages) {
- try {
- LOG.trace("Failed message "+msg.getJMSMessageID());
- } catch (JMSException e) {
- LOG.warn("Could not identify failed message ", e);
- }
- }
- }
- else {
- LOG.warn("Failed batch has no messages with transaction id "+transactionId);
- }
- }
-
- @Override
- public void close() {
- try {
- LOG.info("Closing JMS connection.");
- this.session.close();
- this.connection.close();
- } catch (JMSException e) {
- LOG.warn("Error closing JMS connection.", e);
- }
- }
-
- @Override
- public void emitBatch(TransactionAttempt tx, JmsBatch coordinatorMeta,
- TridentCollector collector) {
-
- long now = System.currentTimeMillis();
- if(now - lastRotate > rotateTimeMillis) {
- Map<Long, List<Message>> failed = batchMessageMap.rotate();
- for(Long id: failed.keySet()) {
- LOG.warn("TIMED OUT batch with transaction id "+id+" for "+name);
- fail(id, failed.get(id));
- }
- lastRotate = now;
- }
-
- if(batchMessageMap.containsKey(tx.getTransactionId())) {
- LOG.warn("FAILED duplicate batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
- fail(tx.getTransactionId(), batchMessageMap.get(tx.getTransactionId()));
- }
-
- List<Message> batchMessages = new ArrayList<Message>();
-
- for (int index=0; index<maxBatchSize; index++) {
- Message msg = queue.poll();
- if (msg == null) {
- Utils.sleep(50); // Back off
- break;
- }
-
- try {
- if (TridentJmsSpout.this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE) {
- batchMessages.add(msg);
- }
- Values tuple = tupleProducer.toTuple(msg);
- collector.emit(tuple);
- } catch (JMSException e) {
- LOG.warn("Failed to emit message, could not retrieve data for "+name+": "+e );
- }
- }
-
- if (!batchMessages.isEmpty()) {
- LOG.debug("Emitting batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" and size "+batchMessages.size()+" for "+name);
- }
- else {
- LOG.trace("No items to acknowledge for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
- }
- batchMessageMap.put(tx.getTransactionId(), batchMessages);
- }
-
- @Override
- public void onMessage(Message msg) {
- try {
- LOG.trace("Queuing msg [" + msg.getJMSMessageID() + "]");
- } catch (JMSException e) {
- // Nothing here, could not get message id
- }
- this.queue.offer(msg);
- }
-
- }
-
- /**
- * Bare implementation of a BatchCoordinator, returning a null JmsBatch object
- *
- */
- private class JmsBatchCoordinator implements BatchCoordinator<JmsBatch> {
-
- private final String name;
-
- private final Logger LOG = LoggerFactory.getLogger(JmsBatchCoordinator.class);
-
- public JmsBatchCoordinator(String name) {
- this.name = name;
- LOG.info("Created batch coordinator for "+name);
- }
-
- @Override
- public JmsBatch initializeTransaction(long txid, JmsBatch prevMetadata, JmsBatch curMetadata) {
- LOG.debug("Initialise transaction "+txid+" for "+name);
- return null;
- }
-
- @Override
- public void success(long txid) {
- }
-
- @Override
- public boolean isReady(long txid) {
- return true;
- }
-
- @Override
- public void close() {
- }
-
- }
-
-}
-
-
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
deleted file mode 100644
index e80f70a..0000000
--- a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.spout;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsSpoutTest {
- private static final Logger LOG = LoggerFactory.getLogger(JmsSpoutTest.class);
-
- @Test
- public void testFailure() throws JMSException, Exception{
- JmsSpout spout = new JmsSpout();
- JmsProvider mockProvider = new MockJmsProvider();
- MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector();
- SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
- spout.setJmsProvider(new MockJmsProvider());
- spout.setJmsTupleProducer(new MockTupleProducer());
- spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
- spout.setRecoveryPeriod(10); // Rapid recovery for testing.
- spout.open(new HashMap<String,String>(), null, collector);
- Message msg = this.sendMessage(mockProvider.connectionFactory(), mockProvider.destination());
- Thread.sleep(100);
- spout.nextTuple(); // Pretend to be storm.
- Assert.assertTrue(mockCollector.emitted);
-
- mockCollector.reset();
- spout.fail(msg.getJMSMessageID()); // Mock failure
- Thread.sleep(5000);
- spout.nextTuple(); // Pretend to be storm.
- Thread.sleep(5000);
- Assert.assertTrue(mockCollector.emitted); // Should have been re-emitted
- }
-
- @Test
- public void testSerializability() throws IOException{
- JmsSpout spout = new JmsSpout();
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(out);
- oos.writeObject(spout);
- oos.close();
- Assert.assertTrue(out.toByteArray().length > 0);
- }
-
- public Message sendMessage(ConnectionFactory connectionFactory, Destination destination) throws JMSException {
- Session mySess = connectionFactory.createConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = mySess.createProducer(destination);
- TextMessage msg = mySess.createTextMessage();
- msg.setText("Hello World");
- LOG.info("Sending Message: {}", msg.getText());
- producer.send(msg);
- return msg;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
deleted file mode 100644
index 3ba0853..0000000
--- a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.spout;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-
-import org.apache.storm.jms.JmsProvider;
-
-public class MockJmsProvider implements JmsProvider {
- private static final long serialVersionUID = 1L;
-
- private ConnectionFactory connectionFactory = null;
- private Destination destination = null;
-
- public MockJmsProvider() throws NamingException{
- this.connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
- Context jndiContext = new InitialContext();
- this.destination = (Destination) jndiContext.lookup("dynamicQueues/FOO.BAR");
-
- }
-
- /**
- * Provides the JMS <code>ConnectionFactory</code>
- * @return the connection factory
- * @throws Exception
- */
- public ConnectionFactory connectionFactory() throws Exception{
- return this.connectionFactory;
- }
-
- /**
- * Provides the <code>Destination</code> (topic or queue) from which the
- * <code>JmsSpout</code> will receive messages.
- * @return
- * @throws Exception
- */
- public Destination destination() throws Exception{
- return this.destination;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
deleted file mode 100644
index a5a6c51..0000000
--- a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.spout;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.storm.spout.ISpoutOutputCollector;
-
-public class MockSpoutOutputCollector implements ISpoutOutputCollector {
- boolean emitted = false;
-
- @Override
- public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
- emitted = true;
- return new ArrayList<Integer>();
- }
-
- @Override
- public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
- emitted = true;
- }
-
- @Override
- public void reportError(Throwable error) {
- }
-
- public boolean emitted(){
- return this.emitted;
- }
-
- public void reset(){
- this.emitted = false;
- }
-
- @Override
- public long getPendingCount() {
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
deleted file mode 100644
index ea571fc..0000000
--- a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.spout;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-
-import org.apache.storm.jms.JmsTupleProducer;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class MockTupleProducer implements JmsTupleProducer {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Values toTuple(Message msg) throws JMSException {
- if (msg instanceof TextMessage) {
- String json = ((TextMessage) msg).getText();
- return new Values(json);
- } else {
- return null;
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("json"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/core/src/test/resources/jndi.properties
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/test/resources/jndi.properties b/external/storm-jms/core/src/test/resources/jndi.properties
deleted file mode 100644
index af19521..0000000
--- a/external/storm-jms/core/src/test/resources/jndi.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
-java.naming.provider.url = vm://localhost?broker.persistent=false
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/examples/README.markdown
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/README.markdown b/external/storm-jms/examples/README.markdown
deleted file mode 100644
index 7a4d8f0..0000000
--- a/external/storm-jms/examples/README.markdown
+++ /dev/null
@@ -1,12 +0,0 @@
-## About Storm JMS Examples
-This project contains a simple storm topology that illustrates the usage of "storm-jms".
-
-To build:
-
-`mvn clean install`
-
-The default build will create a jar file that can be deployed to to a Storm cluster in the "target" directory:
-
-`storm-jms-examples-0.1-SNAPSHOT-jar-with-dependencies.jar`
-
-
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/examples/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/pom.xml b/external/storm-jms/examples/pom.xml
deleted file mode 100644
index b099b88..0000000
--- a/external/storm-jms/examples/pom.xml
+++ /dev/null
@@ -1,150 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-jms-parent</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
-
- <artifactId>storm-jms-examples</artifactId>
-
- <properties>
- <spring.version>2.5.6</spring.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-beans</artifactId>
- <version>${spring.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-core</artifactId>
- <version>${spring.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- <version>${spring.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-jms</artifactId>
- <version>${spring.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.xbean</groupId>
- <artifactId>xbean-spring</artifactId>
- <version>3.7</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <!-- keep storm out of the jar-with-dependencies -->
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-jms</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-core</artifactId>
- <version>5.4.0</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <!-- bind the maven-assembly-plugin to the package phase this will create
- a jar file without the storm dependencies suitable for deployment to a cluster. -->
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- <archive>
- <manifest>
- </manifest>
- </archive>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
-
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <version>1.2.1</version>
- <executions>
- <execution>
- <goals>
- <goal>exec</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <executable>java</executable>
- <includeProjectDependencies>true</includeProjectDependencies>
- <includePluginDependencies>true</includePluginDependencies>
- <mainClass>org.apache.storm.jms.example.ExampleJmsTopology</mainClass>
- <systemProperties>
- <systemProperty>
- <key>log4j.configuration</key>
- <value>file:./src/main/resources/log4j.properties</value>
- </systemProperty>
- </systemProperties>
- </configuration>
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- </dependency>
- </dependencies>
- </plugin>
- </plugins>
- </build>
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
deleted file mode 100644
index 82dbd5b..0000000
--- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.example;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.jms.JmsMessageProducer;
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.jms.JmsTupleProducer;
-import org.apache.storm.jms.bolt.JmsBolt;
-import org.apache.storm.jms.spout.JmsSpout;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.ITuple;
-import org.apache.storm.utils.Utils;
-
-public class ExampleJmsTopology {
- public static final String JMS_QUEUE_SPOUT = "JMS_QUEUE_SPOUT";
- public static final String INTERMEDIATE_BOLT = "INTERMEDIATE_BOLT";
- public static final String FINAL_BOLT = "FINAL_BOLT";
- public static final String JMS_TOPIC_BOLT = "JMS_TOPIC_BOLT";
- public static final String JMS_TOPIC_SPOUT = "JMS_TOPIC_SPOUT";
- public static final String ANOTHER_BOLT = "ANOTHER_BOLT";
-
- @SuppressWarnings("serial")
- public static void main(String[] args) throws Exception {
-
- // JMS Queue Provider
- JmsProvider jmsQueueProvider = new SpringJmsProvider(
- "jms-activemq.xml", "jmsConnectionFactory",
- "notificationQueue");
-
- // JMS Topic provider
- JmsProvider jmsTopicProvider = new SpringJmsProvider(
- "jms-activemq.xml", "jmsConnectionFactory",
- "notificationTopic");
-
- // JMS Producer
- JmsTupleProducer producer = new JsonTupleProducer();
-
- // JMS Queue Spout
- JmsSpout queueSpout = new JmsSpout();
- queueSpout.setJmsProvider(jmsQueueProvider);
- queueSpout.setJmsTupleProducer(producer);
- queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
- queueSpout.setDistributed(true); // allow multiple instances
-
- TopologyBuilder builder = new TopologyBuilder();
-
- // spout with 5 parallel instances
- builder.setSpout(JMS_QUEUE_SPOUT, queueSpout, 5);
-
- // intermediate bolt, subscribes to jms spout, anchors on tuples, and auto-acks
- builder.setBolt(INTERMEDIATE_BOLT,
- new GenericBolt("INTERMEDIATE_BOLT", true, true, new Fields("json")), 3).shuffleGrouping(
- JMS_QUEUE_SPOUT);
-
- // bolt that subscribes to the intermediate bolt, and auto-acks
- // messages.
- builder.setBolt(FINAL_BOLT, new GenericBolt("FINAL_BOLT", true, true), 3).shuffleGrouping(
- INTERMEDIATE_BOLT);
-
- // bolt that subscribes to the intermediate bolt, and publishes to a JMS Topic
- JmsBolt jmsBolt = new JmsBolt();
- jmsBolt.setJmsProvider(jmsTopicProvider);
-
- // anonymous message producer just calls toString() on the tuple to create a jms message
- jmsBolt.setJmsMessageProducer(new JmsMessageProducer() {
- @Override
- public Message toMessage(Session session, ITuple input) throws JMSException {
- System.out.println("Sending JMS Message:" + input.toString());
- TextMessage tm = session.createTextMessage(input.toString());
- return tm;
- }
- });
-
- builder.setBolt(JMS_TOPIC_BOLT, jmsBolt).shuffleGrouping(INTERMEDIATE_BOLT);
-
- // JMS Topic spout
- JmsSpout topicSpout = new JmsSpout();
- topicSpout.setJmsProvider(jmsTopicProvider);
- topicSpout.setJmsTupleProducer(producer);
- topicSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
- topicSpout.setDistributed(false);
-
- builder.setSpout(JMS_TOPIC_SPOUT, topicSpout);
-
- builder.setBolt(ANOTHER_BOLT, new GenericBolt("ANOTHER_BOLT", true, true), 1).shuffleGrouping(
- JMS_TOPIC_SPOUT);
-
- Config conf = new Config();
-
- if (args.length > 0) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopology(args[0], conf,
- builder.createTopology());
- } else {
-
- conf.setDebug(true);
-
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("storm-jms-example", conf, builder.createTopology());) {
- Utils.sleep(60000);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java
deleted file mode 100644
index 57de1ba..0000000
--- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.example;
-
-import java.util.Map;
-
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * A generic <code>org.apache.storm.topology.IRichBolt</code> implementation
- * for testing/debugging the Storm JMS Spout and example topologies.
- * <p>
- * For debugging purposes, set the log level of the
- * <code>org.apache.storm.contrib.jms</code> package to DEBUG for debugging
- * output.
- *
- * @author tgoetz
- */
-@SuppressWarnings("serial")
-public class GenericBolt extends BaseRichBolt {
- private static final Logger LOG = LoggerFactory.getLogger(GenericBolt.class);
- private OutputCollector collector;
- private boolean autoAck = false;
- private boolean autoAnchor = false;
- private Fields declaredFields;
- private String name;
-
- /**
- * Constructs a new <code>GenericBolt</code> instance.
- *
- * @param name The name of the bolt (used in DEBUG logging)
- * @param autoAck Whether or not this bolt should automatically acknowledge received tuples.
- * @param autoAnchor Whether or not this bolt should automatically anchor to received tuples.
- * @param declaredFields The fields this bolt declares as output.
- */
- public GenericBolt(String name, boolean autoAck, boolean autoAnchor, Fields declaredFields) {
- this.name = name;
- this.autoAck = autoAck;
- this.autoAnchor = autoAnchor;
- this.declaredFields = declaredFields;
- }
-
- public GenericBolt(String name, boolean autoAck, boolean autoAnchor) {
- this(name, autoAck, autoAnchor, null);
- }
-
- @SuppressWarnings("rawtypes")
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- this.collector = collector;
-
- }
-
- public void execute(Tuple input) {
- LOG.debug("[" + this.name + "] Received message: " + input);
-
-
- // only emit if we have declared fields.
- if (this.declaredFields != null) {
- LOG.debug("[" + this.name + "] emitting: " + input);
- if (this.autoAnchor) {
- this.collector.emit(input, input.getValues());
- } else {
- this.collector.emit(input.getValues());
- }
- }
-
- if (this.autoAck) {
- LOG.debug("[" + this.name + "] ACKing tuple: " + input);
- this.collector.ack(input);
- }
-
- }
-
- public void cleanup() {
-
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- if (this.declaredFields != null) {
- declarer.declare(this.declaredFields);
- }
- }
-
- public boolean isAutoAck() {
- return this.autoAck;
- }
-
- public void setAutoAck(boolean autoAck) {
- this.autoAck = autoAck;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java
deleted file mode 100644
index 9ee175e..0000000
--- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.example;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-
-import org.apache.storm.jms.JmsTupleProducer;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-/**
- * A simple <code>JmsTupleProducer</code> that expects to receive
- * JMS <code>TextMessage</code> objects with a body in JSON format.
- * <p/>
- * Ouputs a tuple with field name "json" and a string value
- * containing the raw json.
- * <p/>
- * <b>NOTE: </b> Currently this implementation assumes the text is valid
- * JSON and does not attempt to parse or validate it.
- *
- * @author tgoetz
- *
- */
-@SuppressWarnings("serial")
-public class JsonTupleProducer implements JmsTupleProducer {
-
- public Values toTuple(Message msg) throws JMSException {
- if(msg instanceof TextMessage){
- String json = ((TextMessage) msg).getText();
- return new Values(json);
- } else {
- return null;
- }
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("json"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
deleted file mode 100644
index 306fc25..0000000
--- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.example;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.ClassPathXmlApplicationContext;
-
-import org.apache.storm.jms.JmsProvider;
-
-
-/**
- * A <code>JmsProvider</code> that uses the spring framework
- * to obtain a JMS <code>ConnectionFactory</code> and
- * <code>Desitnation</code> objects.
- * <p/>
- * The constructor takes three arguments:
- * <ol>
- * <li>A string pointing to the the spring application context file contining the JMS configuration
- * (must be on the classpath)
- * </li>
- * <li>The name of the connection factory bean</li>
- * <li>The name of the destination bean</li>
- * </ol>
- *
- *
- *
- */
-@SuppressWarnings("serial")
-public class SpringJmsProvider implements JmsProvider {
- private ConnectionFactory connectionFactory;
- private Destination destination;
-
- /**
- * Constructs a <code>SpringJmsProvider</code> object given the name of a
- * classpath resource (the spring application context file), and the bean
- * names of a JMS connection factory and destination.
- *
- * @param appContextClasspathResource - the spring configuration file (classpath resource)
- * @param connectionFactoryBean - the JMS connection factory bean name
- * @param destinationBean - the JMS destination bean name
- */
- public SpringJmsProvider(String appContextClasspathResource, String connectionFactoryBean, String destinationBean){
- ApplicationContext context = new ClassPathXmlApplicationContext(appContextClasspathResource);
- this.connectionFactory = (ConnectionFactory)context.getBean(connectionFactoryBean);
- this.destination = (Destination)context.getBean(destinationBean);
- }
-
- public ConnectionFactory connectionFactory() throws Exception {
- return this.connectionFactory;
- }
-
- public Destination destination() throws Exception {
- return this.destination;
- }
-
-}