You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/05/14 02:03:55 UTC
[1/2] storm git commit: STORM-3035: fix the issue in JmsSpout.ack
when toCommit is empty
Repository: storm
Updated Branches:
refs/heads/master 3dbc67f4c -> 85877154f
STORM-3035: fix the issue in JmsSpout.ack when toCommit is empty
This closes #2639
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9a9cc1f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9a9cc1f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9a9cc1f
Branch: refs/heads/master
Commit: c9a9cc1fcdbc9efc9635124f620c146e118fff92
Parents: 3dbc67f
Author: Arun Mahadevan <ar...@apache.org>
Authored: Thu Apr 19 10:52:38 2018 -0700
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon May 14 11:03:08 2018 +0900
----------------------------------------------------------------------
.../storm/jms/example/ExampleJmsTopology.java | 1 -
.../org/apache/storm/jms/spout/JmsSpout.java | 590 ++++++++++---------
.../apache/storm/jms/spout/JmsSpoutTest.java | 1 -
3 files changed, 313 insertions(+), 279 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c9a9cc1f/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
index accb052..096a16e 100644
--- a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
@@ -62,7 +62,6 @@ public class ExampleJmsTopology {
queueSpout.setJmsProvider(jmsQueueProvider);
queueSpout.setJmsTupleProducer(producer);
queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
- queueSpout.setDistributed(true); // allow multiple instances
TopologyBuilder builder = new TopologyBuilder();
http://git-wip-us.apache.org/repos/asf/storm/blob/c9a9cc1f/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
index 41d5636..96226d6 100644
--- a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
@@ -21,19 +21,16 @@ package org.apache.storm.jms.spout;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeSet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.Collections;
+
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
import javax.jms.Session;
+
import org.apache.storm.Config;
import org.apache.storm.jms.JmsProvider;
import org.apache.storm.jms.JmsTupleProducer;
@@ -42,57 +39,36 @@ import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue and outputs tuples based on the messages it receives.
+ * A Storm <code>Spout</code> implementation that listens to a JMS topic or
+ * queue and outputs tuples based on the messages it receives.
*
* <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code>
* implementations to obtain the JMS
* <code>ConnectionFactory</code> and <code>Destination</code> objects necessary
* to connect to a JMS topic/queue.
*
- * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to an
- * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the incoming message.
+ * <p>When a {@code JmsSpout} receives a JMS message, it delegates to an
+ * internal {@code JmsTupleProducer} instance to create a Storm tuple from
+ * the incoming message.
*
* <p>Typically, developers will supply a custom <code>JmsTupleProducer</code>
* implementation appropriate for the expected message content.
*/
@SuppressWarnings("serial")
-public class JmsSpout extends BaseRichSpout implements MessageListener {
+public class JmsSpout extends BaseRichSpout {
- /**
- * The logger object instance for this class.
- */
+ /** The logger object instance for this class. */
private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
- /**
- * The logger of the recovery task.
- */
- private static final Logger RECOVERY_TASK_LOG = LoggerFactory.getLogger(RecoveryTask.class);
-
- /**
- * Time to sleep between queue polling attempts.
- */
+ /** Time to sleep between queue polling attempts. */
private static final int POLL_INTERVAL_MS = 50;
/**
- * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
- */
- private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
-
- /**
- * Time to wait before queuing the first recovery task.
- */
- private static final int RECOVERY_DELAY_MS = 10;
- /**
- * Used to safely recover failed JMS sessions across instances.
- */
- private final Serializable recoveryMutex = "RECOVERY_MUTEX";
- /**
* The acknowledgment mode used for this instance.
*
* @see Session
@@ -102,80 +78,96 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
* Indicates whether or not this spout should run as a singleton.
*/
private boolean distributed = true;
- /**
- * Used to generate tuples from incoming messages.
- */
+
+ /** Sets up the way we want to handle the emit, ack and fails. */
+ private MessageHandler messageHandler = new MessageHandler();
+
+ /** Used to generate tuples from incoming messages. */
private JmsTupleProducer tupleProducer;
- /**
- * Encapsulates jms related classes needed to communicate with the mq.
- */
+
+ /** Encapsulates jms related classes needed to communicate with the mq. */
private JmsProvider jmsProvider;
- /**
- * Stores incoming messages for later sending.
- */
- private LinkedBlockingQueue<Message> queue;
- /**
- * Contains all message ids of messages that were not yet acked.
- */
- private TreeSet<JmsMessageID> toCommit;
- /**
- * Maps between message ids of not-yet acked messages, and the messages.
- */
- private HashMap<JmsMessageID, Message> pendingMessages;
- /**
- * Counter of handled messages.
- */
+
+ /** Counter of handled messages. */
private long messageSequence = 0;
- /**
- * The collector used to emit tuples.
- */
+
+ /** The collector used to emit tuples. */
private SpoutOutputCollector collector;
- /**
- * Connection to the jms queue.
- */
+
+ /** Connection to the jms queue. */
private transient Connection connection;
- /**
- * The active jms session.
- */
+
+ /** The active jms session. */
private transient Session session;
+
/**
- * Indicates whether or not a message failed to be processed.
- */
- private boolean hasFailures = false;
- /**
- * Schedules recovery tasks periodically.
+ * The message consumer.
*/
- private Timer recoveryTimer = null;
+ private MessageConsumer consumer;
+
/**
- * Time to wait between recovery attempts.
+ * If JMS provider supports ack-ing individual messages.
*/
- private long recoveryPeriodMs = -1; // default to disabled
+ private boolean individualAcks;
/**
- * Translate the {@code int} value of an acknowledgment to a {@code String}.
+ * Sets the JMS Session acknowledgement mode for the JMS session.
*
- * @param deliveryMode the mode to translate.
- * @return its {@code String} explanation (name).
+ * <p>Possible values:
+ * <ul>
+ * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+ * </ul>
*
- * @see Session
+ * <p>Any other vendor specific modes are not supported.
+ *
+ * @param mode JMS Session Acknowledgement mode
*/
- private static String toDeliveryModeString(int deliveryMode) {
- switch (deliveryMode) {
+ public void setJmsAcknowledgeMode(final int mode) {
+ switch (mode) {
case Session.AUTO_ACKNOWLEDGE:
- return "AUTO_ACKNOWLEDGE";
- case Session.CLIENT_ACKNOWLEDGE:
- return "CLIENT_ACKNOWLEDGE";
case Session.DUPS_OK_ACKNOWLEDGE:
- return "DUPS_OK_ACKNOWLEDGE";
+ messageHandler = new MessageHandler();
+ break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ messageHandler = new ClientAckHandler();
+ break;
+ case Session.SESSION_TRANSACTED:
+ messageHandler = new TransactedSessionMessageHandler();
+ break;
default:
- return "UNKNOWN";
+ LOG.warn("Unsupported Acknowledge mode: "
+ + mode + " (See javax.jms.Session for valid values)");
+ }
+ jmsAcknowledgeMode = mode;
+ }
+ /**
+ * Validates the unsupported vendor specific ack mode.
+ */
+ private void validateJmsAckMode() {
+ if (jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE
+ && jmsAcknowledgeMode != Session.DUPS_OK_ACKNOWLEDGE
+ && jmsAcknowledgeMode != Session.CLIENT_ACKNOWLEDGE
+ && jmsAcknowledgeMode != Session.SESSION_TRANSACTED) {
+ LOG.warn("Unsupported Acknowledge mode: " + jmsAcknowledgeMode
+ + " (See javax.jms.Session for valid values)");
+
+ if (individualAcks) {
+ LOG.warn("Allowing vendor specific mode due "
+ + "to setIndividualAcks");
+ } else {
+ throw new IllegalArgumentException("Unsupported"
+ + "Acknowledge mode: " + jmsAcknowledgeMode);
+ }
}
}
/**
- * Returns the JMS Session acknowledgement mode for the JMS session associated with this spout. Can be either of:
+ * Returns the JMS Session acknowledgement mode for the JMS session
+ * associated with this spout. Can be either of:
* <ul>
* <li>{@link Session#AUTO_ACKNOWLEDGE}</li>
* <li>{@link Session#CLIENT_ACKNOWLEDGE}</li>
@@ -190,37 +182,11 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
}
/**
- * Sets the JMS Session acknowledgement mode for the JMS session.
- *
- * <p>Possible values:
- * <ul>
- * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
- * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
- * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
- * </ul>
- *
- * @param mode JMS Session Acknowledgement mode
- * @throws IllegalArgumentException if the mode is not recognized.
- */
- public void setJmsAcknowledgeMode(final int mode) {
- switch (mode) {
- case Session.AUTO_ACKNOWLEDGE:
- case Session.CLIENT_ACKNOWLEDGE:
- case Session.DUPS_OK_ACKNOWLEDGE:
- break;
- default:
- throw new IllegalArgumentException(
- "Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)");
-
- }
- this.jmsAcknowledgeMode = mode;
- }
-
- /**
* Set {@link #jmsProvider}.
*
* <p>Set the <code>JmsProvider</code>
- * implementation that this Spout will use to connect to a JMS <code>javax.jms.Desination</code>
+ * implementation that this Spout will use to connect to
+ * a JMS <code>javax.jms.Desination</code>
*
* @param provider the provider to use
*/
@@ -229,30 +195,24 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
}
/**
- * Set the <code>JmsTupleProducer</code> implementation that will convert <code>javax.jms.Message</code> object to
- * <code>org.apache.storm.tuple.Values</code> objects to be emitted.
+ * Set the <code>JmsTupleProducer</code>
+ * implementation that will convert <code>javax.jms.Message</code>
+ * object to <code>org.apache.storm.tuple.Values</code> objects
+ * to be emitted.
*
* @param producer the producer instance to use
*/
- public void setJmsTupleProducer(JmsTupleProducer producer) {
+ public void setJmsTupleProducer(final JmsTupleProducer producer) {
this.tupleProducer = producer;
}
/**
- * <code>javax.jms.MessageListener</code> implementation.
- *
- * <p>Stored the JMS message in an internal queue for processing
- * by the <code>nextTuple()</code> method.
- *
- * @param msg the message to handle
+ * Set if JMS vendor supports ack-ing individual messages. The appropriate
+ * mode must be set via {{@link #setJmsAcknowledgeMode(int)}}.
*/
- public void onMessage(Message msg) {
- try {
- LOG.debug("Queuing msg [" + msg.getJMSMessageID() + "]");
- } catch (JMSException ignored) {
- // Do nothing
- }
- this.queue.offer(msg);
+ public void setIndividualAcks() {
+ individualAcks = true;
+ messageHandler = new MessageAckHandler();
}
/**
@@ -262,42 +222,27 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
* topic/queue.
*/
@Override
- public void open(Map<String, Object> conf,
- TopologyContext context,
- SpoutOutputCollector collector) {
+ public void open(final Map<String, Object> conf,
+ final TopologyContext context,
+ final SpoutOutputCollector spoutOutputCollector) {
- if (this.jmsProvider == null) {
- throw new IllegalStateException("JMS provider has not been set.");
- }
- if (this.tupleProducer == null) {
- throw new IllegalStateException("JMS Tuple Producer has not been set.");
+ if (jmsProvider == null) {
+ throw new IllegalStateException(
+ "JMS provider has not been set.");
}
- // TODO get the default value from storm instead of hard coding 30 secs
- Long topologyTimeout =
- ((Number) conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue();
- if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) > this.recoveryPeriodMs) {
- LOG.warn("*** WARNING *** : "
- + "Recovery period (" + this.recoveryPeriodMs + " ms.) is less then the configured "
- + "'topology.message.timeout.secs' of " + topologyTimeout
- + " secs. This could lead to a message replay flood!");
+ if (tupleProducer == null) {
+ throw new IllegalStateException(
+ "JMS Tuple Producer has not been set.");
}
- this.queue = new LinkedBlockingQueue<Message>();
- this.toCommit = new TreeSet<JmsMessageID>();
- this.pendingMessages = new HashMap<JmsMessageID, Message>();
- this.collector = collector;
+ validateJmsAckMode();
+ collector = spoutOutputCollector;
try {
- ConnectionFactory cf = this.jmsProvider.connectionFactory();
- Destination dest = this.jmsProvider.destination();
- this.connection = cf.createConnection();
- this.session = connection.createSession(false, this.jmsAcknowledgeMode);
- MessageConsumer consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
- this.connection.start();
- if (this.isDurableSubscription() && this.recoveryPeriodMs > 0) {
- this.recoveryTimer = new Timer();
- this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), RECOVERY_DELAY_MS, this.recoveryPeriodMs);
- }
-
+ ConnectionFactory cf = jmsProvider.connectionFactory();
+ Destination dest = jmsProvider.destination();
+ connection = cf.createConnection();
+ session = messageHandler.createSession(connection);
+ consumer = session.createConsumer(dest);
+ connection.start();
} catch (Exception e) {
LOG.warn("Error creating JMS connection.", e);
}
@@ -307,13 +252,14 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
/**
* Close the {@link #session} and {@link #connection}.
*
- * <p>When overridden, should always call {@code super} to finalize the active connections.
+ * <p>When overridden, should always call {@code super}
+ * to finalize the active connections.
*/
public void close() {
try {
LOG.debug("Closing JMS connection.");
- this.session.close();
- this.connection.close();
+ session.close();
+ connection.close();
} catch (JMSException e) {
LOG.warn("Error closing JMS connection.", e);
}
@@ -324,74 +270,33 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
* Generate the next tuple from a message.
*
* <p>This method polls the queue that's being filled asynchronously by the
- * jms connection, every {@link #POLL_INTERVAL_MS} seconds. When a message arrives, a {@link Values} (tuple) is generated using {@link
- * #tupleProducer}. It is emitted, and the message is saved to {@link #toCommit} and {@link #pendingMessages} for later handling.
+ * jms connection, every {@link #POLL_INTERVAL_MS} seconds.
*/
public void nextTuple() {
- Message msg = this.queue.poll();
- if (msg == null) {
- Utils.sleep(POLL_INTERVAL_MS);
- } else {
-
- LOG.debug("sending tuple: " + msg);
- // get the tuple from the handler
- try {
- Values vals = this.tupleProducer.toTuple(msg);
- // ack if we're not in AUTO_ACKNOWLEDGE mode,
- // or the message requests ACKNOWLEDGE
- LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode()));
- LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode));
- if (this.isDurableSubscription()) {
- LOG.debug("Requesting acks.");
- JmsMessageID messageId = new JmsMessageID(this.messageSequence++, msg.getJMSMessageID());
- this.collector.emit(vals, messageId);
-
- // at this point we successfully emitted. Store
- // the message and message ID so we can do a
- // JMS acknowledge later
- this.pendingMessages.put(messageId, msg);
- this.toCommit.add(messageId);
- } else {
- this.collector.emit(vals);
- }
- } catch (JMSException e) {
- LOG.warn("Unable to convert JMS message: " + msg);
+ try {
+ Message msg = consumer.receive(POLL_INTERVAL_MS);
+ if (msg != null) {
+ LOG.debug("sending tuple {}", msg);
+ messageHandler.emit(msg);
}
-
+ } catch (JMSException ex) {
+ LOG.warn("Got error trying to process tuple", ex);
}
-
}
/**
* Ack a successfully handled message by the matching {@link JmsMessageID}.
*
* <p>Acking means removing the message from the pending messages
- * collections, and if it was the oldest pending message - ack it to the mq as well, so that it's the only one acked.
+ * collections, and if it was the oldest pending message -
+ * ack it to the mq as well, so that it's the only one acked.
*
* <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE.
*/
@Override
- public void ack(Object msgId) {
-
- Message msg = this.pendingMessages.remove(msgId);
- JmsMessageID oldest = this.toCommit.first();
- if (msgId.equals(oldest)) {
- if (msg != null) {
- try {
- LOG.debug("Committing...");
- msg.acknowledge();
- LOG.debug("JMS Message acked: " + msgId);
- this.toCommit.remove(msgId);
- } catch (JMSException e) {
- LOG.warn("Error acknowldging JMS message: " + msgId, e);
- }
- } else {
- LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId);
- }
- } else {
- this.toCommit.remove(msgId);
- }
-
+ public void ack(final Object msgId) {
+ LOG.debug("Received ACK for message: {}", msgId);
+ messageHandler.ack(msgId);
}
/**
@@ -403,57 +308,31 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
* <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
*/
@Override
- public void fail(Object msgId) {
- LOG.warn("Message failed: " + msgId);
- this.pendingMessages.clear();
- this.toCommit.clear();
- synchronized (this.recoveryMutex) {
- this.hasFailures = true;
- }
+ public void fail(final Object msgId) {
+ LOG.warn("Received fail for message {}", msgId);
+ messageHandler.fail(msgId);
}
/**
- * Use the {@link #tupleProducer} to determine which fields are about to be emitted.
+ * Use the {@link #tupleProducer} to determine which fields are about
+ * to be emitted.
*
- * <p>Note that {@link #nextTuple()} always emits to the default stream, and thus only fields declared
- * for this stream are used.
+ * <p>Note that {@link #nextTuple()} always emits to the default stream,
+ * and thus only fields declared for this stream are used.
*/
@Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
this.tupleProducer.declareOutputFields(declarer);
}
/**
- * Returns <code>true</code> if the spout has received failures from which it has not yet recovered.
+ * Returns if the spout is distributed.
*
- * @return {@code true} if there were failures, {@code false} otherwise.
- */
- public boolean hasFailures() {
- return this.hasFailures;
- }
-
- /**
- * Marks a healthy session state.
- */
- protected void recovered() {
- this.hasFailures = false;
- }
-
- /**
- * Sets the periodicity of the timer task that checks for failures and recovers the JMS session.
- *
- * @param period desired wait period
- */
- public void setRecoveryPeriodMs(long period) {
- this.recoveryPeriodMs = period;
- }
-
- /**
* @return {@link #distributed}.
*/
public boolean isDistributed() {
- return this.distributed;
+ return distributed;
}
/**
@@ -471,50 +350,207 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
* @param isDistributed {@code true} if should be distributed, {@code false} otherwise.
*/
public void setDistributed(boolean isDistributed) {
- this.distributed = isDistributed;
+ distributed = isDistributed;
}
/**
- * @return The currently active session.
+ * Returns the currently active session.
+ *
+ * @return The currently active session
*/
protected Session getSession() {
- return this.session;
+ return session;
}
- /**
- * Check if the subscription requires messages to be acked.
- *
- * @return {@code true} if there is a pending messages state, {@code false} otherwise.
- */
- private boolean isDurableSubscription() {
- return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE);
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return distributed ? null :
+ Collections.singletonMap(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
}
-
/**
- * The periodic task used to try and recover failed sessions.
+ * Handles messages in JMS AUTO or DUPS_OK ack mode.
*/
- private class RecoveryTask extends TimerTask {
+ private class MessageHandler implements Serializable {
/**
- * Try to recover a failed active session.
+ * Emit a message.
*
- * <p>If there is no active recovery task, and the session is failed,
- * try to recover the session.
+ * @param msg the message
*/
- public void run() {
- synchronized (JmsSpout.this.recoveryMutex) {
- if (JmsSpout.this.hasFailures()) {
+ void emit(final Message msg) {
+ LOG.debug("Received msg {}", msg);
+ try {
+ Values vals = tupleProducer.toTuple(msg);
+ collector.emit(vals);
+ } catch (JMSException ex) {
+ LOG.warn("Error processing message {}", msg);
+ }
+ }
+
+ /**
+ * Ack a message.
+ *
+ * @param msgId the message id
+ */
+ void ack(final Object msgId) {
+ // NOOP
+ }
+
+ /**
+ * Fail a message.
+ *
+ * @param msgId the message id
+ */
+ void fail(final Object msgId) {
+ // NOOP
+ }
+
+ /**
+ * Create a session.
+ *
+ * @param conn the connection
+ * @return the session
+ * @throws JMSException the JMS exception in case of error
+ */
+ Session createSession(final Connection conn) throws JMSException {
+ return conn.createSession(false, jmsAcknowledgeMode);
+ }
+ }
+
+ /**
+ * JMS mode where individual messages can be ack-ed.
+ */
+ private class MessageAckHandler extends MessageHandler {
+ /**
+ * Maps between message ids of not-yet acked messages and the messages.
+ */
+ private Map<JmsMessageID, Message> pendingAcks = new HashMap<>();
+
+ @Override
+ void emit(final Message msg) {
+ LOG.debug("Received msg {}, Requesting acks.", msg);
+ try {
+ JmsMessageID messageId = new JmsMessageID(messageSequence++,
+ msg.getJMSMessageID());
+ Values vals = tupleProducer.toTuple(msg);
+ collector.emit(vals, messageId);
+ pendingAcks.put(messageId, msg);
+ } catch (JMSException ex) {
+ LOG.warn("Error processing message {}", msg);
+ }
+ }
+
+ @Override
+ void ack(final Object msgId) {
+ if (pendingAcks.isEmpty()) {
+ LOG.debug("Not processing the ACK, pendingAcks is empty");
+ } else {
+ Message msg = pendingAcks.remove(msgId);
+ if (msg != null) {
try {
- RECOVERY_TASK_LOG.info("Recovering from a message failure.");
- JmsSpout.this.getSession().recover();
- JmsSpout.this.recovered();
+ doAck(msg);
} catch (JMSException e) {
- RECOVERY_TASK_LOG.warn("Could not recover jms session.", e);
+ LOG.warn("Error acknowledging JMS message: {}",
+ msgId, e);
}
+ } else {
+ LOG.warn("Couldn't acknowledge unknown JMS message: {}",
+ msgId);
}
}
}
+ @Override
+ void fail(final Object msgId) {
+ try {
+ // all the JMS un-acked messages are going to be re-delivered
+ // so clear the pendingAcks
+ if (!pendingAcks.isEmpty()) {
+ pendingAcks.clear();
+ doFail();
+ }
+ } catch (JMSException ex) {
+ LOG.warn("Error during session recovery", ex);
+ }
+ }
+
+ /**
+ * Ack the message.
+ *
+ * @param msg the message
+ * @throws JMSException the JMS exception in case of error
+ */
+ protected void doAck(final Message msg) throws JMSException {
+ msg.acknowledge();
+ LOG.debug("JMS message acked");
+ }
+
+ /**
+ * Fail the messages.
+ *
+ * @throws JMSException in case of error
+ */
+ protected void doFail() throws JMSException {
+ LOG.info("Triggering session recovery");
+ getSession().recover();
+ }
+
+ /**
+ * Returns the pending acks.
+ *
+ * @return the pending acks
+ */
+ protected Map<JmsMessageID, Message> getPendingAcks() {
+ return pendingAcks;
+ }
+ }
+
+ /**
+ * JMS CLIENT_ACKNOWLEDGE mode where acking a message
+ * acks all consumed messages in the session.
+ */
+ private class ClientAckHandler extends MessageAckHandler {
+ @Override
+ protected void doAck(final Message msg) throws JMSException {
+ // if there are no more pending consumed messages
+ // and storm delivered ack for all
+ if (getPendingAcks().isEmpty()) {
+ msg.acknowledge();
+ LOG.debug("JMS message acked");
+ } else {
+ LOG.debug("Not acknowledging the JMS message "
+ + "since there are pending messages in the session");
+ }
+ }
+ }
+
+ /**
+ * JMS SESSION_TRANSACTED mode.
+ */
+ private class TransactedSessionMessageHandler extends MessageAckHandler {
+ @Override
+ protected void doAck(final Message msg) throws JMSException {
+ // if there are no more pending consumed messages
+ // and storm delivered ack for all
+ if (getPendingAcks().isEmpty()) {
+ session.commit();
+ LOG.debug("JMS session committed");
+ } else {
+ LOG.debug("Not committing the session "
+ + "since there are pending messages in the session");
+ }
+ }
+
+ @Override
+ protected void doFail() throws JMSException {
+ LOG.info("Triggering session rollback");
+ session.rollback();
+ }
+
+ @Override
+ Session createSession(final Connection conn) throws JMSException {
+ return conn.createSession(true, jmsAcknowledgeMode);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c9a9cc1f/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
index 9f967f8..dbfba1b 100644
--- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
+++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
@@ -53,7 +53,6 @@ public class JmsSpoutTest {
spout.setJmsProvider(new MockJmsProvider());
spout.setJmsTupleProducer(new MockTupleProducer());
spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
- spout.setRecoveryPeriodMs(10); // Rapid recovery for testing.
spout.open(new HashMap<>(), null, collector);
ConnectionFactory connectionFactory = mockProvider.connectionFactory();
Destination destination = mockProvider.destination();
[2/2] storm git commit: Merge branch 'STORM-3035-merge'
Posted by ka...@apache.org.
Merge branch 'STORM-3035-merge'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85877154
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85877154
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85877154
Branch: refs/heads/master
Commit: 85877154f68e4cd6e59c6c5c0bff9b4c2eacf535
Parents: 3dbc67f c9a9cc1
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon May 14 11:03:40 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon May 14 11:03:40 2018 +0900
----------------------------------------------------------------------
.../storm/jms/example/ExampleJmsTopology.java | 1 -
.../org/apache/storm/jms/spout/JmsSpout.java | 590 ++++++++++---------
.../apache/storm/jms/spout/JmsSpoutTest.java | 1 -
3 files changed, 313 insertions(+), 279 deletions(-)
----------------------------------------------------------------------