You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/07/22 21:00:19 UTC

[1/3] storm git commit: STORM-2652: fix error in open method of JmsSpout

Repository: storm
Updated Branches:
  refs/heads/master e86ac7541 -> caf39096b


STORM-2652: fix error in open method of JmsSpout

Changed the cast of the configured message timeout from `Integer` to
`Number`: the value is actually a `Long`, so the old cast threw a
`ClassCastException`. Treating it as a `Number` works for either
type, so this cannot happen again.
Also resolved style issues in `JmsSpout`: added javadoc, renamed
fields, etc.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d6e5e6d4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d6e5e6d4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d6e5e6d4

Branch: refs/heads/master
Commit: d6e5e6d4e0a20c4c9f0ce0e3000e730dcb4700da
Parents: e38f936
Author: Omer Hadari <ha...@gmail.com>
Authored: Fri Jul 21 10:21:23 2017 +0300
Committer: Omer Hadari <ha...@gmail.com>
Committed: Sat Jul 22 13:34:25 2017 +0300

----------------------------------------------------------------------
 external/storm-jms/pom.xml                      |   2 +-
 .../org/apache/storm/jms/spout/JmsSpout.java    | 292 ++++++++++++++-----
 .../apache/storm/jms/spout/JmsSpoutTest.java    |  79 +++--
 3 files changed, 269 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d6e5e6d4/external/storm-jms/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-jms/pom.xml b/external/storm-jms/pom.xml
index 15323cb..89161d6 100644
--- a/external/storm-jms/pom.xml
+++ b/external/storm-jms/pom.xml
@@ -94,7 +94,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>273</maxAllowedViolations>
+                    <maxAllowedViolations>232</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/d6e5e6d4/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
index 042e643..8973dbf 100644
--- a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,12 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.jms.spout;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
 import java.util.concurrent.LinkedBlockingQueue;
-
+import java.util.concurrent.TimeUnit;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
@@ -30,65 +35,109 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.storm.Config;
 import org.apache.storm.jms.JmsProvider;
 import org.apache.storm.jms.JmsTupleProducer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
 
 /**
- * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue
- * and outputs tuples based on the messages it receives.
- * <p>
- * <code>JmsSpout</code> instances rely on <code>JmsProducer</code> implementations
- * to obtain the JMS <code>ConnectionFactory</code> and <code>Destination</code> objects
- * necessary to connect to a JMS topic/queue.
- * <p>
- * When a <code>JmsSpout</code> receives a JMS message, it delegates to an
- * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the
- * incoming message.
- * <p>
- * Typically, developers will supply a custom <code>JmsTupleProducer</code> implementation
- * appropriate for the expected message content.
+ * A Storm <code>Spout</code> implementation that listens to a JMS topic or
+ * queue and outputs tuples based on the messages it receives.
+ *
+ * <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code>
+ * implementations to obtain the JMS
+ * <code>ConnectionFactory</code> and <code>Destination</code> objects necessary
+ * to connect to a JMS topic/queue.
+ *
+ * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to an
+ * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from
+ * the incoming message.
+ *
+ * <p>Typically, developers will supply a custom <code>JmsTupleProducer</code>
+ * implementation appropriate for the expected message content.
  */
 @SuppressWarnings("serial")
 public class JmsSpout extends BaseRichSpout implements MessageListener {
+
+    /** The logger object instance for this class. */
     private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
 
-    // JMS options
+    /** The logger of the recovery task. */
+    private static final Logger RECOVERY_TASK_LOG = LoggerFactory.getLogger(RecoveryTask.class);
+
+    /** Time to sleep between queue polling attempts. */
+    private static final int POLL_INTERVAL_MS = 50;
+
+    /**
+     * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
+     */
+    private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
+
+    /** Time to wait before queuing the first recovery task. */
+    private static final int RECOVERY_DELAY_MS = 10;
+
+    /**
+     * The acknowledgment mode used for this instance.
+     *
+     * @see Session
+     */
     private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
 
+    /** Indicates whether or not this spout should run as a singleton. */
     private boolean distributed = true;
 
+    /** Used to generate tuples from incoming messages. */
     private JmsTupleProducer tupleProducer;
 
+    /** Encapsulates jms related classes needed to communicate with the mq. */
     private JmsProvider jmsProvider;
 
+    /** Stores incoming messages for later sending. */
     private LinkedBlockingQueue<Message> queue;
+
+    /** Contains all message ids of messages that were not yet acked. */
     private TreeSet<JmsMessageID> toCommit;
+
+    /** Maps between message ids of not-yet acked messages, and the messages. */
     private HashMap<JmsMessageID, Message> pendingMessages;
+
+    /** Counter of handled messages. */
     private long messageSequence = 0;
 
+    /** The collector used to emit tuples. */
     private SpoutOutputCollector collector;
 
+    /** Connection to the jms queue. */
     private transient Connection connection;
+
+    /** The active jms session. */
     private transient Session session;
 
+    /** Indicates whether or not a message failed to be processed. */
     private boolean hasFailures = false;
-    public final Serializable recoveryMutex = "RECOVERY_MUTEX";
+
+    /** Used to safely recover failed JMS sessions across instances. */
+    private final Serializable recoveryMutex = "RECOVERY_MUTEX";
+
+    /** Schedules recovery tasks periodically. */
     private Timer recoveryTimer = null;
-    private long recoveryPeriod = -1; // default to disabled
+
+    /** Time to wait between recovery attempts. */
+    private long recoveryPeriodMs = -1; // default to disabled
 
     /**
-     * Sets the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
-     * <p>
-     * Possible values:
+     * Sets the JMS Session acknowledgement mode for the JMS session.
+     *
+     * <p>Possible values:
      * <ul>
      * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
      * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
@@ -98,36 +147,46 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
      * @param mode JMS Session Acknowledgement mode
      * @throws IllegalArgumentException if the mode is not recognized.
      */
-    public void setJmsAcknowledgeMode(int mode) {
+    public void setJmsAcknowledgeMode(final int mode) {
         switch (mode) {
             case Session.AUTO_ACKNOWLEDGE:
             case Session.CLIENT_ACKNOWLEDGE:
             case Session.DUPS_OK_ACKNOWLEDGE:
                 break;
             default:
-                throw new IllegalArgumentException("Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)");
+                throw new IllegalArgumentException(
+                        "Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)");
 
         }
         this.jmsAcknowledgeMode = mode;
     }
 
     /**
-     * Returns the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
+     * Returns the JMS Session acknowledgement mode for the JMS session
+     * associated with this spout. Can be either of:
+     * <ul>
+     * <li>{@link Session#AUTO_ACKNOWLEDGE}</li>
+     * <li>{@link Session#CLIENT_ACKNOWLEDGE}</li>
+     * <li>{@link Session#DUPS_OK_ACKNOWLEDGE}</li>
+     * <li>{@link Session#SESSION_TRANSACTED}</li>
+     * </ul>
      *
-     * @return
+     * @return the int value of the acknowledgment mode.
      */
     public int getJmsAcknowledgeMode() {
         return this.jmsAcknowledgeMode;
     }
 
     /**
-     * Set the <code>JmsProvider</code>
+     * Set {@link #jmsProvider}.
+     *
+     * <p>Set the <code>JmsProvider</code>
      * implementation that this Spout will use to connect to
      * a JMS <code>javax.jms.Desination</code>
      *
-     * @param provider
+     * @param provider the provider to use
      */
-    public void setJmsProvider(JmsProvider provider) {
+    public void setJmsProvider(final JmsProvider provider) {
         this.jmsProvider = provider;
     }
 
@@ -137,7 +196,7 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
      * object to <code>org.apache.storm.tuple.Values</code> objects
      * to be emitted.
      *
-     * @param producer
+     * @param producer the producer instance to use
      */
     public void setJmsTupleProducer(JmsTupleProducer producer) {
         this.tupleProducer = producer;
@@ -145,41 +204,46 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
 
     /**
      * <code>javax.jms.MessageListener</code> implementation.
-     * <p>
-     * Stored the JMS message in an internal queue for processing
+     *
+     * <p>Stores the JMS message in an internal queue for processing
      * by the <code>nextTuple()</code> method.
+     *
+     * @param msg the message to handle
      */
     public void onMessage(Message msg) {
         try {
             LOG.debug("Queuing msg [" + msg.getJMSMessageID() + "]");
-        } catch (JMSException e) {
+        } catch (JMSException ignored) {
+            // Do nothing
         }
         this.queue.offer(msg);
     }
 
     /**
      * <code>ISpout</code> implementation.
-     * <p>
-     * Connects the JMS spout to the configured JMS destination
+     *
+     * <p>Connects the JMS spout to the configured JMS destination
      * topic/queue.
      */
-    @SuppressWarnings("rawtypes")
-    public void open(Map<String, Object> conf, TopologyContext context,
+    @Override
+    public void open(Map<String, Object> conf,
+                     TopologyContext context,
                      SpoutOutputCollector collector) {
+
         if (this.jmsProvider == null) {
             throw new IllegalStateException("JMS provider has not been set.");
         }
         if (this.tupleProducer == null) {
             throw new IllegalStateException("JMS Tuple Producer has not been set.");
         }
-        Integer topologyTimeout = (Integer) conf.get("topology.message.timeout.secs");
-        // TODO fine a way to get the default timeout from storm, so we're not hard-coding to 30 seconds (it could change)
-        topologyTimeout = topologyTimeout == null ? 30 : topologyTimeout;
-        if ((topologyTimeout.intValue() * 1000) > this.recoveryPeriod) {
-            LOG.warn("*** WARNING *** : " +
-                    "Recovery period (" + this.recoveryPeriod + " ms.) is less then the configured " +
-                    "'topology.message.timeout.secs' of " + topologyTimeout +
-                    " secs. This could lead to a message replay flood!");
+        // TODO get the default value from storm instead of hard coding 30 secs
+        Long topologyTimeout =
+                ((Number) conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue();
+        if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) > this.recoveryPeriodMs) {
+            LOG.warn("*** WARNING *** : "
+                    + "Recovery period (" + this.recoveryPeriodMs + " ms.) is less then the configured "
+                    + "'topology.message.timeout.secs' of " + topologyTimeout
+                    + " secs. This could lead to a message replay flood!");
         }
         this.queue = new LinkedBlockingQueue<Message>();
         this.toCommit = new TreeSet<JmsMessageID>();
@@ -189,14 +253,13 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
             ConnectionFactory cf = this.jmsProvider.connectionFactory();
             Destination dest = this.jmsProvider.destination();
             this.connection = cf.createConnection();
-            this.session = connection.createSession(false,
-                    this.jmsAcknowledgeMode);
+            this.session = connection.createSession(false, this.jmsAcknowledgeMode);
             MessageConsumer consumer = session.createConsumer(dest);
             consumer.setMessageListener(this);
             this.connection.start();
-            if (this.isDurableSubscription() && this.recoveryPeriod > 0) {
+            if (this.isDurableSubscription() && this.recoveryPeriodMs > 0) {
                 this.recoveryTimer = new Timer();
-                this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 10, this.recoveryPeriod);
+                this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), RECOVERY_DELAY_MS, this.recoveryPeriodMs);
             }
 
         } catch (Exception e) {
@@ -205,6 +268,11 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
 
     }
 
+    /**
+     * Close the {@link #session} and {@link #connection}.
+     *
+     * <p>When overridden, should always call {@code super} to finalize the active connections.
+     */
     public void close() {
         try {
             LOG.debug("Closing JMS connection.");
@@ -216,17 +284,27 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
 
     }
 
+    /**
+     * Generate the next tuple from a message.
+     *
+     * <p>This method polls the queue that's being filled asynchronously by the
+     * jms connection, every {@link #POLL_INTERVAL_MS} milliseconds. When a message
+     * arrives, a {@link Values} (tuple) is generated using
+     * {@link #tupleProducer}. It is emitted, and the message is saved to
+     * {@link #toCommit} and {@link #pendingMessages} for later handling.
+     */
     public void nextTuple() {
         Message msg = this.queue.poll();
         if (msg == null) {
-            Utils.sleep(50);
+            Utils.sleep(POLL_INTERVAL_MS);
         } else {
 
             LOG.debug("sending tuple: " + msg);
             // get the tuple from the handler
             try {
                 Values vals = this.tupleProducer.toTuple(msg);
-                // ack if we're not in AUTO_ACKNOWLEDGE mode, or the message requests ACKNOWLEDGE
+                // ack if we're not in AUTO_ACKNOWLEDGE mode,
+                // or the message requests ACKNOWLEDGE
                 LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode()));
                 LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode));
                 if (this.isDurableSubscription()) {
@@ -250,9 +328,16 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
 
     }
 
-    /*
-     * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
+    /**
+     * Ack a successfully handled message by the matching {@link JmsMessageID}.
+     *
+     * <p>Acking means removing the message from the pending messages
+     * collections, and if it was the oldest pending message -
+     * ack it to the mq as well, so that it's the only one acked.
+     *
+     * <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE.
      */
+    @Override
     public void ack(Object msgId) {
 
         Message msg = this.pendingMessages.remove(msgId);
@@ -276,9 +361,15 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
 
     }
 
-    /*
-     * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
+    /**
+     * Fail an unsuccessfully handled message by its {@link JmsMessageID}.
+     *
+     * <p>Failing means dropping all pending messages and queueing a recovery
+     * attempt.
+     *
+     * <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
      */
+    @Override
     public void fail(Object msgId) {
         LOG.warn("Message failed: " + msgId);
         this.pendingMessages.clear();
@@ -288,6 +379,13 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
         }
     }
 
+    /**
+     * Use the {@link #tupleProducer} to determine which fields are about to be emitted.
+     *
+     * <p>Note that {@link #nextTuple()} always emits to the default stream, and thus only fields declared
+     * for this stream are used.
+     */
+    @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         this.tupleProducer.declareOutputFields(declarer);
 
@@ -296,11 +394,16 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
     /**
      * Returns <code>true</code> if the spout has received failures
      * from which it has not yet recovered.
+     *
+     * @return {@code true} if there were failures, {@code false} otherwise.
      */
     public boolean hasFailures() {
         return this.hasFailures;
     }
 
+    /**
+     * Marks a healthy session state.
+     */
     protected void recovered() {
         this.hasFailures = false;
     }
@@ -309,36 +412,48 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
      * Sets the periodicity of the timer task that
      * checks for failures and recovers the JMS session.
      *
-     * @param period
+     * @param period desired wait period
      */
-    public void setRecoveryPeriod(long period) {
-        this.recoveryPeriod = period;
+    public void setRecoveryPeriodMs(long period) {
+        this.recoveryPeriodMs = period;
     }
 
+    /**
+     * @return {@link #distributed}.
+     */
     public boolean isDistributed() {
         return this.distributed;
     }
 
     /**
      * Sets the "distributed" mode of this spout.
-     * <p>
-     * If <code>true</code> multiple instances of this spout <i>may</i> be
-     * created across the cluster (depending on the "parallelism_hint" in the topology configuration).
-     * <p>
-     * Setting this value to <code>false</code> essentially means this spout will run as a singleton
-     * within the cluster ("parallelism_hint" will be ignored).
-     * <p>
-     * In general, this should be set to <code>false</code> if the underlying JMS destination is a
-     * topic, and <code>true</code> if it is a JMS queue.
      *
-     * @param distributed
+     * <p>If <code>true</code> multiple instances of this spout <i>may</i> be
+     * created across the cluster
+     * (depending on the "parallelism_hint" in the topology configuration).
+     *
+     * <p>Setting this value to <code>false</code> essentially means this spout
+     * will run as a singleton within the cluster
+     * ("parallelism_hint" will be ignored).
+     *
+     * <p>In general, this should be set to <code>false</code> if the underlying
+     * JMS destination is a topic, and <code>true</code> if it is a JMS queue.
+     *
+     * @param isDistributed {@code true} if should be distributed, {@code false}
+     *                      otherwise.
      */
-    public void setDistributed(boolean distributed) {
-        this.distributed = distributed;
+    public void setDistributed(boolean isDistributed) {
+        this.distributed = isDistributed;
     }
 
-
-    private static final String toDeliveryModeString(int deliveryMode) {
+    /**
+     * Translate the {@code int} value of an acknowledgment to a {@code String}.
+     *
+     * @param deliveryMode the mode to translate.
+     * @return its {@code String} explanation (name).
+     * @see Session
+     */
+    private static String toDeliveryModeString(int deliveryMode) {
         switch (deliveryMode) {
             case Session.AUTO_ACKNOWLEDGE:
                 return "AUTO_ACKNOWLEDGE";
@@ -352,27 +467,44 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
         }
     }
 
+    /**
+     * @return The currently active session.
+     */
     protected Session getSession() {
         return this.session;
     }
 
+    /**
+     * Check if the subscription requires messages to be acked.
+     *
+     * @return {@code true} if there is a pending messages state, {@code false}
+     *         otherwise.
+     */
     private boolean isDurableSubscription() {
         return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE);
     }
 
 
+    /**
+     * The periodic task used to try and recover failed sessions.
+     */
     private class RecoveryTask extends TimerTask {
-        private final Logger LOG = LoggerFactory.getLogger(RecoveryTask.class);
 
+        /**
+         * Try to recover a failed active session.
+         *
+         * <p>If there is no active recovery task, and the session is failed,
+         * try to recover the session.
+         */
         public void run() {
             synchronized (JmsSpout.this.recoveryMutex) {
                 if (JmsSpout.this.hasFailures()) {
                     try {
-                        LOG.info("Recovering from a message failure.");
+                        RECOVERY_TASK_LOG.info("Recovering from a message failure.");
                         JmsSpout.this.getSession().recover();
                         JmsSpout.this.recovered();
                     } catch (JMSException e) {
-                        LOG.warn("Could not recover jms session.", e);
+                        RECOVERY_TASK_LOG.warn("Could not recover jms session.", e);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/d6e5e6d4/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
index 9f89384..da312da 100644
--- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
+++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
@@ -17,10 +17,13 @@
  */
 package org.apache.storm.jms.spout;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
+import org.apache.storm.Config;
+import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
@@ -29,35 +32,36 @@ import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
 
 public class JmsSpoutTest {
-    private static final Logger LOG = LoggerFactory.getLogger(JmsSpoutTest.class);
+    private static final Logger LOG =
+            LoggerFactory.getLogger(JmsSpoutTest.class);
 
     @Test
-    public void testFailure() throws JMSException, Exception{
+    public void testFailure() throws JMSException, Exception {
         JmsSpout spout = new JmsSpout();
         JmsProvider mockProvider = new MockJmsProvider();
         MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector();
-        SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
+        SpoutOutputCollector collector =
+                new SpoutOutputCollector(mockCollector);
         spout.setJmsProvider(new MockJmsProvider());
         spout.setJmsTupleProducer(new MockTupleProducer());
         spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
-        spout.setRecoveryPeriod(10); // Rapid recovery for testing.
+        spout.setRecoveryPeriodMs(10); // Rapid recovery for testing.
         spout.open(new HashMap<>(), null, collector);
-        Message msg = this.sendMessage(mockProvider.connectionFactory(), mockProvider.destination());
+        ConnectionFactory connectionFactory = mockProvider.connectionFactory();
+        Destination destination = mockProvider.destination();
+        Message msg = this.sendMessage(connectionFactory, destination);
         Thread.sleep(100);
         spout.nextTuple(); // Pretend to be storm.
         Assert.assertTrue(mockCollector.emitted);
-        
-        mockCollector.reset();        
+
+        mockCollector.reset();
         spout.fail(msg.getJMSMessageID()); // Mock failure
         Thread.sleep(5000);
         spout.nextTuple(); // Pretend to be storm.
@@ -66,7 +70,7 @@ public class JmsSpoutTest {
     }
 
     @Test
-    public void testSerializability() throws IOException{
+    public void testSerializability() throws IOException {
         JmsSpout spout = new JmsSpout();
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(out);
@@ -74,9 +78,38 @@ public class JmsSpoutTest {
         oos.close();
         Assert.assertTrue(out.toByteArray().length > 0);
     }
-    
-    public Message sendMessage(ConnectionFactory connectionFactory, Destination destination) throws JMSException {        
-        Session mySess = connectionFactory.createConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+    /**
+     * Make sure that {@link JmsSpout#open} returns correctly regardless of
+     * the type of {@link Number} that is the value of
+     * {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
+     */
+    @Test
+    public void testOpenWorksMultipleTypesOfNumberObjects() throws Exception {
+        JmsSpout spout = new JmsSpout();
+        spout.setJmsProvider(new MockJmsProvider());
+        spout.setJmsTupleProducer(new MockTupleProducer());
+        Map<String, Object> configuration = new HashMap<String, Object>();
+        MockSpoutOutputCollector delegateCollector =
+                new MockSpoutOutputCollector();
+        SpoutOutputCollector collector =
+                new SpoutOutputCollector(delegateCollector);
+
+        // Test with long value
+        configuration.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000L);
+        spout.open(configuration, null, collector);
+
+        // Test with integer value
+        configuration.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000);
+        spout.open(configuration, null, collector);
+    }
+
+    public Message sendMessage(ConnectionFactory connectionFactory,
+                               Destination destination) throws JMSException {
+
+        Session mySess = connectionFactory.createConnection().createSession(
+                false,
+                Session.CLIENT_ACKNOWLEDGE);
         MessageProducer producer = mySess.createProducer(destination);
         TextMessage msg = mySess.createTextMessage();
         msg.setText("Hello World");


[3/3] storm git commit: Changelog: STORM-2652

Posted by sr...@apache.org.
Changelog: STORM-2652


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/caf39096
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/caf39096
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/caf39096

Branch: refs/heads/master
Commit: caf39096be1c32fd1643fbb881eeed01a0208f10
Parents: 7cf48f6
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Jul 22 22:56:38 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sat Jul 22 22:56:38 2017 +0200

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/caf39096/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f3c7ecd..60b7cc3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2652: fix error in open method of JmsSpout
  * STORM-2644: Show message when result not found on deep search page
  * STORM-2643: Add a confirmation dialog for profiling and debugging actions on Storm UI
  * STORM-2641: Make storm.py print output from subprocess on error on Windows, make storm.ps1 return the right exit code


[2/3] storm git commit: Merge branch 'bugfix/fix-configuration-cast-exception' of https://github.com/omerhadari/storm into STORM-2652-merge

Posted by sr...@apache.org.
Merge branch 'bugfix/fix-configuration-cast-exception' of https://github.com/omerhadari/storm into STORM-2652-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7cf48f65
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7cf48f65
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7cf48f65

Branch: refs/heads/master
Commit: 7cf48f6535fb5b473865c208255b7d9004c09ff4
Parents: e86ac75 d6e5e6d
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Jul 22 22:55:37 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sat Jul 22 22:55:37 2017 +0200

----------------------------------------------------------------------
 external/storm-jms/pom.xml                      |   2 +-
 .../org/apache/storm/jms/spout/JmsSpout.java    | 292 ++++++++++++++-----
 .../apache/storm/jms/spout/JmsSpoutTest.java    |  79 +++--
 3 files changed, 269 insertions(+), 104 deletions(-)
----------------------------------------------------------------------