You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/07/22 21:00:19 UTC
[1/3] storm git commit: STORM-2652: fix error in open method of
JmsSpout
Repository: storm
Updated Branches:
refs/heads/master e86ac7541 -> caf39096b
STORM-2652: fix error in open method of JmsSpout
I changed the casting to `Number` instead of `Integer`
since it is actually a `Long`, and it caused a `ClassCastException`
to be thrown. Now it is referred to as a `Number` so that it won't
happen again.
Also resolved style issues in `JmsSpout`. Added javadoc, renamed
fields etc...
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d6e5e6d4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d6e5e6d4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d6e5e6d4
Branch: refs/heads/master
Commit: d6e5e6d4e0a20c4c9f0ce0e3000e730dcb4700da
Parents: e38f936
Author: Omer Hadari <ha...@gmail.com>
Authored: Fri Jul 21 10:21:23 2017 +0300
Committer: Omer Hadari <ha...@gmail.com>
Committed: Sat Jul 22 13:34:25 2017 +0300
----------------------------------------------------------------------
external/storm-jms/pom.xml | 2 +-
.../org/apache/storm/jms/spout/JmsSpout.java | 292 ++++++++++++++-----
.../apache/storm/jms/spout/JmsSpoutTest.java | 79 +++--
3 files changed, 269 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d6e5e6d4/external/storm-jms/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-jms/pom.xml b/external/storm-jms/pom.xml
index 15323cb..89161d6 100644
--- a/external/storm-jms/pom.xml
+++ b/external/storm-jms/pom.xml
@@ -94,7 +94,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>273</maxAllowedViolations>
+ <maxAllowedViolations>232</maxAllowedViolations>
</configuration>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/storm/blob/d6e5e6d4/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
index 042e643..8973dbf 100644
--- a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,12 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.jms.spout;
import java.io.Serializable;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
-
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@@ -30,65 +35,109 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.storm.Config;
import org.apache.storm.jms.JmsProvider;
import org.apache.storm.jms.JmsTupleProducer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
/**
- * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue
- * and outputs tuples based on the messages it receives.
- * <p>
- * <code>JmsSpout</code> instances rely on <code>JmsProducer</code> implementations
- * to obtain the JMS <code>ConnectionFactory</code> and <code>Destination</code> objects
- * necessary to connect to a JMS topic/queue.
- * <p>
- * When a <code>JmsSpout</code> receives a JMS message, it delegates to an
- * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the
- * incoming message.
- * <p>
- * Typically, developers will supply a custom <code>JmsTupleProducer</code> implementation
- * appropriate for the expected message content.
+ * A Storm <code>Spout</code> implementation that listens to a JMS topic or
+ * queue and outputs tuples based on the messages it receives.
+ *
+ * <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code>
+ * implementations to obtain the JMS
+ * <code>ConnectionFactory</code> and <code>Destination</code> objects necessary
+ * to connect to a JMS topic/queue.
+ *
+ * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to an
+ * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from
+ * the incoming message.
+ *
+ * <p>Typically, developers will supply a custom <code>JmsTupleProducer</code>
+ * implementation appropriate for the expected message content.
*/
@SuppressWarnings("serial")
public class JmsSpout extends BaseRichSpout implements MessageListener {
+
+ /** The logger object instance for this class. */
private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
- // JMS options
+ /** The logger of the recovery task. */
+ private static final Logger RECOVERY_TASK_LOG = LoggerFactory.getLogger(RecoveryTask.class);
+
+ /** Time to sleep between queue polling attempts. */
+ private static final int POLL_INTERVAL_MS = 50;
+
+ /**
+ * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
+ */
+ private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
+
+ /** Time to wait before queuing the first recovery task. */
+ private static final int RECOVERY_DELAY_MS = 10;
+
+ /**
+ * The acknowledgment mode used for this instance.
+ *
+ * @see Session
+ */
private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+ /** Indicates whether or not this spout should run as a singleton. */
private boolean distributed = true;
+ /** Used to generate tuples from incoming messages. */
private JmsTupleProducer tupleProducer;
+ /** Encapsulates jms related classes needed to communicate with the mq. */
private JmsProvider jmsProvider;
+ /** Stores incoming messages for later sending. */
private LinkedBlockingQueue<Message> queue;
+
+ /** Contains all message ids of messages that were not yet acked. */
private TreeSet<JmsMessageID> toCommit;
+
+ /** Maps between message ids of not-yet acked messages, and the messages. */
private HashMap<JmsMessageID, Message> pendingMessages;
+
+ /** Counter of handled messages. */
private long messageSequence = 0;
+ /** The collector used to emit tuples. */
private SpoutOutputCollector collector;
+ /** Connection to the jms queue. */
private transient Connection connection;
+
+ /** The active jms session. */
private transient Session session;
+ /** Indicates whether or not a message failed to be processed. */
private boolean hasFailures = false;
- public final Serializable recoveryMutex = "RECOVERY_MUTEX";
+
+ /** Used to safely recover failed JMS sessions across instances. */
+ private final Serializable recoveryMutex = "RECOVERY_MUTEX";
+
+ /** Schedules recovery tasks periodically. */
private Timer recoveryTimer = null;
- private long recoveryPeriod = -1; // default to disabled
+
+ /** Time to wait between recovery attempts. */
+ private long recoveryPeriodMs = -1; // default to disabled
/**
- * Sets the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
- * <p>
- * Possible values:
+ * Sets the JMS Session acknowledgement mode for the JMS session.
+ *
+ * <p>Possible values:
* <ul>
* <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
* <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
@@ -98,36 +147,46 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
* @param mode JMS Session Acknowledgement mode
* @throws IllegalArgumentException if the mode is not recognized.
*/
- public void setJmsAcknowledgeMode(int mode) {
+ public void setJmsAcknowledgeMode(final int mode) {
switch (mode) {
case Session.AUTO_ACKNOWLEDGE:
case Session.CLIENT_ACKNOWLEDGE:
case Session.DUPS_OK_ACKNOWLEDGE:
break;
default:
- throw new IllegalArgumentException("Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)");
+ throw new IllegalArgumentException(
+ "Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)");
}
this.jmsAcknowledgeMode = mode;
}
/**
- * Returns the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
+ * Returns the JMS Session acknowledgement mode for the JMS session
+ * associated with this spout. Can be either of:
+ * <ul>
+ * <li>{@link Session#AUTO_ACKNOWLEDGE}</li>
+ * <li>{@link Session#CLIENT_ACKNOWLEDGE}</li>
+ * <li>{@link Session#DUPS_OK_ACKNOWLEDGE}</li>
+ * <li>{@link Session#SESSION_TRANSACTED}</li>
+ * </ul>
*
- * @return
+ * @return the int value of the acknowledgment mode.
*/
public int getJmsAcknowledgeMode() {
return this.jmsAcknowledgeMode;
}
/**
- * Set the <code>JmsProvider</code>
+ * Set {@link #jmsProvider}.
+ *
+ * <p>Set the <code>JmsProvider</code>
* implementation that this Spout will use to connect to
* a JMS <code>javax.jms.Desination</code>
*
- * @param provider
+ * @param provider the provider to use
*/
- public void setJmsProvider(JmsProvider provider) {
+ public void setJmsProvider(final JmsProvider provider) {
this.jmsProvider = provider;
}
@@ -137,7 +196,7 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
* object to <code>org.apache.storm.tuple.Values</code> objects
* to be emitted.
*
- * @param producer
+ * @param producer the producer instance to use
*/
public void setJmsTupleProducer(JmsTupleProducer producer) {
this.tupleProducer = producer;
@@ -145,41 +204,46 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
/**
* <code>javax.jms.MessageListener</code> implementation.
- * <p>
- * Stored the JMS message in an internal queue for processing
+ *
+ * <p>Stored the JMS message in an internal queue for processing
* by the <code>nextTuple()</code> method.
+ *
+ * @param msg the message to handle
*/
public void onMessage(Message msg) {
try {
LOG.debug("Queuing msg [" + msg.getJMSMessageID() + "]");
- } catch (JMSException e) {
+ } catch (JMSException ignored) {
+ // Do nothing
}
this.queue.offer(msg);
}
/**
* <code>ISpout</code> implementation.
- * <p>
- * Connects the JMS spout to the configured JMS destination
+ *
+ * <p>Connects the JMS spout to the configured JMS destination
* topic/queue.
*/
- @SuppressWarnings("rawtypes")
- public void open(Map<String, Object> conf, TopologyContext context,
+ @Override
+ public void open(Map<String, Object> conf,
+ TopologyContext context,
SpoutOutputCollector collector) {
+
if (this.jmsProvider == null) {
throw new IllegalStateException("JMS provider has not been set.");
}
if (this.tupleProducer == null) {
throw new IllegalStateException("JMS Tuple Producer has not been set.");
}
- Integer topologyTimeout = (Integer) conf.get("topology.message.timeout.secs");
- // TODO fine a way to get the default timeout from storm, so we're not hard-coding to 30 seconds (it could change)
- topologyTimeout = topologyTimeout == null ? 30 : topologyTimeout;
- if ((topologyTimeout.intValue() * 1000) > this.recoveryPeriod) {
- LOG.warn("*** WARNING *** : " +
- "Recovery period (" + this.recoveryPeriod + " ms.) is less then the configured " +
- "'topology.message.timeout.secs' of " + topologyTimeout +
- " secs. This could lead to a message replay flood!");
+ // TODO get the default value from storm instead of hard coding 30 secs
+ Long topologyTimeout =
+ ((Number) conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue();
+ if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) > this.recoveryPeriodMs) {
+ LOG.warn("*** WARNING *** : "
+ + "Recovery period (" + this.recoveryPeriodMs + " ms.) is less then the configured "
+ + "'topology.message.timeout.secs' of " + topologyTimeout
+ + " secs. This could lead to a message replay flood!");
}
this.queue = new LinkedBlockingQueue<Message>();
this.toCommit = new TreeSet<JmsMessageID>();
@@ -189,14 +253,13 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
ConnectionFactory cf = this.jmsProvider.connectionFactory();
Destination dest = this.jmsProvider.destination();
this.connection = cf.createConnection();
- this.session = connection.createSession(false,
- this.jmsAcknowledgeMode);
+ this.session = connection.createSession(false, this.jmsAcknowledgeMode);
MessageConsumer consumer = session.createConsumer(dest);
consumer.setMessageListener(this);
this.connection.start();
- if (this.isDurableSubscription() && this.recoveryPeriod > 0) {
+ if (this.isDurableSubscription() && this.recoveryPeriodMs > 0) {
this.recoveryTimer = new Timer();
- this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 10, this.recoveryPeriod);
+ this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), RECOVERY_DELAY_MS, this.recoveryPeriodMs);
}
} catch (Exception e) {
@@ -205,6 +268,11 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
}
+ /**
+ * Close the {@link #session} and {@link #connection}.
+ *
+ * <p>When overridden, should always call {@code super} to finalize the active connections.
+ */
public void close() {
try {
LOG.debug("Closing JMS connection.");
@@ -216,17 +284,27 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
}
+ /**
+ * Generate the next tuple from a message.
+ *
+ * <p>This method polls the queue that's being filled asynchronously by the
+ * jms connection, every {@link #POLL_INTERVAL_MS} seconds. When a message
+ * arrives, a {@link Values} (tuple) is generated using
+ * {@link #tupleProducer}. It is emitted, and the message is saved to
+ * {@link #toCommit} and {@link #pendingMessages} for later handling.
+ */
public void nextTuple() {
Message msg = this.queue.poll();
if (msg == null) {
- Utils.sleep(50);
+ Utils.sleep(POLL_INTERVAL_MS);
} else {
LOG.debug("sending tuple: " + msg);
// get the tuple from the handler
try {
Values vals = this.tupleProducer.toTuple(msg);
- // ack if we're not in AUTO_ACKNOWLEDGE mode, or the message requests ACKNOWLEDGE
+ // ack if we're not in AUTO_ACKNOWLEDGE mode,
+ // or the message requests ACKNOWLEDGE
LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode()));
LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode));
if (this.isDurableSubscription()) {
@@ -250,9 +328,16 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
}
- /*
- * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
+ /**
+ * Ack a successfully handled message by the matching {@link JmsMessageID}.
+ *
+ * <p>Acking means removing the message from the pending messages
+ * collections, and if it was the oldest pending message -
+ * ack it to the mq as well, so that it's the only one acked.
+ *
+ * <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE.
*/
+ @Override
public void ack(Object msgId) {
Message msg = this.pendingMessages.remove(msgId);
@@ -276,9 +361,15 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
}
- /*
- * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
+ /**
+ * Fail an unsuccessfully handled message by its {@link JmsMessageID}.
+ *
+ * <p>Failing means dropping all pending messages and queueing a recovery
+ * attempt.
+ *
+ * <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
*/
+ @Override
public void fail(Object msgId) {
LOG.warn("Message failed: " + msgId);
this.pendingMessages.clear();
@@ -288,6 +379,13 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
}
}
+ /**
+ * Use the {@link #tupleProducer} to determine which fields are about to be emitted.
+ *
+ * <p>Note that {@link #nextTuple()} always emits to the default stream, and thus only fields declared
+ * for this stream are used.
+ */
+ @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
this.tupleProducer.declareOutputFields(declarer);
@@ -296,11 +394,16 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
/**
* Returns <code>true</code> if the spout has received failures
* from which it has not yet recovered.
+ *
+ * @return {@code true} if there were failures, {@code false} otherwise.
*/
public boolean hasFailures() {
return this.hasFailures;
}
+ /**
+ * Marks a healthy session state.
+ */
protected void recovered() {
this.hasFailures = false;
}
@@ -309,36 +412,48 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
* Sets the periodicity of the timer task that
* checks for failures and recovers the JMS session.
*
- * @param period
+ * @param period desired wait period
*/
- public void setRecoveryPeriod(long period) {
- this.recoveryPeriod = period;
+ public void setRecoveryPeriodMs(long period) {
+ this.recoveryPeriodMs = period;
}
+ /**
+ * @return {@link #distributed}.
+ */
public boolean isDistributed() {
return this.distributed;
}
/**
* Sets the "distributed" mode of this spout.
- * <p>
- * If <code>true</code> multiple instances of this spout <i>may</i> be
- * created across the cluster (depending on the "parallelism_hint" in the topology configuration).
- * <p>
- * Setting this value to <code>false</code> essentially means this spout will run as a singleton
- * within the cluster ("parallelism_hint" will be ignored).
- * <p>
- * In general, this should be set to <code>false</code> if the underlying JMS destination is a
- * topic, and <code>true</code> if it is a JMS queue.
*
- * @param distributed
+ * <p>If <code>true</code> multiple instances of this spout <i>may</i> be
+ * created across the cluster
+ * (depending on the "parallelism_hint" in the topology configuration).
+ *
+ * <p>Setting this value to <code>false</code> essentially means this spout
+ * will run as a singleton within the cluster
+ * ("parallelism_hint" will be ignored).
+ *
+ * <p>In general, this should be set to <code>false</code> if the underlying
+ * JMS destination is a topic, and <code>true</code> if it is a JMS queue.
+ *
+ * @param isDistributed {@code true} if should be distributed, {@code false}
+ * otherwise.
*/
- public void setDistributed(boolean distributed) {
- this.distributed = distributed;
+ public void setDistributed(boolean isDistributed) {
+ this.distributed = isDistributed;
}
-
- private static final String toDeliveryModeString(int deliveryMode) {
+ /**
+ * Translate the {@code int} value of an acknowledgment to a {@code String}.
+ *
+ * @param deliveryMode the mode to translate.
+ * @return its {@code String} explanation (name).
+ * @see Session
+ */
+ private static String toDeliveryModeString(int deliveryMode) {
switch (deliveryMode) {
case Session.AUTO_ACKNOWLEDGE:
return "AUTO_ACKNOWLEDGE";
@@ -352,27 +467,44 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
}
}
+ /**
+ * @return The currently active session.
+ */
protected Session getSession() {
return this.session;
}
+ /**
+ * Check if the subscription requires messages to be acked.
+ *
+ * @return {@code true} if there is a pending messages state, {@code false}
+ * otherwise.
+ */
private boolean isDurableSubscription() {
return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE);
}
+ /**
+ * The periodic task used to try and recover failed sessions.
+ */
private class RecoveryTask extends TimerTask {
- private final Logger LOG = LoggerFactory.getLogger(RecoveryTask.class);
+ /**
+ * Try to recover a failed active session.
+ *
+ * <p>If there is no active recovery task, and the session is failed,
+ * try to recover the session.
+ */
public void run() {
synchronized (JmsSpout.this.recoveryMutex) {
if (JmsSpout.this.hasFailures()) {
try {
- LOG.info("Recovering from a message failure.");
+ RECOVERY_TASK_LOG.info("Recovering from a message failure.");
JmsSpout.this.getSession().recover();
JmsSpout.this.recovered();
} catch (JMSException e) {
- LOG.warn("Could not recover jms session.", e);
+ RECOVERY_TASK_LOG.warn("Could not recover jms session.", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d6e5e6d4/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
index 9f89384..da312da 100644
--- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
+++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
@@ -17,10 +17,13 @@
*/
package org.apache.storm.jms.spout;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
+import org.apache.storm.Config;
+import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@@ -29,35 +32,36 @@ import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
public class JmsSpoutTest {
- private static final Logger LOG = LoggerFactory.getLogger(JmsSpoutTest.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(JmsSpoutTest.class);
@Test
- public void testFailure() throws JMSException, Exception{
+ public void testFailure() throws JMSException, Exception {
JmsSpout spout = new JmsSpout();
JmsProvider mockProvider = new MockJmsProvider();
MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector();
- SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
+ SpoutOutputCollector collector =
+ new SpoutOutputCollector(mockCollector);
spout.setJmsProvider(new MockJmsProvider());
spout.setJmsTupleProducer(new MockTupleProducer());
spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
- spout.setRecoveryPeriod(10); // Rapid recovery for testing.
+ spout.setRecoveryPeriodMs(10); // Rapid recovery for testing.
spout.open(new HashMap<>(), null, collector);
- Message msg = this.sendMessage(mockProvider.connectionFactory(), mockProvider.destination());
+ ConnectionFactory connectionFactory = mockProvider.connectionFactory();
+ Destination destination = mockProvider.destination();
+ Message msg = this.sendMessage(connectionFactory, destination);
Thread.sleep(100);
spout.nextTuple(); // Pretend to be storm.
Assert.assertTrue(mockCollector.emitted);
-
- mockCollector.reset();
+
+ mockCollector.reset();
spout.fail(msg.getJMSMessageID()); // Mock failure
Thread.sleep(5000);
spout.nextTuple(); // Pretend to be storm.
@@ -66,7 +70,7 @@ public class JmsSpoutTest {
}
@Test
- public void testSerializability() throws IOException{
+ public void testSerializability() throws IOException {
JmsSpout spout = new JmsSpout();
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(out);
@@ -74,9 +78,38 @@ public class JmsSpoutTest {
oos.close();
Assert.assertTrue(out.toByteArray().length > 0);
}
-
- public Message sendMessage(ConnectionFactory connectionFactory, Destination destination) throws JMSException {
- Session mySess = connectionFactory.createConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ /**
+ * Make sure that {@link JmsSpout#open} returns correctly regardless of
+ * the type of {@link Number} that is the value of
+ * {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
+ */
+ @Test
+ public void testOpenWorksMultipleTypesOfNumberObjects() throws Exception {
+ JmsSpout spout = new JmsSpout();
+ spout.setJmsProvider(new MockJmsProvider());
+ spout.setJmsTupleProducer(new MockTupleProducer());
+ Map<String, Object> configuration = new HashMap<String, Object>();
+ MockSpoutOutputCollector delegateCollector =
+ new MockSpoutOutputCollector();
+ SpoutOutputCollector collector =
+ new SpoutOutputCollector(delegateCollector);
+
+ // Test with long value
+ configuration.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000L);
+ spout.open(configuration, null, collector);
+
+ // Test with integer value
+ configuration.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000);
+ spout.open(configuration, null, collector);
+ }
+
+ public Message sendMessage(ConnectionFactory connectionFactory,
+ Destination destination) throws JMSException {
+
+ Session mySess = connectionFactory.createConnection().createSession(
+ false,
+ Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = mySess.createProducer(destination);
TextMessage msg = mySess.createTextMessage();
msg.setText("Hello World");
[3/3] storm git commit: Changelog: STORM-2652
Posted by sr...@apache.org.
Changelog: STORM-2652
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/caf39096
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/caf39096
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/caf39096
Branch: refs/heads/master
Commit: caf39096be1c32fd1643fbb881eeed01a0208f10
Parents: 7cf48f6
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Jul 22 22:56:38 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sat Jul 22 22:56:38 2017 +0200
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/caf39096/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f3c7ecd..60b7cc3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-2652: fix error in open method of JmsSpout
* STORM-2644: Show message when result not found on deep search page
* STORM-2643: Add a confirmation dialog for profiling and debugging actions on Storm UI
* STORM-2641: Make storm.py print output from subprocess on error on Windows, make storm.ps1 return the right exit code
[2/3] storm git commit: Merge branch
'bugfix/fix-configuration-cast-exception' of
https://github.com/omerhadari/storm into STORM-2652-merge
Posted by sr...@apache.org.
Merge branch 'bugfix/fix-configuration-cast-exception' of https://github.com/omerhadari/storm into STORM-2652-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7cf48f65
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7cf48f65
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7cf48f65
Branch: refs/heads/master
Commit: 7cf48f6535fb5b473865c208255b7d9004c09ff4
Parents: e86ac75 d6e5e6d
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Jul 22 22:55:37 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sat Jul 22 22:55:37 2017 +0200
----------------------------------------------------------------------
external/storm-jms/pom.xml | 2 +-
.../org/apache/storm/jms/spout/JmsSpout.java | 292 ++++++++++++++-----
.../apache/storm/jms/spout/JmsSpoutTest.java | 79 +++--
3 files changed, 269 insertions(+), 104 deletions(-)
----------------------------------------------------------------------