You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/05/14 02:03:55 UTC

[1/2] storm git commit: STORM-3035: fix the issue in JmsSpout.ack when toCommit is empty

Repository: storm
Updated Branches:
  refs/heads/master 3dbc67f4c -> 85877154f


STORM-3035: fix the issue in JmsSpout.ack when toCommit is empty

This closes #2639


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9a9cc1f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9a9cc1f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9a9cc1f

Branch: refs/heads/master
Commit: c9a9cc1fcdbc9efc9635124f620c146e118fff92
Parents: 3dbc67f
Author: Arun Mahadevan <ar...@apache.org>
Authored: Thu Apr 19 10:52:38 2018 -0700
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon May 14 11:03:08 2018 +0900

----------------------------------------------------------------------
 .../storm/jms/example/ExampleJmsTopology.java   |   1 -
 .../org/apache/storm/jms/spout/JmsSpout.java    | 590 ++++++++++---------
 .../apache/storm/jms/spout/JmsSpoutTest.java    |   1 -
 3 files changed, 313 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c9a9cc1f/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
index accb052..096a16e 100644
--- a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
@@ -62,7 +62,6 @@ public class ExampleJmsTopology {
         queueSpout.setJmsProvider(jmsQueueProvider);
         queueSpout.setJmsTupleProducer(producer);
         queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
-        queueSpout.setDistributed(true); // allow multiple instances
 
         TopologyBuilder builder = new TopologyBuilder();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c9a9cc1f/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
index 41d5636..96226d6 100644
--- a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
@@ -21,19 +21,16 @@ package org.apache.storm.jms.spout;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeSet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.Collections;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
 import javax.jms.Session;
+
 import org.apache.storm.Config;
 import org.apache.storm.jms.JmsProvider;
 import org.apache.storm.jms.JmsTupleProducer;
@@ -42,57 +39,36 @@ import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue and outputs tuples based on the messages it receives.
+ * A Storm <code>Spout</code> implementation that listens to a JMS topic or
+ * queue and outputs tuples based on the messages it receives.
  *
  * <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code>
  * implementations to obtain the JMS
  * <code>ConnectionFactory</code> and <code>Destination</code> objects necessary
  * to connect to a JMS topic/queue.
  *
- * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to an
- * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the incoming message.
+ * <p>When a {@code JmsSpout} receives a JMS message, it delegates to an
+ * internal {@code JmsTupleProducer} instance to create a Storm tuple from
+ * the incoming message.
  *
  * <p>Typically, developers will supply a custom <code>JmsTupleProducer</code>
  * implementation appropriate for the expected message content.
  */
 @SuppressWarnings("serial")
-public class JmsSpout extends BaseRichSpout implements MessageListener {
+public class JmsSpout extends BaseRichSpout {
 
-    /**
-     * The logger object instance for this class.
-     */
+    /** The logger object instance for this class. */
     private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
 
-    /**
-     * The logger of the recovery task.
-     */
-    private static final Logger RECOVERY_TASK_LOG = LoggerFactory.getLogger(RecoveryTask.class);
-
-    /**
-     * Time to sleep between queue polling attempts.
-     */
+    /** Time to sleep between queue polling attempts. */
     private static final int POLL_INTERVAL_MS = 50;
 
     /**
-     * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
-     */
-    private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
-
-    /**
-     * Time to wait before queuing the first recovery task.
-     */
-    private static final int RECOVERY_DELAY_MS = 10;
-    /**
-     * Used to safely recover failed JMS sessions across instances.
-     */
-    private final Serializable recoveryMutex = "RECOVERY_MUTEX";
-    /**
      * The acknowledgment mode used for this instance.
      *
      * @see Session
@@ -102,80 +78,96 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
      * Indicates whether or not this spout should run as a singleton.
      */
     private boolean distributed = true;
-    /**
-     * Used to generate tuples from incoming messages.
-     */
+
+    /** Sets up the way we want to handle the emit, ack and fails. */
+    private MessageHandler messageHandler = new MessageHandler();
+
+    /** Used to generate tuples from incoming messages. */
     private JmsTupleProducer tupleProducer;
-    /**
-     * Encapsulates jms related classes needed to communicate with the mq.
-     */
+
+    /** Encapsulates jms related classes needed to communicate with the mq. */
     private JmsProvider jmsProvider;
-    /**
-     * Stores incoming messages for later sending.
-     */
-    private LinkedBlockingQueue<Message> queue;
-    /**
-     * Contains all message ids of messages that were not yet acked.
-     */
-    private TreeSet<JmsMessageID> toCommit;
-    /**
-     * Maps between message ids of not-yet acked messages, and the messages.
-     */
-    private HashMap<JmsMessageID, Message> pendingMessages;
-    /**
-     * Counter of handled messages.
-     */
+
+    /** Counter of handled messages. */
     private long messageSequence = 0;
-    /**
-     * The collector used to emit tuples.
-     */
+
+    /** The collector used to emit tuples. */
     private SpoutOutputCollector collector;
-    /**
-     * Connection to the jms queue.
-     */
+
+    /** Connection to the jms queue. */
     private transient Connection connection;
-    /**
-     * The active jms session.
-     */
+
+    /** The active jms session. */
     private transient Session session;
+
     /**
-     * Indicates whether or not a message failed to be processed.
-     */
-    private boolean hasFailures = false;
-    /**
-     * Schedules recovery tasks periodically.
+     * The message consumer.
      */
-    private Timer recoveryTimer = null;
+    private MessageConsumer consumer;
+
 
     /**
-     * Time to wait between recovery attempts.
+     * If JMS provider supports ack-ing individual messages.
      */
-    private long recoveryPeriodMs = -1; // default to disabled
+    private boolean individualAcks;
 
     /**
-     * Translate the {@code int} value of an acknowledgment to a {@code String}.
+     * Sets the JMS Session acknowledgement mode for the JMS session.
      *
-     * @param deliveryMode the mode to translate.
-     * @return its {@code String} explanation (name).
+     * <p>Possible values:
+     * <ul>
+     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+     * </ul>
      *
-     * @see Session
+     * <p>Any other vendor specific modes are not supported.
+     *
+     * @param mode JMS Session Acknowledgement mode
      */
-    private static String toDeliveryModeString(int deliveryMode) {
-        switch (deliveryMode) {
+    public void setJmsAcknowledgeMode(final int mode) {
+        switch (mode) {
             case Session.AUTO_ACKNOWLEDGE:
-                return "AUTO_ACKNOWLEDGE";
-            case Session.CLIENT_ACKNOWLEDGE:
-                return "CLIENT_ACKNOWLEDGE";
             case Session.DUPS_OK_ACKNOWLEDGE:
-                return "DUPS_OK_ACKNOWLEDGE";
+                messageHandler = new MessageHandler();
+                break;
+            case Session.CLIENT_ACKNOWLEDGE:
+                messageHandler = new ClientAckHandler();
+                break;
+            case Session.SESSION_TRANSACTED:
+                messageHandler = new TransactedSessionMessageHandler();
+                break;
             default:
-                return "UNKNOWN";
+                LOG.warn("Unsupported Acknowledge mode: "
+                    + mode + " (See javax.jms.Session for valid values)");
+        }
+        jmsAcknowledgeMode = mode;
+    }
 
+    /**
+     * Validates the unsupported vendor specific ack mode.
+     */
+    private void validateJmsAckMode() {
+        if (jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE
+            && jmsAcknowledgeMode != Session.DUPS_OK_ACKNOWLEDGE
+            && jmsAcknowledgeMode != Session.CLIENT_ACKNOWLEDGE
+            && jmsAcknowledgeMode != Session.SESSION_TRANSACTED) {
+            LOG.warn("Unsupported Acknowledge mode: " + jmsAcknowledgeMode
+                + " (See javax.jms.Session for valid values)");
+
+            if (individualAcks) {
+                LOG.warn("Allowing vendor specific mode due "
+                    + "to setIndividualAcks");
+            } else {
+                throw new IllegalArgumentException("Unsupported"
+                    + "Acknowledge mode: " + jmsAcknowledgeMode);
+            }
         }
     }
 
     /**
-     * Returns the JMS Session acknowledgement mode for the JMS session associated with this spout. Can be either of:
+     * Returns the JMS Session acknowledgement mode for the JMS session
+     * associated with this spout. Can be either of:
      * <ul>
      * <li>{@link Session#AUTO_ACKNOWLEDGE}</li>
      * <li>{@link Session#CLIENT_ACKNOWLEDGE}</li>
@@ -190,37 +182,11 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
     }
 
     /**
-     * Sets the JMS Session acknowledgement mode for the JMS session.
-     *
-     * <p>Possible values:
-     * <ul>
-     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
-     * </ul>
-     *
-     * @param mode JMS Session Acknowledgement mode
-     * @throws IllegalArgumentException if the mode is not recognized.
-     */
-    public void setJmsAcknowledgeMode(final int mode) {
-        switch (mode) {
-            case Session.AUTO_ACKNOWLEDGE:
-            case Session.CLIENT_ACKNOWLEDGE:
-            case Session.DUPS_OK_ACKNOWLEDGE:
-                break;
-            default:
-                throw new IllegalArgumentException(
-                    "Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)");
-
-        }
-        this.jmsAcknowledgeMode = mode;
-    }
-
-    /**
      * Set {@link #jmsProvider}.
      *
      * <p>Set the <code>JmsProvider</code>
-     * implementation that this Spout will use to connect to a JMS <code>javax.jms.Desination</code>
+     * implementation that this Spout will use to connect to
+     * a JMS <code>javax.jms.Destination</code>
      *
      * @param provider the provider to use
      */
@@ -229,30 +195,24 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
     }
 
     /**
-     * Set the <code>JmsTupleProducer</code> implementation that will convert <code>javax.jms.Message</code> object to
-     * <code>org.apache.storm.tuple.Values</code> objects to be emitted.
+     * Set the <code>JmsTupleProducer</code>
+     * implementation that will convert <code>javax.jms.Message</code>
+     * object to <code>org.apache.storm.tuple.Values</code> objects
+     * to be emitted.
      *
      * @param producer the producer instance to use
      */
-    public void setJmsTupleProducer(JmsTupleProducer producer) {
+    public void setJmsTupleProducer(final JmsTupleProducer producer) {
         this.tupleProducer = producer;
     }
 
     /**
-     * <code>javax.jms.MessageListener</code> implementation.
-     *
-     * <p>Stored the JMS message in an internal queue for processing
-     * by the <code>nextTuple()</code> method.
-     *
-     * @param msg the message to handle
+     * Set if JMS vendor supports ack-ing individual messages. The appropriate
+     * mode must be set via {@link #setJmsAcknowledgeMode(int)}.
      */
-    public void onMessage(Message msg) {
-        try {
-            LOG.debug("Queuing msg [" + msg.getJMSMessageID() + "]");
-        } catch (JMSException ignored) {
-            // Do nothing
-        }
-        this.queue.offer(msg);
+    public void setIndividualAcks() {
+        individualAcks = true;
+        messageHandler = new MessageAckHandler();
     }
 
     /**
@@ -262,42 +222,27 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
      * topic/queue.
      */
     @Override
-    public void open(Map<String, Object> conf,
-                     TopologyContext context,
-                     SpoutOutputCollector collector) {
+    public void open(final Map<String, Object> conf,
+                     final TopologyContext context,
+                     final SpoutOutputCollector spoutOutputCollector) {
 
-        if (this.jmsProvider == null) {
-            throw new IllegalStateException("JMS provider has not been set.");
-        }
-        if (this.tupleProducer == null) {
-            throw new IllegalStateException("JMS Tuple Producer has not been set.");
+        if (jmsProvider == null) {
+            throw new IllegalStateException(
+                "JMS provider has not been set.");
         }
-        // TODO get the default value from storm instead of hard coding 30 secs
-        Long topologyTimeout =
-            ((Number) conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue();
-        if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) > this.recoveryPeriodMs) {
-            LOG.warn("*** WARNING *** : "
-                     + "Recovery period (" + this.recoveryPeriodMs + " ms.) is less then the configured "
-                     + "'topology.message.timeout.secs' of " + topologyTimeout
-                     + " secs. This could lead to a message replay flood!");
+        if (tupleProducer == null) {
+            throw new IllegalStateException(
+                "JMS Tuple Producer has not been set.");
         }
-        this.queue = new LinkedBlockingQueue<Message>();
-        this.toCommit = new TreeSet<JmsMessageID>();
-        this.pendingMessages = new HashMap<JmsMessageID, Message>();
-        this.collector = collector;
+        validateJmsAckMode();
+        collector = spoutOutputCollector;
         try {
-            ConnectionFactory cf = this.jmsProvider.connectionFactory();
-            Destination dest = this.jmsProvider.destination();
-            this.connection = cf.createConnection();
-            this.session = connection.createSession(false, this.jmsAcknowledgeMode);
-            MessageConsumer consumer = session.createConsumer(dest);
-            consumer.setMessageListener(this);
-            this.connection.start();
-            if (this.isDurableSubscription() && this.recoveryPeriodMs > 0) {
-                this.recoveryTimer = new Timer();
-                this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), RECOVERY_DELAY_MS, this.recoveryPeriodMs);
-            }
-
+            ConnectionFactory cf = jmsProvider.connectionFactory();
+            Destination dest = jmsProvider.destination();
+            connection = cf.createConnection();
+            session = messageHandler.createSession(connection);
+            consumer = session.createConsumer(dest);
+            connection.start();
         } catch (Exception e) {
             LOG.warn("Error creating JMS connection.", e);
         }
@@ -307,13 +252,14 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
     /**
      * Close the {@link #session} and {@link #connection}.
      *
-     * <p>When overridden, should always call {@code super} to finalize the active connections.
+     * <p>When overridden, should always call {@code super}
+     * to finalize the active connections.
      */
     public void close() {
         try {
             LOG.debug("Closing JMS connection.");
-            this.session.close();
-            this.connection.close();
+            session.close();
+            connection.close();
         } catch (JMSException e) {
             LOG.warn("Error closing JMS connection.", e);
         }
@@ -324,74 +270,33 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
      * Generate the next tuple from a message.
      *
      * <p>This method polls the queue that's being filled asynchronously by the
-     * jms connection, every {@link #POLL_INTERVAL_MS} seconds. When a message arrives, a {@link Values} (tuple) is generated using {@link
-     * #tupleProducer}. It is emitted, and the message is saved to {@link #toCommit} and {@link #pendingMessages} for later handling.
+     * jms connection, every {@link #POLL_INTERVAL_MS} milliseconds.
      */
     public void nextTuple() {
-        Message msg = this.queue.poll();
-        if (msg == null) {
-            Utils.sleep(POLL_INTERVAL_MS);
-        } else {
-
-            LOG.debug("sending tuple: " + msg);
-            // get the tuple from the handler
-            try {
-                Values vals = this.tupleProducer.toTuple(msg);
-                // ack if we're not in AUTO_ACKNOWLEDGE mode,
-                // or the message requests ACKNOWLEDGE
-                LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode()));
-                LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode));
-                if (this.isDurableSubscription()) {
-                    LOG.debug("Requesting acks.");
-                    JmsMessageID messageId = new JmsMessageID(this.messageSequence++, msg.getJMSMessageID());
-                    this.collector.emit(vals, messageId);
-
-                    // at this point we successfully emitted. Store
-                    // the message and message ID so we can do a
-                    // JMS acknowledge later
-                    this.pendingMessages.put(messageId, msg);
-                    this.toCommit.add(messageId);
-                } else {
-                    this.collector.emit(vals);
-                }
-            } catch (JMSException e) {
-                LOG.warn("Unable to convert JMS message: " + msg);
+        try {
+            Message msg = consumer.receive(POLL_INTERVAL_MS);
+            if (msg != null) {
+                LOG.debug("sending tuple {}", msg);
+                messageHandler.emit(msg);
             }
-
+        } catch (JMSException ex) {
+            LOG.warn("Got error trying to process tuple", ex);
         }
-
     }
 
     /**
      * Ack a successfully handled message by the matching {@link JmsMessageID}.
      *
      * <p>Acking means removing the message from the pending messages
-     * collections, and if it was the oldest pending message - ack it to the mq as well, so that it's the only one acked.
+     * collections, and if it was the oldest pending message -
+     * ack it to the mq as well, so that it's the only one acked.
      *
      * <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE.
      */
     @Override
-    public void ack(Object msgId) {
-
-        Message msg = this.pendingMessages.remove(msgId);
-        JmsMessageID oldest = this.toCommit.first();
-        if (msgId.equals(oldest)) {
-            if (msg != null) {
-                try {
-                    LOG.debug("Committing...");
-                    msg.acknowledge();
-                    LOG.debug("JMS Message acked: " + msgId);
-                    this.toCommit.remove(msgId);
-                } catch (JMSException e) {
-                    LOG.warn("Error acknowldging JMS message: " + msgId, e);
-                }
-            } else {
-                LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId);
-            }
-        } else {
-            this.toCommit.remove(msgId);
-        }
-
+    public void ack(final Object msgId) {
+        LOG.debug("Received ACK for message: {}", msgId);
+        messageHandler.ack(msgId);
     }
 
     /**
@@ -403,57 +308,31 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
      * <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
      */
     @Override
-    public void fail(Object msgId) {
-        LOG.warn("Message failed: " + msgId);
-        this.pendingMessages.clear();
-        this.toCommit.clear();
-        synchronized (this.recoveryMutex) {
-            this.hasFailures = true;
-        }
+    public void fail(final Object msgId) {
+        LOG.warn("Received fail for message {}", msgId);
+        messageHandler.fail(msgId);
     }
 
     /**
-     * Use the {@link #tupleProducer} to determine which fields are about to be emitted.
+     * Use the {@link #tupleProducer} to determine which fields are about
+     * to be emitted.
      *
-     * <p>Note that {@link #nextTuple()} always emits to the default stream, and thus only fields declared
-     * for this stream are used.
+     * <p>Note that {@link #nextTuple()} always emits to the default stream,
+     * and thus only fields declared for this stream are used.
      */
     @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    public void declareOutputFields(final OutputFieldsDeclarer declarer) {
         this.tupleProducer.declareOutputFields(declarer);
 
     }
 
     /**
-     * Returns <code>true</code> if the spout has received failures from which it has not yet recovered.
+     * Returns if the spout is distributed.
      *
-     * @return {@code true} if there were failures, {@code false} otherwise.
-     */
-    public boolean hasFailures() {
-        return this.hasFailures;
-    }
-
-    /**
-     * Marks a healthy session state.
-     */
-    protected void recovered() {
-        this.hasFailures = false;
-    }
-
-    /**
-     * Sets the periodicity of the timer task that checks for failures and recovers the JMS session.
-     *
-     * @param period desired wait period
-     */
-    public void setRecoveryPeriodMs(long period) {
-        this.recoveryPeriodMs = period;
-    }
-
-    /**
      * @return {@link #distributed}.
      */
     public boolean isDistributed() {
-        return this.distributed;
+        return distributed;
     }
 
     /**
@@ -471,50 +350,207 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
      * @param isDistributed {@code true} if should be distributed, {@code false} otherwise.
      */
     public void setDistributed(boolean isDistributed) {
-        this.distributed = isDistributed;
+        distributed = isDistributed;
     }
 
     /**
-     * @return The currently active session.
+     * Returns the currently active session.
+     *
+     * @return The currently active session
      */
     protected Session getSession() {
-        return this.session;
+        return session;
     }
 
-    /**
-     * Check if the subscription requires messages to be acked.
-     *
-     * @return {@code true} if there is a pending messages state, {@code false} otherwise.
-     */
-    private boolean isDurableSubscription() {
-        return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE);
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return distributed ? null :
+            Collections.singletonMap(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
     }
 
-
     /**
-     * The periodic task used to try and recover failed sessions.
+     * Handles messages in JMS AUTO or DUPS_OK ack mode.
      */
-    private class RecoveryTask extends TimerTask {
+    private class MessageHandler implements Serializable {
 
         /**
-         * Try to recover a failed active session.
+         * Emit a message.
          *
-         * <p>If there is no active recovery task, and the session is failed,
-         * try to recover the session.
+         * @param msg the message
          */
-        public void run() {
-            synchronized (JmsSpout.this.recoveryMutex) {
-                if (JmsSpout.this.hasFailures()) {
+        void emit(final Message msg) {
+            LOG.debug("Received msg {}", msg);
+            try {
+                Values vals = tupleProducer.toTuple(msg);
+                collector.emit(vals);
+            } catch (JMSException ex) {
+                LOG.warn("Error processing message {}", msg);
+            }
+        }
+
+        /**
+         * Ack a message.
+         *
+         * @param msgId the message id
+         */
+        void ack(final Object msgId) {
+            // NOOP
+        }
+
+        /**
+         * Fail a message.
+         *
+         * @param msgId the message id
+         */
+        void fail(final Object msgId) {
+            // NOOP
+        }
+
+        /**
+         * Create a session.
+         *
+         * @param conn the connection
+         * @return the session
+         * @throws JMSException the JMS exception in case of error
+         */
+        Session createSession(final Connection conn) throws JMSException {
+            return conn.createSession(false, jmsAcknowledgeMode);
+        }
+    }
+
+    /**
+     * JMS mode where individual messages can be ack-ed.
+     */
+    private class MessageAckHandler extends MessageHandler {
+        /**
+         * Maps between message ids of not-yet acked messages and the messages.
+         */
+        private Map<JmsMessageID, Message> pendingAcks = new HashMap<>();
+
+        @Override
+        void emit(final Message msg) {
+            LOG.debug("Received msg {}, Requesting acks.", msg);
+            try {
+                JmsMessageID messageId = new JmsMessageID(messageSequence++,
+                    msg.getJMSMessageID());
+                Values vals = tupleProducer.toTuple(msg);
+                collector.emit(vals, messageId);
+                pendingAcks.put(messageId, msg);
+            } catch (JMSException ex) {
+                LOG.warn("Error processing message {}", msg);
+            }
+        }
+
+        @Override
+        void ack(final Object msgId) {
+            if (pendingAcks.isEmpty()) {
+                LOG.debug("Not processing the ACK, pendingAcks is empty");
+            } else {
+                Message msg = pendingAcks.remove(msgId);
+                if (msg != null) {
                     try {
-                        RECOVERY_TASK_LOG.info("Recovering from a message failure.");
-                        JmsSpout.this.getSession().recover();
-                        JmsSpout.this.recovered();
+                        doAck(msg);
                     } catch (JMSException e) {
-                        RECOVERY_TASK_LOG.warn("Could not recover jms session.", e);
+                        LOG.warn("Error acknowledging JMS message: {}",
+                            msgId, e);
                     }
+                } else {
+                    LOG.warn("Couldn't acknowledge unknown JMS message: {}",
+                        msgId);
                 }
             }
         }
 
+        @Override
+        void fail(final Object msgId) {
+            try {
+                // all the JMS un-acked messages are going to be re-delivered
+                // so clear the pendingAcks
+                if (!pendingAcks.isEmpty()) {
+                    pendingAcks.clear();
+                    doFail();
+                }
+            } catch (JMSException ex) {
+                LOG.warn("Error during session recovery", ex);
+            }
+        }
+
+        /**
+         * Ack the message.
+         *
+         * @param msg the message
+         * @throws JMSException the JMS exception in case of error
+         */
+        protected void doAck(final Message msg) throws JMSException {
+            msg.acknowledge();
+            LOG.debug("JMS message acked");
+        }
+
+        /**
+         * Fail the messages.
+         *
+         * @throws JMSException in case of error
+         */
+        protected void doFail() throws JMSException {
+            LOG.info("Triggering session recovery");
+            getSession().recover();
+        }
+
+        /**
+         * Returns the pending acks.
+         *
+         * @return the pending acks
+         */
+        protected Map<JmsMessageID, Message> getPendingAcks() {
+            return pendingAcks;
+        }
+    }
+
+    /**
+     * JMS CLIENT_ACKNOWLEDGE mode where acking a message
+     * acks all consumed messages in the session.
+     */
+    private class ClientAckHandler extends MessageAckHandler {
+        @Override
+        protected void doAck(final Message msg) throws JMSException {
+            // if there are no more pending consumed messages
+            // and storm delivered ack for all
+            if (getPendingAcks().isEmpty()) {
+                msg.acknowledge();
+                LOG.debug("JMS message acked");
+            } else {
+                LOG.debug("Not acknowledging the JMS message "
+                    + "since there are pending messages in the session");
+            }
+        }
+    }
+
+    /**
+     * JMS SESSION_TRANSACTED mode.
+     */
+    private class TransactedSessionMessageHandler extends MessageAckHandler {
+        @Override
+        protected void doAck(final Message msg) throws JMSException {
+            // if there are no more pending consumed messages
+            // and storm delivered ack for all
+            if (getPendingAcks().isEmpty()) {
+                session.commit();
+                LOG.debug("JMS session committed");
+            } else {
+                LOG.debug("Not committing the session "
+                    + "since there are pending messages in the session");
+            }
+        }
+
+        @Override
+        protected void doFail() throws JMSException {
+            LOG.info("Triggering session rollback");
+            session.rollback();
+        }
+
+        @Override
+        Session createSession(final Connection conn) throws JMSException {
+            return conn.createSession(true, jmsAcknowledgeMode);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9a9cc1f/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
index 9f967f8..dbfba1b 100644
--- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
+++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
@@ -53,7 +53,6 @@ public class JmsSpoutTest {
             spout.setJmsProvider(new MockJmsProvider());
             spout.setJmsTupleProducer(new MockTupleProducer());
             spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
-            spout.setRecoveryPeriodMs(10); // Rapid recovery for testing.
             spout.open(new HashMap<>(), null, collector);
             ConnectionFactory connectionFactory = mockProvider.connectionFactory();
             Destination destination = mockProvider.destination();


[2/2] storm git commit: Merge branch 'STORM-3035-merge'

Posted by ka...@apache.org.
Merge branch 'STORM-3035-merge'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85877154
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85877154
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85877154

Branch: refs/heads/master
Commit: 85877154f68e4cd6e59c6c5c0bff9b4c2eacf535
Parents: 3dbc67f c9a9cc1
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon May 14 11:03:40 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon May 14 11:03:40 2018 +0900

----------------------------------------------------------------------
 .../storm/jms/example/ExampleJmsTopology.java   |   1 -
 .../org/apache/storm/jms/spout/JmsSpout.java    | 590 ++++++++++---------
 .../apache/storm/jms/spout/JmsSpoutTest.java    |   1 -
 3 files changed, 313 insertions(+), 279 deletions(-)
----------------------------------------------------------------------