You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/04/22 23:38:04 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5735 -
fix up semantics around lastDeliveredSequenceId
Repository: activemq
Updated Branches:
refs/heads/master 3a5f127d5 -> eb6c08263
https://issues.apache.org/jira/browse/AMQ-5735 - fix up semantics around lastDeliveredSequenceId
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/eb6c0826
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/eb6c0826
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/eb6c0826
Branch: refs/heads/master
Commit: eb6c0826311a876769e56029dc1e261ee7519db5
Parents: 3a5f127
Author: gtully <ga...@gmail.com>
Authored: Wed Apr 22 16:32:17 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Apr 22 16:32:17 2015 +0100
----------------------------------------------------------------------
.../activemq/broker/TransportConnection.java | 3 +-
.../apache/activemq/broker/region/Queue.java | 17 +++---
.../org/apache/activemq/ActiveMQConnection.java | 4 +-
.../org/apache/activemq/ActiveMQSession.java | 5 +-
.../apache/activemq/command/ConsumerInfo.java | 2 +-
.../org/apache/activemq/command/RemoveInfo.java | 5 +-
.../java/org/apache/activemq/ra/MDBTest.java | 5 +-
.../activemq/ra/ServerSessionImplTest.java | 20 +++++--
.../org/apache/activemq/JmsRedeliveredTest.java | 53 ++++++++++++++++-
.../apache/activemq/RedeliveryPolicyTest.java | 7 ++-
.../org/apache/activemq/broker/BrokerTest.java | 60 ++------------------
11 files changed, 101 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 5c6307a..b5e1c55 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -67,6 +67,7 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
@@ -1187,7 +1188,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
cs.getContext().getStopping().set(true);
try {
LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
- processRemoveConnection(cs.getInfo().getConnectionId(), -1);
+ processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
} catch (Throwable ignore) {
ignore.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 3438dcc..8d12a8b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -66,6 +66,7 @@ import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
@@ -482,9 +483,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
@Override
- public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
+ public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
throws Exception {
- super.removeSubscription(context, sub, lastDeiveredSequenceId);
+ super.removeSubscription(context, sub, lastDeliveredSequenceId);
// synchronize with dispatch method so that no new messages are sent
// while removing up a subscription.
pagedInPendingDispatchLock.writeLock().lock();
@@ -492,7 +493,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}, groups: {}", new Object[]{
getActiveMQDestination().getQualifiedName(),
sub,
- lastDeiveredSequenceId,
+ lastDeliveredSequenceId,
getDestinationStatistics().getDequeues().getCount(),
getDestinationStatistics().getDispatched().getCount(),
getDestinationStatistics().getInflight().getCount(),
@@ -536,12 +537,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
List<MessageReference> unAckedMessages = sub.remove(context, this);
// locate last redelivered in unconsumed list (list in delivery rather than seq order)
- if (lastDeiveredSequenceId > 0) {
+ if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
for (MessageReference ref : unAckedMessages) {
- if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) {
+ if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
lastDeliveredRef = ref;
markAsRedelivered = true;
- LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeiveredSequenceId, ref.getMessageId());
+ LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId());
break;
}
}
@@ -557,7 +558,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
qmr.unlock();
// have no delivery information
- if (lastDeiveredSequenceId == 0) {
+ if (lastDeliveredSequenceId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
qmr.incrementRedeliveryCounter();
} else {
if (markAsRedelivered) {
@@ -821,9 +822,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
checkUsage(context, producerExchange, message);
sendLock.lockInterruptibly();
try {
+ message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (store != null && message.isPersistent()) {
try {
- message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (messages.isCacheEnabled()) {
result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
result.addListener(new PendingMarshalUsageTracker(message));
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index 87e4c91..4d425a2 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -664,7 +664,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
- long lastDeliveredSequenceId = 0;
+ long lastDeliveredSequenceId = -1;
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.dispose();
@@ -683,7 +683,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
RemoveInfo removeCommand = info.createRemoveCommand();
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
try {
- doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
+ doSyncSendPacket(removeCommand, closeTimeout);
} catch (JMSException e) {
if (e.getCause() instanceof RequestTimedOutIOException) {
// expected
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 14c2869..1d2ae83 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -228,7 +228,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
private DeliveryListener deliveryListener;
private MessageTransformer transformer;
private BlobTransferPolicy blobTransferPolicy;
- private long lastDeliveredSequenceId;
+ private long lastDeliveredSequenceId = -2;
/**
* Construct the Session
@@ -878,7 +878,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
MessageDispatch messageDispatch;
while ((messageDispatch = executor.dequeueNoWait()) != null) {
final MessageDispatch md = messageDispatch;
- ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
+ final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
MessageAck earlyAck = null;
if (message.isExpired()) {
@@ -913,6 +913,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
md.setDeliverySequenceId(getNextDeliveryId());
+ lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
try {
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
index 09b6be5..0c1e691 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
@@ -62,7 +62,7 @@ public class ConsumerInfo extends BaseCommand {
// not marshalled, populated from RemoveInfo, the last message delivered, used
// to suppress redelivery on prefetched messages after close
- private transient long lastDeliveredSequenceId;
+ private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
private transient long assignedGroupCount;
// originated from a
// network connection
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java
index 3452104..118940f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java
@@ -29,9 +29,10 @@ import org.apache.activemq.state.CommandVisitor;
public class RemoveInfo extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_INFO;
-
+ public static final int LAST_DELIVERED_UNSET = -1;
+ public static final int LAST_DELIVERED_UNKNOWN = -2;
protected DataStructure objectId;
- protected long lastDeliveredSequenceId;
+ protected long lastDeliveredSequenceId = LAST_DELIVERED_UNKNOWN;
public RemoveInfo() {
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
index 904dd18..af36389 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
@@ -310,6 +310,7 @@ public class MDBTest extends TestCase {
@Override
public void doAppend(LoggingEvent event) {
if (event.getLevel().isGreaterOrEqual(Level.ERROR)) {
+ System.err.println("Event :" + event.getRenderedMessage());
errorMessage.set(event.getRenderedMessage());
}
}
@@ -389,7 +390,7 @@ public class MDBTest extends TestCase {
// Activate an Endpoint
adapter.endpointActivation(messageEndpointFactory, activationSpec);
- ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(1000);
+ ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(4000);
if (msg != null) {
assertEquals("Prefetch size hasn't been set", 0, ((ConsumerInfo)msg.getDataStructure()).getPrefetchSize());
} else {
@@ -410,7 +411,7 @@ public class MDBTest extends TestCase {
adapter.stop();
assertNotNull("We got an error message", errorMessage.get());
- assertTrue("correct message", errorMessage.get().contains("zero"));
+ assertTrue("correct message: " + errorMessage.get(), errorMessage.get().contains("zero"));
LogManager.getRootLogger().removeAppender(testAppender);
brokerService.stop();
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
index fb99330..9323d7b 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
@@ -34,8 +34,12 @@ import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.util.Wait;
import org.hamcrest.Description;
import org.jmock.Expectations;
import org.jmock.Mockery;
@@ -181,7 +185,11 @@ public class ServerSessionImplTest extends TestCase {
ActiveMQSession session1 = (ActiveMQSession) serverSession1.getSession();
for (int i=0; i<maxMessages; i++) {
MessageDispatch messageDispatch = new MessageDispatch();
- messageDispatch.setMessage(new ActiveMQTextMessage());
+ ActiveMQMessage message = new ActiveMQTextMessage();
+ message.setMessageId(new MessageId("0:0:0:" + i));
+ message.getMessageId().setBrokerSequenceId(i);
+ messageDispatch.setMessage(message);
+ messageDispatch.setConsumerId(new ConsumerId("0:0:0"));
session1.dispatch(messageDispatch);
}
@@ -199,9 +207,13 @@ public class ServerSessionImplTest extends TestCase {
}
});
- while (messageCount.getCount() > maxMessages - 10) {
- TimeUnit.MILLISECONDS.sleep(100);
- }
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return messageCount.getCount() < maxMessages - 10;
+ }
+ });
+ assertTrue("some messages consumed", messageCount.getCount() < maxMessages);
LOG.info("Closing pool on {}", messageCount.getCount());
pool.close();
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
index e5d90d6..4906963 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
@@ -32,6 +32,7 @@ import javax.jms.Topic;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
+import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.transport.vm.VMTransport;
import org.apache.activemq.util.Wait;
@@ -403,7 +404,7 @@ public class JmsRedeliveredTest extends TestCase {
session.close();
}
- public void testNoReceiveConsumerDisconnectDoesNotIncrementRedelivery() throws Exception {
+ public void testNoReceiveConsumerDisconnectDoesIncrementRedelivery() throws Exception {
connection.setClientID(getName());
connection.start();
@@ -425,7 +426,9 @@ public class JmsRedeliveredTest extends TestCase {
}
});
- // whack the connection - like a rebalance or tcp drop
+ // whack the connection - like a rebalance or tcp drop - consumer does not get to communicate
+ // a close and delivered sequence info to broker. So broker is in the dark and must increment
+ // redelivery to be safe
((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class).stop();
session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@@ -434,6 +437,52 @@ public class JmsRedeliveredTest extends TestCase {
assertNotNull(msg);
msg.acknowledge();
+ assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
+ session.close();
+ keepBrokerAliveConnection.close();
+ }
+
+ public void testNoReceiveConsumerAbortDoesNotIncrementRedelivery() throws Exception {
+ connection.setClientID(getName());
+ connection.start();
+
+ Connection keepBrokerAliveConnection = createConnection();
+ keepBrokerAliveConnection.start();
+
+ Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue("queue-" + getName());
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageProducer producer = createProducer(session, queue);
+ producer.send(createTextMessage(session));
+ session.commit();
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 1;
+ }
+ });
+
+ // on abort via something like slowConsumerPolicy
+ ConsumerControl consumerControl = new ConsumerControl();
+ consumerControl.setConsumerId(((ActiveMQMessageConsumer)consumer).getConsumerId());
+ consumerControl.setClose(true);
+ ((ActiveMQConnection) connection).getTransport().narrow(VMTransport.class).getTransportListener().onCommand(consumerControl);
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 0;
+ }
+ });
+
+ session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message msg = messageConsumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
session.close();
keepBrokerAliveConnection.close();
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index b7a870a..659e982 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -37,8 +37,11 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RedeliveryPolicyTest extends JmsTestSupport {
+ static final Logger LOG = LoggerFactory.getLogger(RedeliveryPolicyTest.class);
public static Test suite() {
return suite(RedeliveryPolicyTest.class);
@@ -535,7 +538,6 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
final AtomicInteger receivedCount = new AtomicInteger(0);
for (int i=0;i<=maxRedeliveries+1;i++) {
-
connection = (ActiveMQConnection)factory.createConnection(userName, password);
connections.add(connection);
@@ -553,6 +555,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
public void onMessage(Message message) {
try {
ActiveMQTextMessage m = (ActiveMQTextMessage) message;
+ LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId());
assertEquals("1st", m.getText());
assertEquals(receivedCount.get(), m.getRedeliveryCounter());
receivedCount.incrementAndGet();
@@ -590,7 +593,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
session.run();
return done.await(10, TimeUnit.MILLISECONDS);
}
- });
+ }, 5000);
if (i<=maxRedeliveries) {
assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS));
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
index 2facb98..769cdbf 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
@@ -33,6 +33,7 @@ import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.SessionInfo;
public class BrokerTest extends BrokerTestSupport {
@@ -486,59 +487,6 @@ public class BrokerTest extends BrokerTestSupport {
assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination));
}
- public void initCombosForTestConsumerCloseCausesRedelivery() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
- }
-
- public void testConsumerCloseCausesRedelivery() throws Exception {
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
-
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
- consumerInfo1.setPrefetchSize(100);
- connection1.request(consumerInfo1);
-
- // Send the messages
- connection1.send(createMessage(producerInfo1, destination, deliveryMode));
- connection1.send(createMessage(producerInfo1, destination, deliveryMode));
- connection1.send(createMessage(producerInfo1, destination, deliveryMode));
- connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-
- // Receive the messages.
- for (int i = 0; i < 4; i++) {
- Message m1 = receiveMessage(connection1);
- assertNotNull("m1 is null for index: " + i, m1);
- assertFalse(m1.isRedelivered());
- }
-
- // Close the consumer without acking.. this should cause re-delivery of
- // the messages.
- connection1.send(consumerInfo1.createRemoveCommand());
-
- // Create another consumer that should get the messages again.
- ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo1, destination);
- consumerInfo2.setPrefetchSize(100);
- connection1.request(consumerInfo2);
-
- // Receive the messages.
- for (int i = 0; i < 4; i++) {
- Message m1 = receiveMessage(connection1);
- assertNotNull("m1 is null for index: " + i, m1);
- assertTrue(m1.isRedelivered());
- }
- assertNoMessagesLeft(connection1);
-
- }
-
public void testTopicDurableSubscriptionCanBeRestored() throws Exception {
ActiveMQDestination destination = new ActiveMQTopic("TEST");
@@ -1396,14 +1344,18 @@ public class BrokerTest extends BrokerTestSupport {
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
+ long lastDeliveredSeq = -1;
// Get the messages
for (int i = 0; i < 4; i++) {
Message m1 = receiveMessage(connection1);
assertNotNull(m1);
assertFalse(m1.isRedelivered());
+ lastDeliveredSeq = m1.getMessageId().getBrokerSequenceId();
}
// Close the consumer without sending any ACKS.
- connection1.send(closeConsumerInfo(consumerInfo1));
+ RemoveInfo removeInfo = closeConsumerInfo(consumerInfo1);
+ removeInfo.setLastDeliveredSequenceId(lastDeliveredSeq);
+ connection1.send(removeInfo);
// Drain any in flight messages..
while (connection1.getDispatchQueue().poll(0, TimeUnit.MILLISECONDS) != null) {