You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by arunmahadevan <gi...@git.apache.org> on 2018/04/19 17:58:44 UTC

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

GitHub user arunmahadevan opened a pull request:

    https://github.com/apache/storm/pull/2639

    STORM-3035: fix the issue in JmsSpout.ack when toCommit is empty

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/arunmahadevan/storm STORM-3035

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2639
    
----
commit f091f0ffda8bf6b1fc52981f38cca757d9c98559
Author: Arun Mahadevan <ar...@...>
Date:   2018-04-19T17:52:38Z

    STORM-3035: fix the issue in JmsSpout.ack when toCommit is empty

----


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2639


---

[GitHub] storm issue #2639: STORM-3035: fix the issue in JmsSpout.ack when toCommit i...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/2639
  
    Still +1. Thanks @arunmahadevan 


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187496374
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -403,50 +274,31 @@ public void ack(Object msgId) {
          * <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
          */
         @Override
    -    public void fail(Object msgId) {
    +    public void fail(final Object msgId) {
             LOG.warn("Message failed: " + msgId);
    -        this.pendingMessages.clear();
    -        this.toCommit.clear();
    -        synchronized (this.recoveryMutex) {
    -            this.hasFailures = true;
    -        }
    +        messageHandler.fail(msgId);
         }
     
         /**
    -     * Use the {@link #tupleProducer} to determine which fields are about to be emitted.
    +     * Use the {@link #tupleProducer} to determine which fields are about
    +     * to be emitted.
          *
    -     * <p>Note that {@link #nextTuple()} always emits to the default stream, and thus only fields declared
    -     * for this stream are used.
    +     * <p>Note that {@link #nextTuple()} always emits to the default stream,
    +     * and thus only fields declared for this stream are used.
          */
         @Override
    -    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +    public void declareOutputFields(final OutputFieldsDeclarer declarer) {
             this.tupleProducer.declareOutputFields(declarer);
     
         }
     
         /**
    -     * Returns <code>true</code> if the spout has received failures from which it has not yet recovered.
    -     *
    -     * @return {@code true} if there were failures, {@code false} otherwise.
    -     */
    -    public boolean hasFailures() {
    -        return this.hasFailures;
    -    }
    -
    -    /**
    -     * Marks a healthy session state.
    -     */
    -    protected void recovered() {
    -        this.hasFailures = false;
    -    }
    -
    -    /**
    -     * Sets the periodicity of the timer task that checks for failures and recovers the JMS session.
    +     * Sets the periodicity of the timer task that
    +     * checks for failures and recovers the JMS session.
          *
          * @param period desired wait period
          */
    -    public void setRecoveryPeriodMs(long period) {
    -        this.recoveryPeriodMs = period;
    +    public void setRecoveryPeriodMs(final long period) {
    --- End diff --
    
    I just left it since its breaking the public API and if someone is using this in their code.


---

[GitHub] storm issue #2639: STORM-3035: fix the issue in JmsSpout.ack when toCommit i...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/2639
  
    This started out as a fix to handle the exceptions in "ack" when toCommit was empty. However during the review process and testing, figured out many more issues with the current approach. Also some JMS providers like Tibco supports ACK ing individual messages, which could not be handled with the existing code. The async mode of consuming the messages was also problematic to ensure at-least once delivery even with locks/synchronization since ack-ing an individual JMS message in CLIENT_ACK mode was going to ack the messages received in the listener (even if the listener did not return).
    
    To handle all the issues, I have refactored quite and bit and changed the approach of consuming the messages from async (listener based) to sync (receive) and introduced MessageHandlers to handle the emit/ack/fail in different ways based on the mode.
    
    @HeartSaVioR , can you review it again and let me know what you think?


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r183233219
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -339,26 +339,26 @@ public void nextTuple() {
          */
         @Override
         public void ack(Object msgId) {
    -
             Message msg = this.pendingMessages.remove(msgId);
    -        JmsMessageID oldest = this.toCommit.first();
    -        if (msgId.equals(oldest)) {
    -            if (msg != null) {
    -                try {
    -                    LOG.debug("Committing...");
    -                    msg.acknowledge();
    -                    LOG.debug("JMS Message acked: " + msgId);
    -                    this.toCommit.remove(msgId);
    -                } catch (JMSException e) {
    -                    LOG.warn("Error acknowldging JMS message: " + msgId, e);
    +        if (!toCommit.isEmpty()) {
    --- End diff --
    
    Might be better to leave the log message which messages are dropped in `fail()` too.


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187487737
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -262,42 +189,26 @@ public void onMessage(Message msg) {
          * topic/queue.
          */
         @Override
    -    public void open(Map<String, Object> conf,
    -                     TopologyContext context,
    -                     SpoutOutputCollector collector) {
    +    public void open(final Map<String, Object> conf,
    +                     final TopologyContext context,
    +                     final SpoutOutputCollector spoutOutputCollector) {
     
    -        if (this.jmsProvider == null) {
    -            throw new IllegalStateException("JMS provider has not been set.");
    -        }
    -        if (this.tupleProducer == null) {
    -            throw new IllegalStateException("JMS Tuple Producer has not been set.");
    +        if (jmsProvider == null) {
    +            throw new IllegalStateException(
    +                "JMS provider has not been set.");
             }
    -        // TODO get the default value from storm instead of hard coding 30 secs
    -        Long topologyTimeout =
    -            ((Number) conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue();
    -        if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) > this.recoveryPeriodMs) {
    -            LOG.warn("*** WARNING *** : "
    -                     + "Recovery period (" + this.recoveryPeriodMs + " ms.) is less then the configured "
    -                     + "'topology.message.timeout.secs' of " + topologyTimeout
    -                     + " secs. This could lead to a message replay flood!");
    +        if (tupleProducer == null) {
    +            throw new IllegalStateException(
    +                "JMS Tuple Producer has not been set.");
             }
    -        this.queue = new LinkedBlockingQueue<Message>();
    -        this.toCommit = new TreeSet<JmsMessageID>();
    -        this.pendingMessages = new HashMap<JmsMessageID, Message>();
    -        this.collector = collector;
    +        collector = spoutOutputCollector;
             try {
    -            ConnectionFactory cf = this.jmsProvider.connectionFactory();
    -            Destination dest = this.jmsProvider.destination();
    -            this.connection = cf.createConnection();
    -            this.session = connection.createSession(false, this.jmsAcknowledgeMode);
    -            MessageConsumer consumer = session.createConsumer(dest);
    -            consumer.setMessageListener(this);
    -            this.connection.start();
    -            if (this.isDurableSubscription() && this.recoveryPeriodMs > 0) {
    -                this.recoveryTimer = new Timer();
    -                this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), RECOVERY_DELAY_MS, this.recoveryPeriodMs);
    -            }
    -
    +            ConnectionFactory cf = jmsProvider.connectionFactory();
    +            Destination dest = jmsProvider.destination();
    +            connection = cf.createConnection();
    +            session = messageHandler.createSession(connection);
    --- End diff --
    
    We may want to consider the case: users provide mode which is not in JMS standard, and also setIndividualAck() is not called.
    
    Now the case is handled as same as AUTO_ACKNOWLEDGE because of providing default value of messageHandler. Is it intended? We prevented the case with IllegalArgumentException.


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187484768
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -18,164 +18,124 @@
     
     package org.apache.storm.jms.spout;
     
    -import java.io.Serializable;
    -import java.util.HashMap;
    -import java.util.Map;
    -import java.util.Timer;
    -import java.util.TimerTask;
    -import java.util.TreeSet;
    -import java.util.concurrent.LinkedBlockingQueue;
    -import java.util.concurrent.TimeUnit;
    -import javax.jms.Connection;
    -import javax.jms.ConnectionFactory;
    -import javax.jms.Destination;
    -import javax.jms.JMSException;
    -import javax.jms.Message;
    -import javax.jms.MessageConsumer;
    -import javax.jms.MessageListener;
    -import javax.jms.Session;
    -import org.apache.storm.Config;
     import org.apache.storm.jms.JmsProvider;
     import org.apache.storm.jms.JmsTupleProducer;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    +import javax.jms.Connection;
    --- End diff --
    
    Reorganizing imports happens now, and it looks like one of checkstyle violation.


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187495978
  
    --- Diff: external/storm-jms/pom.xml ---
    @@ -94,7 +94,7 @@
                     <artifactId>maven-checkstyle-plugin</artifactId>
                     <!--Note - the version would be inherited-->
                     <configuration>
    -                    <maxAllowedViolations>63</maxAllowedViolations>
    +                    <maxAllowedViolations>73</maxAllowedViolations>
    --- End diff --
    
    Actually I can see more violations in JMSSpout.java in master branch (43) vs the patch (2). I am not sure why. Will fix it anyways.


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187485457
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -18,164 +18,124 @@
     
     package org.apache.storm.jms.spout;
     
    -import java.io.Serializable;
    -import java.util.HashMap;
    -import java.util.Map;
    -import java.util.Timer;
    -import java.util.TimerTask;
    -import java.util.TreeSet;
    -import java.util.concurrent.LinkedBlockingQueue;
    -import java.util.concurrent.TimeUnit;
    -import javax.jms.Connection;
    -import javax.jms.ConnectionFactory;
    -import javax.jms.Destination;
    -import javax.jms.JMSException;
    -import javax.jms.Message;
    -import javax.jms.MessageConsumer;
    -import javax.jms.MessageListener;
    -import javax.jms.Session;
    -import org.apache.storm.Config;
     import org.apache.storm.jms.JmsProvider;
     import org.apache.storm.jms.JmsTupleProducer;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    +import javax.jms.Connection;
    +import javax.jms.ConnectionFactory;
    +import javax.jms.Destination;
    +import javax.jms.JMSException;
    +import javax.jms.Message;
    +import javax.jms.MessageConsumer;
    +import javax.jms.Session;
    +import java.util.HashMap;
    +import java.util.Map;
    +
     
     /**
    - * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue and outputs tuples based on the messages it receives.
    + * A Storm <code>Spout</code> implementation that listens to a JMS topic or
    + * queue and outputs tuples based on the messages it receives.
      *
      * <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code>
      * implementations to obtain the JMS
      * <code>ConnectionFactory</code> and <code>Destination</code> objects necessary
      * to connect to a JMS topic/queue.
      *
    - * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to an
    - * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the incoming message.
    + * <p>When a {@code JmsSpout} receives a JMS message, it delegates to an
    + * internal {@code JmsTupleProducer} instance to create a Storm tuple from
    + * the incoming message.
      *
      * <p>Typically, developers will supply a custom <code>JmsTupleProducer</code>
      * implementation appropriate for the expected message content.
      */
     @SuppressWarnings("serial")
    -public class JmsSpout extends BaseRichSpout implements MessageListener {
    +public class JmsSpout extends BaseRichSpout {
     
    -    /**
    -     * The logger object instance for this class.
    -     */
    +    /** The logger object instance for this class. */
         private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
     
    -    /**
    -     * The logger of the recovery task.
    -     */
    -    private static final Logger RECOVERY_TASK_LOG = LoggerFactory.getLogger(RecoveryTask.class);
    -
    -    /**
    -     * Time to sleep between queue polling attempts.
    -     */
    +    /** Time to sleep between queue polling attempts. */
         private static final int POLL_INTERVAL_MS = 50;
     
    -    /**
    -     * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
    -     */
    -    private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
    -
    -    /**
    -     * Time to wait before queuing the first recovery task.
    -     */
    -    private static final int RECOVERY_DELAY_MS = 10;
    -    /**
    -     * Used to safely recover failed JMS sessions across instances.
    -     */
    -    private final Serializable recoveryMutex = "RECOVERY_MUTEX";
         /**
          * The acknowledgment mode used for this instance.
          *
          * @see Session
          */
         private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
    -    /**
    -     * Indicates whether or not this spout should run as a singleton.
    -     */
    +
    +    /** Sets up the way we want to handle the emit, ack and fails. */
    +    private transient MessageHandler messageHandler = new MessageHandler();
    --- End diff --
    
    This must not be `transient`, because assigning `messageHandler` happens before serialization and Spout will lose the assigned value when deserialization happens. Are any of implementations of MessageHandler non-serializable?


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r183233200
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -339,26 +339,26 @@ public void nextTuple() {
          */
         @Override
         public void ack(Object msgId) {
    -
             Message msg = this.pendingMessages.remove(msgId);
    -        JmsMessageID oldest = this.toCommit.first();
    -        if (msgId.equals(oldest)) {
    -            if (msg != null) {
    -                try {
    -                    LOG.debug("Committing...");
    -                    msg.acknowledge();
    -                    LOG.debug("JMS Message acked: " + msgId);
    -                    this.toCommit.remove(msgId);
    -                } catch (JMSException e) {
    -                    LOG.warn("Error acknowldging JMS message: " + msgId, e);
    +        if (!toCommit.isEmpty()) {
    --- End diff --
    
    Could we leave the log message (at least DEBUG) so that we can see which messages are ignored while acking due to previous failure?


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187496121
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -18,164 +18,124 @@
     
     package org.apache.storm.jms.spout;
     
    -import java.io.Serializable;
    -import java.util.HashMap;
    -import java.util.Map;
    -import java.util.Timer;
    -import java.util.TimerTask;
    -import java.util.TreeSet;
    -import java.util.concurrent.LinkedBlockingQueue;
    -import java.util.concurrent.TimeUnit;
    -import javax.jms.Connection;
    -import javax.jms.ConnectionFactory;
    -import javax.jms.Destination;
    -import javax.jms.JMSException;
    -import javax.jms.Message;
    -import javax.jms.MessageConsumer;
    -import javax.jms.MessageListener;
    -import javax.jms.Session;
    -import org.apache.storm.Config;
     import org.apache.storm.jms.JmsProvider;
     import org.apache.storm.jms.JmsTupleProducer;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    +import javax.jms.Connection;
    +import javax.jms.ConnectionFactory;
    +import javax.jms.Destination;
    +import javax.jms.JMSException;
    +import javax.jms.Message;
    +import javax.jms.MessageConsumer;
    +import javax.jms.Session;
    +import java.util.HashMap;
    +import java.util.Map;
    +
     
     /**
    - * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue and outputs tuples based on the messages it receives.
    + * A Storm <code>Spout</code> implementation that listens to a JMS topic or
    + * queue and outputs tuples based on the messages it receives.
      *
      * <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code>
      * implementations to obtain the JMS
      * <code>ConnectionFactory</code> and <code>Destination</code> objects necessary
      * to connect to a JMS topic/queue.
      *
    - * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to an
    - * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the incoming message.
    + * <p>When a {@code JmsSpout} receives a JMS message, it delegates to an
    + * internal {@code JmsTupleProducer} instance to create a Storm tuple from
    + * the incoming message.
      *
      * <p>Typically, developers will supply a custom <code>JmsTupleProducer</code>
      * implementation appropriate for the expected message content.
      */
     @SuppressWarnings("serial")
    -public class JmsSpout extends BaseRichSpout implements MessageListener {
    +public class JmsSpout extends BaseRichSpout {
     
    -    /**
    -     * The logger object instance for this class.
    -     */
    +    /** The logger object instance for this class. */
         private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
     
    -    /**
    -     * The logger of the recovery task.
    -     */
    -    private static final Logger RECOVERY_TASK_LOG = LoggerFactory.getLogger(RecoveryTask.class);
    -
    -    /**
    -     * Time to sleep between queue polling attempts.
    -     */
    +    /** Time to sleep between queue polling attempts. */
         private static final int POLL_INTERVAL_MS = 50;
     
    -    /**
    -     * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
    -     */
    -    private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
    -
    -    /**
    -     * Time to wait before queuing the first recovery task.
    -     */
    -    private static final int RECOVERY_DELAY_MS = 10;
    -    /**
    -     * Used to safely recover failed JMS sessions across instances.
    -     */
    -    private final Serializable recoveryMutex = "RECOVERY_MUTEX";
         /**
          * The acknowledgment mode used for this instance.
          *
          * @see Session
          */
         private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
    -    /**
    -     * Indicates whether or not this spout should run as a singleton.
    -     */
    +
    +    /** Sets up the way we want to handle the emit, ack and fails. */
    +    private transient MessageHandler messageHandler = new MessageHandler();
    +
    +    /** Indicates whether or not this spout should run as a singleton. */
         private boolean distributed = true;
    -    /**
    -     * Used to generate tuples from incoming messages.
    -     */
    +
    +    /** Used to generate tuples from incoming messages. */
         private JmsTupleProducer tupleProducer;
    -    /**
    -     * Encapsulates jms related classes needed to communicate with the mq.
    -     */
    +
    +    /** Encapsulates jms related classes needed to communicate with the mq. */
         private JmsProvider jmsProvider;
    -    /**
    -     * Stores incoming messages for later sending.
    -     */
    -    private LinkedBlockingQueue<Message> queue;
    -    /**
    -     * Contains all message ids of messages that were not yet acked.
    -     */
    -    private TreeSet<JmsMessageID> toCommit;
    -    /**
    -     * Maps between message ids of not-yet acked messages, and the messages.
    -     */
    -    private HashMap<JmsMessageID, Message> pendingMessages;
    -    /**
    -     * Counter of handled messages.
    -     */
    +
    +    /** Counter of handled messages. */
         private long messageSequence = 0;
    -    /**
    -     * The collector used to emit tuples.
    -     */
    +
    +    /** The collector used to emit tuples. */
         private SpoutOutputCollector collector;
    -    /**
    -     * Connection to the jms queue.
    -     */
    +
    +    /** Connection to the jms queue. */
         private transient Connection connection;
    -    /**
    -     * The active jms session.
    -     */
    +
    +    /** The active jms session. */
         private transient Session session;
    -    /**
    -     * Indicates whether or not a message failed to be processed.
    -     */
    -    private boolean hasFailures = false;
    -    /**
    -     * Schedules recovery tasks periodically.
    -     */
    -    private Timer recoveryTimer = null;
     
         /**
    -     * Time to wait between recovery attempts.
    +     * The message consumer.
          */
    -    private long recoveryPeriodMs = -1; // default to disabled
    +    private MessageConsumer consumer;
     
         /**
    -     * Translate the {@code int} value of an acknowledgment to a {@code String}.
    +     * Sets the JMS Session acknowledgement mode for the JMS session.
          *
    -     * @param deliveryMode the mode to translate.
    -     * @return its {@code String} explanation (name).
    +     * <p>Possible values:
    +     * <ul>
    +     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
    +     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
    +     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
    +     * </ul>
          *
    -     * @see Session
    +     * Any other vendor specific modes are not supported.
    +     *
    +     * @param mode JMS Session Acknowledgement mode
          */
    -    private static String toDeliveryModeString(int deliveryMode) {
    -        switch (deliveryMode) {
    +    public void setJmsAcknowledgeMode(final int mode) {
    +        switch (mode) {
                 case Session.AUTO_ACKNOWLEDGE:
    -                return "AUTO_ACKNOWLEDGE";
    -            case Session.CLIENT_ACKNOWLEDGE:
    -                return "CLIENT_ACKNOWLEDGE";
                 case Session.DUPS_OK_ACKNOWLEDGE:
    -                return "DUPS_OK_ACKNOWLEDGE";
    +                messageHandler = new MessageHandler();
    +                break;
    +            case Session.CLIENT_ACKNOWLEDGE:
    +                messageHandler = new ClientAckHandler();
    +                break;
    +            case Session.SESSION_TRANSACTED:
    +                messageHandler = new TransactedSessionMessageHandler();
    +                break;
                 default:
    -                return "UNKNOWN";
    -
    +                LOG.warn("Unsupported Acknowledge mode: "
    --- End diff --
    
    Yes. Its the only way to support provider extensions.


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187500706
  
    --- Diff: external/storm-jms/pom.xml ---
    @@ -94,7 +94,7 @@
                     <artifactId>maven-checkstyle-plugin</artifactId>
                     <!--Note - the version would be inherited-->
                     <configuration>
    -                    <maxAllowedViolations>63</maxAllowedViolations>
    +                    <maxAllowedViolations>73</maxAllowedViolations>
    --- End diff --
    
    I cant figure out why checkstyle keeps complaining. Running "mvn checkstyle:check" doesnt throw any warnings for JMSSpout.java. I have set it to 64 for the build to pass.
    
    And the rules we have seems too restrictive. (Java-doc for each variable/method and line length of 80). Should probably relook so that its not a time waste.


---

[GitHub] storm issue #2639: STORM-3035: fix the issue in JmsSpout.ack when toCommit i...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/2639
  
    Looks good to me. +1
    
    I was wondering why you removed the `distributed` flag, and realized it was never properly implemented! For it to work, we would need the following method override:
    
    ```java
        @Override
        public Map<String, Object> getComponentConfiguration() {
            if(!_isDistributed) {
                Map<String, Object> ret = new HashMap<String, Object>();
                ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
                return ret;
            } else {
                return null;
            }
        }    
    ```
    
    I can see cases where that flag would be useful, for example when connecting to a topic vs. a queue. We may want to leave the flag there and fix the override.


---

[GitHub] storm issue #2639: STORM-3035: fix the issue in JmsSpout.ack when toCommit i...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/2639
  
    @HeartSaVioR , please check it again. 


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187486786
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -18,164 +18,124 @@
     
     package org.apache.storm.jms.spout;
     
    -import java.io.Serializable;
    -import java.util.HashMap;
    -import java.util.Map;
    -import java.util.Timer;
    -import java.util.TimerTask;
    -import java.util.TreeSet;
    -import java.util.concurrent.LinkedBlockingQueue;
    -import java.util.concurrent.TimeUnit;
    -import javax.jms.Connection;
    -import javax.jms.ConnectionFactory;
    -import javax.jms.Destination;
    -import javax.jms.JMSException;
    -import javax.jms.Message;
    -import javax.jms.MessageConsumer;
    -import javax.jms.MessageListener;
    -import javax.jms.Session;
    -import org.apache.storm.Config;
     import org.apache.storm.jms.JmsProvider;
     import org.apache.storm.jms.JmsTupleProducer;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    +import javax.jms.Connection;
    +import javax.jms.ConnectionFactory;
    +import javax.jms.Destination;
    +import javax.jms.JMSException;
    +import javax.jms.Message;
    +import javax.jms.MessageConsumer;
    +import javax.jms.Session;
    +import java.util.HashMap;
    +import java.util.Map;
    +
     
     /**
    - * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue and outputs tuples based on the messages it receives.
    + * A Storm <code>Spout</code> implementation that listens to a JMS topic or
    + * queue and outputs tuples based on the messages it receives.
      *
      * <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code>
      * implementations to obtain the JMS
      * <code>ConnectionFactory</code> and <code>Destination</code> objects necessary
      * to connect to a JMS topic/queue.
      *
    - * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to an
    - * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the incoming message.
    + * <p>When a {@code JmsSpout} receives a JMS message, it delegates to an
    + * internal {@code JmsTupleProducer} instance to create a Storm tuple from
    + * the incoming message.
      *
      * <p>Typically, developers will supply a custom <code>JmsTupleProducer</code>
      * implementation appropriate for the expected message content.
      */
     @SuppressWarnings("serial")
    -public class JmsSpout extends BaseRichSpout implements MessageListener {
    +public class JmsSpout extends BaseRichSpout {
     
    -    /**
    -     * The logger object instance for this class.
    -     */
    +    /** The logger object instance for this class. */
         private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
     
    -    /**
    -     * The logger of the recovery task.
    -     */
    -    private static final Logger RECOVERY_TASK_LOG = LoggerFactory.getLogger(RecoveryTask.class);
    -
    -    /**
    -     * Time to sleep between queue polling attempts.
    -     */
    +    /** Time to sleep between queue polling attempts. */
         private static final int POLL_INTERVAL_MS = 50;
     
    -    /**
    -     * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
    -     */
    -    private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
    -
    -    /**
    -     * Time to wait before queuing the first recovery task.
    -     */
    -    private static final int RECOVERY_DELAY_MS = 10;
    -    /**
    -     * Used to safely recover failed JMS sessions across instances.
    -     */
    -    private final Serializable recoveryMutex = "RECOVERY_MUTEX";
         /**
          * The acknowledgment mode used for this instance.
          *
          * @see Session
          */
         private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
    -    /**
    -     * Indicates whether or not this spout should run as a singleton.
    -     */
    +
    +    /** Sets up the way we want to handle the emit, ack and fails. */
    +    private transient MessageHandler messageHandler = new MessageHandler();
    +
    +    /** Indicates whether or not this spout should run as a singleton. */
         private boolean distributed = true;
    -    /**
    -     * Used to generate tuples from incoming messages.
    -     */
    +
    +    /** Used to generate tuples from incoming messages. */
         private JmsTupleProducer tupleProducer;
    -    /**
    -     * Encapsulates jms related classes needed to communicate with the mq.
    -     */
    +
    +    /** Encapsulates jms related classes needed to communicate with the mq. */
         private JmsProvider jmsProvider;
    -    /**
    -     * Stores incoming messages for later sending.
    -     */
    -    private LinkedBlockingQueue<Message> queue;
    -    /**
    -     * Contains all message ids of messages that were not yet acked.
    -     */
    -    private TreeSet<JmsMessageID> toCommit;
    -    /**
    -     * Maps between message ids of not-yet acked messages, and the messages.
    -     */
    -    private HashMap<JmsMessageID, Message> pendingMessages;
    -    /**
    -     * Counter of handled messages.
    -     */
    +
    +    /** Counter of handled messages. */
         private long messageSequence = 0;
    -    /**
    -     * The collector used to emit tuples.
    -     */
    +
    +    /** The collector used to emit tuples. */
         private SpoutOutputCollector collector;
    -    /**
    -     * Connection to the jms queue.
    -     */
    +
    +    /** Connection to the jms queue. */
         private transient Connection connection;
    -    /**
    -     * The active jms session.
    -     */
    +
    +    /** The active jms session. */
         private transient Session session;
    -    /**
    -     * Indicates whether or not a message failed to be processed.
    -     */
    -    private boolean hasFailures = false;
    -    /**
    -     * Schedules recovery tasks periodically.
    -     */
    -    private Timer recoveryTimer = null;
     
         /**
    -     * Time to wait between recovery attempts.
    +     * The message consumer.
          */
    -    private long recoveryPeriodMs = -1; // default to disabled
    +    private MessageConsumer consumer;
     
         /**
    -     * Translate the {@code int} value of an acknowledgment to a {@code String}.
    +     * Sets the JMS Session acknowledgement mode for the JMS session.
          *
    -     * @param deliveryMode the mode to translate.
    -     * @return its {@code String} explanation (name).
    +     * <p>Possible values:
    +     * <ul>
    +     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
    +     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
    +     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
    +     * </ul>
          *
    -     * @see Session
    +     * Any other vendor specific modes are not supported.
    +     *
    +     * @param mode JMS Session Acknowledgement mode
          */
    -    private static String toDeliveryModeString(int deliveryMode) {
    -        switch (deliveryMode) {
    +    public void setJmsAcknowledgeMode(final int mode) {
    +        switch (mode) {
                 case Session.AUTO_ACKNOWLEDGE:
    -                return "AUTO_ACKNOWLEDGE";
    -            case Session.CLIENT_ACKNOWLEDGE:
    -                return "CLIENT_ACKNOWLEDGE";
                 case Session.DUPS_OK_ACKNOWLEDGE:
    -                return "DUPS_OK_ACKNOWLEDGE";
    +                messageHandler = new MessageHandler();
    +                break;
    +            case Session.CLIENT_ACKNOWLEDGE:
    +                messageHandler = new ClientAckHandler();
    +                break;
    +            case Session.SESSION_TRANSACTED:
    +                messageHandler = new TransactedSessionMessageHandler();
    +                break;
                 default:
    -                return "UNKNOWN";
    -
    +                LOG.warn("Unsupported Acknowledge mode: "
    --- End diff --
    
    So we are allowing non-JMS standard mode to parameter, right? I agree this is needed to support specific JMS implementation which **extends** session acknowledge mode.


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187484538
  
    --- Diff: external/storm-jms/pom.xml ---
    @@ -94,7 +94,7 @@
                     <artifactId>maven-checkstyle-plugin</artifactId>
                     <!--Note - the version would be inherited-->
                     <configuration>
    -                    <maxAllowedViolations>63</maxAllowedViolations>
    +                    <maxAllowedViolations>73</maxAllowedViolations>
    --- End diff --
    
    We should not increase the number, decrease or no change is only allowed. Please fix checkstyle violations based on checkstyle report and keep the number.


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187496258
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -262,42 +189,26 @@ public void onMessage(Message msg) {
          * topic/queue.
          */
         @Override
    -    public void open(Map<String, Object> conf,
    -                     TopologyContext context,
    -                     SpoutOutputCollector collector) {
    +    public void open(final Map<String, Object> conf,
    +                     final TopologyContext context,
    +                     final SpoutOutputCollector spoutOutputCollector) {
     
    -        if (this.jmsProvider == null) {
    -            throw new IllegalStateException("JMS provider has not been set.");
    -        }
    -        if (this.tupleProducer == null) {
    -            throw new IllegalStateException("JMS Tuple Producer has not been set.");
    +        if (jmsProvider == null) {
    +            throw new IllegalStateException(
    +                "JMS provider has not been set.");
             }
    -        // TODO get the default value from storm instead of hard coding 30 secs
    -        Long topologyTimeout =
    -            ((Number) conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue();
    -        if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) > this.recoveryPeriodMs) {
    -            LOG.warn("*** WARNING *** : "
    -                     + "Recovery period (" + this.recoveryPeriodMs + " ms.) is less then the configured "
    -                     + "'topology.message.timeout.secs' of " + topologyTimeout
    -                     + " secs. This could lead to a message replay flood!");
    +        if (tupleProducer == null) {
    +            throw new IllegalStateException(
    +                "JMS Tuple Producer has not been set.");
             }
    -        this.queue = new LinkedBlockingQueue<Message>();
    -        this.toCommit = new TreeSet<JmsMessageID>();
    -        this.pendingMessages = new HashMap<JmsMessageID, Message>();
    -        this.collector = collector;
    +        collector = spoutOutputCollector;
             try {
    -            ConnectionFactory cf = this.jmsProvider.connectionFactory();
    -            Destination dest = this.jmsProvider.destination();
    -            this.connection = cf.createConnection();
    -            this.session = connection.createSession(false, this.jmsAcknowledgeMode);
    -            MessageConsumer consumer = session.createConsumer(dest);
    -            consumer.setMessageListener(this);
    -            this.connection.start();
    -            if (this.isDurableSubscription() && this.recoveryPeriodMs > 0) {
    -                this.recoveryTimer = new Timer();
    -                this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), RECOVERY_DELAY_MS, this.recoveryPeriodMs);
    -            }
    -
    +            ConnectionFactory cf = jmsProvider.connectionFactory();
    +            Destination dest = jmsProvider.destination();
    +            connection = cf.createConnection();
    +            session = messageHandler.createSession(connection);
    --- End diff --
    
    We can throw an exception if `setIndividualAck` is invoked and the ACK mode is still the standard ones.


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r183531789
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -339,26 +339,26 @@ public void nextTuple() {
          */
         @Override
         public void ack(Object msgId) {
    -
             Message msg = this.pendingMessages.remove(msgId);
    -        JmsMessageID oldest = this.toCommit.first();
    -        if (msgId.equals(oldest)) {
    -            if (msg != null) {
    -                try {
    -                    LOG.debug("Committing...");
    -                    msg.acknowledge();
    -                    LOG.debug("JMS Message acked: " + msgId);
    -                    this.toCommit.remove(msgId);
    -                } catch (JMSException e) {
    -                    LOG.warn("Error acknowldging JMS message: " + msgId, e);
    +        if (!toCommit.isEmpty()) {
    +            JmsMessageID oldest = this.toCommit.first();
    +            if (msgId.equals(oldest)) {
    +                if (msg != null) {
    +                    try {
    +                        LOG.debug("Committing...");
    +                        msg.acknowledge();
    --- End diff --
    
    This piece of code was already there. I am guessing its based on the JMS acknowledgement mode.
    
    See - https://docs.oracle.com/cd/E19798-01/821-1841/bncfw/index.html
    
    In `Session.CLIENT_ACKNOWLEDGE` - Acknowledging a consumed message automatically acknowledges the receipt of all messages that have been consumed by its session, so this logic seems fine. 
    
    The spout is ignoring Auto acknowledgement mode, but I am not sure about the other modes like `DUPS_OK_ACKNOWLEDGE` or `SESSION_TRANSACTED` work. cc @ptgoetz who might have more context around this.


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r183233537
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -339,26 +339,26 @@ public void nextTuple() {
          */
         @Override
         public void ack(Object msgId) {
    -
             Message msg = this.pendingMessages.remove(msgId);
    -        JmsMessageID oldest = this.toCommit.first();
    -        if (msgId.equals(oldest)) {
    -            if (msg != null) {
    -                try {
    -                    LOG.debug("Committing...");
    -                    msg.acknowledge();
    -                    LOG.debug("JMS Message acked: " + msgId);
    -                    this.toCommit.remove(msgId);
    -                } catch (JMSException e) {
    -                    LOG.warn("Error acknowldging JMS message: " + msgId, e);
    +        if (!toCommit.isEmpty()) {
    +            JmsMessageID oldest = this.toCommit.first();
    +            if (msgId.equals(oldest)) {
    +                if (msg != null) {
    +                    try {
    +                        LOG.debug("Committing...");
    +                        msg.acknowledge();
    --- End diff --
    
    I'm sorry I'm not familiar with JMS, but could you explain how this approach guarantee ack are done for all messages? Looks like it just removes the message from `toCommit` if late ack messages come earlier.


---

[GitHub] storm issue #2639: STORM-3035: fix the issue in JmsSpout.ack when toCommit i...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/2639
  
    @ptgoetz , thanks for the suggestion.
    
    Yes, it was clear why the distributed flag was needed when it was not used anywhere else in the code. I was hoping that the user will set the right parallelism while consuming from topics. I can add it back.


---

[GitHub] storm issue #2639: STORM-3035: fix the issue in JmsSpout.ack when toCommit i...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/2639
  
    @ptgoetz , can you take a look again?


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187496062
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -18,164 +18,124 @@
     
     package org.apache.storm.jms.spout;
     
    -import java.io.Serializable;
    -import java.util.HashMap;
    -import java.util.Map;
    -import java.util.Timer;
    -import java.util.TimerTask;
    -import java.util.TreeSet;
    -import java.util.concurrent.LinkedBlockingQueue;
    -import java.util.concurrent.TimeUnit;
    -import javax.jms.Connection;
    -import javax.jms.ConnectionFactory;
    -import javax.jms.Destination;
    -import javax.jms.JMSException;
    -import javax.jms.Message;
    -import javax.jms.MessageConsumer;
    -import javax.jms.MessageListener;
    -import javax.jms.Session;
    -import org.apache.storm.Config;
     import org.apache.storm.jms.JmsProvider;
     import org.apache.storm.jms.JmsTupleProducer;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    +import javax.jms.Connection;
    +import javax.jms.ConnectionFactory;
    +import javax.jms.Destination;
    +import javax.jms.JMSException;
    +import javax.jms.Message;
    +import javax.jms.MessageConsumer;
    +import javax.jms.Session;
    +import java.util.HashMap;
    +import java.util.Map;
    +
     
     /**
    - * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue and outputs tuples based on the messages it receives.
    + * A Storm <code>Spout</code> implementation that listens to a JMS topic or
    + * queue and outputs tuples based on the messages it receives.
      *
      * <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code>
      * implementations to obtain the JMS
      * <code>ConnectionFactory</code> and <code>Destination</code> objects necessary
      * to connect to a JMS topic/queue.
      *
    - * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to an
    - * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the incoming message.
    + * <p>When a {@code JmsSpout} receives a JMS message, it delegates to an
    + * internal {@code JmsTupleProducer} instance to create a Storm tuple from
    + * the incoming message.
      *
      * <p>Typically, developers will supply a custom <code>JmsTupleProducer</code>
      * implementation appropriate for the expected message content.
      */
     @SuppressWarnings("serial")
    -public class JmsSpout extends BaseRichSpout implements MessageListener {
    +public class JmsSpout extends BaseRichSpout {
     
    -    /**
    -     * The logger object instance for this class.
    -     */
    +    /** The logger object instance for this class. */
         private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
     
    -    /**
    -     * The logger of the recovery task.
    -     */
    -    private static final Logger RECOVERY_TASK_LOG = LoggerFactory.getLogger(RecoveryTask.class);
    -
    -    /**
    -     * Time to sleep between queue polling attempts.
    -     */
    +    /** Time to sleep between queue polling attempts. */
         private static final int POLL_INTERVAL_MS = 50;
     
    -    /**
    -     * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
    -     */
    -    private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
    -
    -    /**
    -     * Time to wait before queuing the first recovery task.
    -     */
    -    private static final int RECOVERY_DELAY_MS = 10;
    -    /**
    -     * Used to safely recover failed JMS sessions across instances.
    -     */
    -    private final Serializable recoveryMutex = "RECOVERY_MUTEX";
         /**
          * The acknowledgment mode used for this instance.
          *
          * @see Session
          */
         private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
    -    /**
    -     * Indicates whether or not this spout should run as a singleton.
    -     */
    +
    +    /** Sets up the way we want to handle the emit, ack and fails. */
    +    private transient MessageHandler messageHandler = new MessageHandler();
    --- End diff --
    
    Good catch. Yes I should make the MessageHandler serializable.


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187554061
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -403,50 +274,31 @@ public void ack(Object msgId) {
          * <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
          */
         @Override
    -    public void fail(Object msgId) {
    +    public void fail(final Object msgId) {
             LOG.warn("Message failed: " + msgId);
    -        this.pendingMessages.clear();
    -        this.toCommit.clear();
    -        synchronized (this.recoveryMutex) {
    -            this.hasFailures = true;
    -        }
    +        messageHandler.fail(msgId);
         }
     
         /**
    -     * Use the {@link #tupleProducer} to determine which fields are about to be emitted.
    +     * Use the {@link #tupleProducer} to determine which fields are about
    +     * to be emitted.
          *
    -     * <p>Note that {@link #nextTuple()} always emits to the default stream, and thus only fields declared
    -     * for this stream are used.
    +     * <p>Note that {@link #nextTuple()} always emits to the default stream,
    +     * and thus only fields declared for this stream are used.
          */
         @Override
    -    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +    public void declareOutputFields(final OutputFieldsDeclarer declarer) {
             this.tupleProducer.declareOutputFields(declarer);
     
         }
     
         /**
    -     * Returns <code>true</code> if the spout has received failures from which it has not yet recovered.
    -     *
    -     * @return {@code true} if there were failures, {@code false} otherwise.
    -     */
    -    public boolean hasFailures() {
    -        return this.hasFailures;
    -    }
    -
    -    /**
    -     * Marks a healthy session state.
    -     */
    -    protected void recovered() {
    -        this.hasFailures = false;
    -    }
    -
    -    /**
    -     * Sets the periodicity of the timer task that checks for failures and recovers the JMS session.
    +     * Sets the periodicity of the timer task that
    +     * checks for failures and recovers the JMS session.
          *
          * @param period desired wait period
          */
    -    public void setRecoveryPeriodMs(long period) {
    -        this.recoveryPeriodMs = period;
    +    public void setRecoveryPeriodMs(final long period) {
    --- End diff --
    
    Could we have patch for 1.x-branch with deprecating the method? I'm OK to remove the method in 2.0.0, but you're right for 1.x.


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187555046
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -127,8 +130,14 @@ public void setJmsAcknowledgeMode(final int mode) {
                     messageHandler = new TransactedSessionMessageHandler();
                     break;
                 default:
    -                LOG.warn("Unsupported Acknowledge mode: "
    -                    + mode + " (See javax.jms.Session for valid values)");
    +                // individual message ack-ing needs vendor specific mode
    +                if (individualAcks) {
    --- End diff --
    
    This change requires end users to call individualAcks earlier than this method. Could we guide this in javadoc or in exception message? Or can we fix it? (Maybe it defers verification step in open() so violating fail-fast...)


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187488104
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -403,50 +274,31 @@ public void ack(Object msgId) {
          * <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
          */
         @Override
    -    public void fail(Object msgId) {
    +    public void fail(final Object msgId) {
             LOG.warn("Message failed: " + msgId);
    -        this.pendingMessages.clear();
    -        this.toCommit.clear();
    -        synchronized (this.recoveryMutex) {
    -            this.hasFailures = true;
    -        }
    +        messageHandler.fail(msgId);
         }
     
         /**
    -     * Use the {@link #tupleProducer} to determine which fields are about to be emitted.
    +     * Use the {@link #tupleProducer} to determine which fields are about
    +     * to be emitted.
          *
    -     * <p>Note that {@link #nextTuple()} always emits to the default stream, and thus only fields declared
    -     * for this stream are used.
    +     * <p>Note that {@link #nextTuple()} always emits to the default stream,
    +     * and thus only fields declared for this stream are used.
          */
         @Override
    -    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +    public void declareOutputFields(final OutputFieldsDeclarer declarer) {
             this.tupleProducer.declareOutputFields(declarer);
     
         }
     
         /**
    -     * Returns <code>true</code> if the spout has received failures from which it has not yet recovered.
    -     *
    -     * @return {@code true} if there were failures, {@code false} otherwise.
    -     */
    -    public boolean hasFailures() {
    -        return this.hasFailures;
    -    }
    -
    -    /**
    -     * Marks a healthy session state.
    -     */
    -    protected void recovered() {
    -        this.hasFailures = false;
    -    }
    -
    -    /**
    -     * Sets the periodicity of the timer task that checks for failures and recovers the JMS session.
    +     * Sets the periodicity of the timer task that
    +     * checks for failures and recovers the JMS session.
          *
          * @param period desired wait period
          */
    -    public void setRecoveryPeriodMs(long period) {
    -        this.recoveryPeriodMs = period;
    +    public void setRecoveryPeriodMs(final long period) {
    --- End diff --
    
    This method can be removed.


---

[GitHub] storm issue #2639: STORM-3035: fix the issue in JmsSpout.ack when toCommit i...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/2639
  
    @HeartSaVioR , I made some changes so that the order of the methods does not matter and the final validation happens in "open". I also ran the example topology and things look fine.


---

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r183539214
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -339,26 +339,26 @@ public void nextTuple() {
          */
         @Override
         public void ack(Object msgId) {
    -
             Message msg = this.pendingMessages.remove(msgId);
    -        JmsMessageID oldest = this.toCommit.first();
    -        if (msgId.equals(oldest)) {
    -            if (msg != null) {
    -                try {
    -                    LOG.debug("Committing...");
    -                    msg.acknowledge();
    -                    LOG.debug("JMS Message acked: " + msgId);
    -                    this.toCommit.remove(msgId);
    -                } catch (JMSException e) {
    -                    LOG.warn("Error acknowldging JMS message: " + msgId, e);
    +        if (!toCommit.isEmpty()) {
    +            JmsMessageID oldest = this.toCommit.first();
    +            if (msgId.equals(oldest)) {
    +                if (msg != null) {
    +                    try {
    +                        LOG.debug("Committing...");
    +                        msg.acknowledge();
    --- End diff --
    
    I am not sure acking the oldest message in JMS is correct even for `CLIENT_ACKNOWLEDGE`. This would ack the new messages that have been consumed in the session (and possibly emitted) even before the spout received the ACK for the message. I guess we should keep removing the message from `toCommit` and invoke the JMS ack when its the last message in `toCommit`. (assuming we dont consume any other message in the meanwhile).


---