You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/04/22 23:38:04 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5735 - fix up semantics around lastDeliveredSequenceId

Repository: activemq
Updated Branches:
  refs/heads/master 3a5f127d5 -> eb6c08263


https://issues.apache.org/jira/browse/AMQ-5735 - fix up semantics around lastDeliveredSequenceId


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/eb6c0826
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/eb6c0826
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/eb6c0826

Branch: refs/heads/master
Commit: eb6c0826311a876769e56029dc1e261ee7519db5
Parents: 3a5f127
Author: gtully <ga...@gmail.com>
Authored: Wed Apr 22 16:32:17 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Apr 22 16:32:17 2015 +0100

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |  3 +-
 .../apache/activemq/broker/region/Queue.java    | 17 +++---
 .../org/apache/activemq/ActiveMQConnection.java |  4 +-
 .../org/apache/activemq/ActiveMQSession.java    |  5 +-
 .../apache/activemq/command/ConsumerInfo.java   |  2 +-
 .../org/apache/activemq/command/RemoveInfo.java |  5 +-
 .../java/org/apache/activemq/ra/MDBTest.java    |  5 +-
 .../activemq/ra/ServerSessionImplTest.java      | 20 +++++--
 .../org/apache/activemq/JmsRedeliveredTest.java | 53 ++++++++++++++++-
 .../apache/activemq/RedeliveryPolicyTest.java   |  7 ++-
 .../org/apache/activemq/broker/BrokerTest.java  | 60 ++------------------
 11 files changed, 101 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 5c6307a..b5e1c55 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -67,6 +67,7 @@ import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
@@ -1187,7 +1188,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 cs.getContext().getStopping().set(true);
                 try {
                     LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
-                    processRemoveConnection(cs.getInfo().getConnectionId(), -1);
+                    processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
                 } catch (Throwable ignore) {
                     ignore.printStackTrace();
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 3438dcc..8d12a8b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -66,6 +66,7 @@ import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
@@ -482,9 +483,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
     }
 
     @Override
-    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
+    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
             throws Exception {
-        super.removeSubscription(context, sub, lastDeiveredSequenceId);
+        super.removeSubscription(context, sub, lastDeliveredSequenceId);
         // synchronize with dispatch method so that no new messages are sent
         // while removing up a subscription.
         pagedInPendingDispatchLock.writeLock().lock();
@@ -492,7 +493,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}, groups: {}", new Object[]{
                     getActiveMQDestination().getQualifiedName(),
                     sub,
-                    lastDeiveredSequenceId,
+                    lastDeliveredSequenceId,
                     getDestinationStatistics().getDequeues().getCount(),
                     getDestinationStatistics().getDispatched().getCount(),
                     getDestinationStatistics().getInflight().getCount(),
@@ -536,12 +537,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                 List<MessageReference> unAckedMessages = sub.remove(context, this);
 
                 // locate last redelivered in unconsumed list (list in delivery rather than seq order)
-                if (lastDeiveredSequenceId > 0) {
+                if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
                     for (MessageReference ref : unAckedMessages) {
-                        if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) {
+                        if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
                             lastDeliveredRef = ref;
                             markAsRedelivered = true;
-                            LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeiveredSequenceId, ref.getMessageId());
+                            LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId());
                             break;
                         }
                     }
@@ -557,7 +558,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                         qmr.unlock();
 
                         // have no delivery information
-                        if (lastDeiveredSequenceId == 0) {
+                        if (lastDeliveredSequenceId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
                             qmr.incrementRedeliveryCounter();
                         } else {
                             if (markAsRedelivered) {
@@ -821,9 +822,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         checkUsage(context, producerExchange, message);
         sendLock.lockInterruptibly();
         try {
+            message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
             if (store != null && message.isPersistent()) {
                 try {
-                    message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                     if (messages.isCacheEnabled()) {
                         result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
                         result.addListener(new PendingMarshalUsageTracker(message));

http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index 87e4c91..4d425a2 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -664,7 +664,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
                         }
                     }
 
-                    long lastDeliveredSequenceId = 0;
+                    long lastDeliveredSequenceId = -1;
                     for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
                         ActiveMQSession s = i.next();
                         s.dispose();
@@ -683,7 +683,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
                         RemoveInfo removeCommand = info.createRemoveCommand();
                         removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
                         try {
-                            doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
+                            doSyncSendPacket(removeCommand, closeTimeout);
                         } catch (JMSException e) {
                             if (e.getCause() instanceof RequestTimedOutIOException) {
                                 // expected

http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 14c2869..1d2ae83 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -228,7 +228,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
     private DeliveryListener deliveryListener;
     private MessageTransformer transformer;
     private BlobTransferPolicy blobTransferPolicy;
-    private long lastDeliveredSequenceId;
+    private long lastDeliveredSequenceId = -2;
 
     /**
      * Construct the Session
@@ -878,7 +878,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
         MessageDispatch messageDispatch;
         while ((messageDispatch = executor.dequeueNoWait()) != null) {
             final MessageDispatch md = messageDispatch;
-            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
+            final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
 
             MessageAck earlyAck = null;
             if (message.isExpired()) {
@@ -913,6 +913,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
             }
 
             md.setDeliverySequenceId(getNextDeliveryId());
+            lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
 
             final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
             try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
index 09b6be5..0c1e691 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
@@ -62,7 +62,7 @@ public class ConsumerInfo extends BaseCommand {
 
     // not marshalled, populated from RemoveInfo, the last message delivered, used
     // to suppress redelivery on prefetched messages after close
-    private transient long lastDeliveredSequenceId;
+    private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
     private transient long assignedGroupCount;
     // originated from a
     // network connection

http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java
index 3452104..118940f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/RemoveInfo.java
@@ -29,9 +29,10 @@ import org.apache.activemq.state.CommandVisitor;
 public class RemoveInfo extends BaseCommand {
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_INFO;
-
+    public static final int LAST_DELIVERED_UNSET = -1;
+    public static final int LAST_DELIVERED_UNKNOWN = -2;
     protected DataStructure objectId;
-    protected long lastDeliveredSequenceId;
+    protected long lastDeliveredSequenceId = LAST_DELIVERED_UNKNOWN;
 
     public RemoveInfo() {
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
index 904dd18..af36389 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
@@ -310,6 +310,7 @@ public class MDBTest extends TestCase {
             @Override
             public void doAppend(LoggingEvent event) {
                 if (event.getLevel().isGreaterOrEqual(Level.ERROR)) {
+                    System.err.println("Event :" + event.getRenderedMessage());
                     errorMessage.set(event.getRenderedMessage());
                 }
             }
@@ -389,7 +390,7 @@ public class MDBTest extends TestCase {
         // Activate an Endpoint
         adapter.endpointActivation(messageEndpointFactory, activationSpec);
 
-        ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(1000);
+        ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(4000);
         if (msg != null) {
             assertEquals("Prefetch size hasn't been set", 0, ((ConsumerInfo)msg.getDataStructure()).getPrefetchSize());
         } else {
@@ -410,7 +411,7 @@ public class MDBTest extends TestCase {
         adapter.stop();
 
         assertNotNull("We got an error message", errorMessage.get());
-        assertTrue("correct message", errorMessage.get().contains("zero"));
+        assertTrue("correct message: " +  errorMessage.get(), errorMessage.get().contains("zero"));
 
         LogManager.getRootLogger().removeAppender(testAppender);
         brokerService.stop();

http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
index fb99330..9323d7b 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
@@ -34,8 +34,12 @@ import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.util.Wait;
 import org.hamcrest.Description;
 import org.jmock.Expectations;
 import org.jmock.Mockery;
@@ -181,7 +185,11 @@ public class ServerSessionImplTest extends TestCase {
         ActiveMQSession session1 = (ActiveMQSession) serverSession1.getSession();
         for (int i=0; i<maxMessages; i++) {
             MessageDispatch messageDispatch = new  MessageDispatch();
-            messageDispatch.setMessage(new ActiveMQTextMessage());
+            ActiveMQMessage message = new ActiveMQTextMessage();
+            message.setMessageId(new MessageId("0:0:0:" + i));
+            message.getMessageId().setBrokerSequenceId(i);
+            messageDispatch.setMessage(message);
+            messageDispatch.setConsumerId(new ConsumerId("0:0:0"));
             session1.dispatch(messageDispatch);
         }
 
@@ -199,9 +207,13 @@ public class ServerSessionImplTest extends TestCase {
             }
         });
 
-        while (messageCount.getCount() > maxMessages - 10) {
-            TimeUnit.MILLISECONDS.sleep(100);
-        }
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return messageCount.getCount() < maxMessages - 10;
+            }
+        });
+        assertTrue("some messages consumed", messageCount.getCount() < maxMessages);
         LOG.info("Closing pool on {}", messageCount.getCount());
         pool.close();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
index e5d90d6..4906963 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
@@ -32,6 +32,7 @@ import javax.jms.Topic;
 import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
+import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.transport.vm.VMTransport;
 import org.apache.activemq.util.Wait;
 
@@ -403,7 +404,7 @@ public class JmsRedeliveredTest extends TestCase {
         session.close();
     }
 
-    public void testNoReceiveConsumerDisconnectDoesNotIncrementRedelivery() throws Exception {
+    public void testNoReceiveConsumerDisconnectDoesIncrementRedelivery() throws Exception {
         connection.setClientID(getName());
         connection.start();
 
@@ -425,7 +426,9 @@ public class JmsRedeliveredTest extends TestCase {
             }
         });
 
-        // whack the connection - like a rebalance or tcp drop
+        // whack the connection - like a rebalance or tcp drop - consumer does not get to communicate
+        // a close and delivered sequence info to broker. So broker is in the dark and must increment
+        // redelivery to be safe
         ((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class).stop();
 
         session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@@ -434,6 +437,52 @@ public class JmsRedeliveredTest extends TestCase {
         assertNotNull(msg);
         msg.acknowledge();
 
+        assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
+        session.close();
+        keepBrokerAliveConnection.close();
+    }
+
+    public void testNoReceiveConsumerAbortDoesNotIncrementRedelivery() throws Exception {
+        connection.setClientID(getName());
+        connection.start();
+
+        Connection keepBrokerAliveConnection = createConnection();
+        keepBrokerAliveConnection.start();
+
+        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue-" + getName());
+        final MessageConsumer consumer = session.createConsumer(queue);
+
+        MessageProducer producer = createProducer(session, queue);
+        producer.send(createTextMessage(session));
+        session.commit();
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 1;
+            }
+        });
+
+        // on abort via something like slowConsumerPolicy
+        ConsumerControl consumerControl = new ConsumerControl();
+        consumerControl.setConsumerId(((ActiveMQMessageConsumer)consumer).getConsumerId());
+        consumerControl.setClose(true);
+        ((ActiveMQConnection) connection).getTransport().narrow(VMTransport.class).getTransportListener().onCommand(consumerControl);
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 0;
+            }
+        });
+
+        session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer messageConsumer = session.createConsumer(queue);
+        Message msg = messageConsumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
         assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
         session.close();
         keepBrokerAliveConnection.close();

http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index b7a870a..659e982 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -37,8 +37,11 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RedeliveryPolicyTest extends JmsTestSupport {
+    static final Logger LOG = LoggerFactory.getLogger(RedeliveryPolicyTest.class);
 
     public static Test suite() {
         return suite(RedeliveryPolicyTest.class);
@@ -535,7 +538,6 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
         final AtomicInteger receivedCount = new AtomicInteger(0);
 
         for (int i=0;i<=maxRedeliveries+1;i++) {
-
             connection = (ActiveMQConnection)factory.createConnection(userName, password);
             connections.add(connection);
 
@@ -553,6 +555,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
                 public void onMessage(Message message) {
                     try {
                         ActiveMQTextMessage m = (ActiveMQTextMessage) message;
+                        LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId());
                         assertEquals("1st", m.getText());
                         assertEquals(receivedCount.get(), m.getRedeliveryCounter());
                         receivedCount.incrementAndGet();
@@ -590,7 +593,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
                     session.run();
                     return done.await(10, TimeUnit.MILLISECONDS);
                 }
-            });
+            }, 5000);
 
             if (i<=maxRedeliveries) {
                 assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS));

http://git-wip-us.apache.org/repos/asf/activemq/blob/eb6c0826/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
index 2facb98..769cdbf 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
@@ -33,6 +33,7 @@ import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.SessionInfo;
 
 public class BrokerTest extends BrokerTestSupport {
@@ -486,59 +487,6 @@ public class BrokerTest extends BrokerTestSupport {
         assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination));
     }
 
-    public void initCombosForTestConsumerCloseCausesRedelivery() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
-    }
-
-    public void testConsumerCloseCausesRedelivery() throws Exception {
-
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo1);
-
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        consumerInfo1.setPrefetchSize(100);
-        connection1.request(consumerInfo1);
-
-        // Send the messages
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-
-        // Receive the messages.
-        for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull("m1 is null for index: " + i, m1);
-            assertFalse(m1.isRedelivered());
-        }
-
-        // Close the consumer without acking.. this should cause re-delivery of
-        // the messages.
-        connection1.send(consumerInfo1.createRemoveCommand());
-
-        // Create another consumer that should get the messages again.
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo1, destination);
-        consumerInfo2.setPrefetchSize(100);
-        connection1.request(consumerInfo2);
-
-        // Receive the messages.
-        for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull("m1 is null for index: " + i, m1);
-            assertTrue(m1.isRedelivered());
-        }
-        assertNoMessagesLeft(connection1);
-
-    }
-
     public void testTopicDurableSubscriptionCanBeRestored() throws Exception {
 
         ActiveMQDestination destination = new ActiveMQTopic("TEST");
@@ -1396,14 +1344,18 @@ public class BrokerTest extends BrokerTestSupport {
         connection1.send(createMessage(producerInfo, destination, deliveryMode));
         connection1.send(createMessage(producerInfo, destination, deliveryMode));
 
+        long lastDeliveredSeq = -1;
         // Get the messages
         for (int i = 0; i < 4; i++) {
             Message m1 = receiveMessage(connection1);
             assertNotNull(m1);
             assertFalse(m1.isRedelivered());
+            lastDeliveredSeq = m1.getMessageId().getBrokerSequenceId();
         }
         // Close the consumer without sending any ACKS.
-        connection1.send(closeConsumerInfo(consumerInfo1));
+        RemoveInfo removeInfo = closeConsumerInfo(consumerInfo1);
+        removeInfo.setLastDeliveredSequenceId(lastDeliveredSeq);
+        connection1.send(removeInfo);
 
         // Drain any in flight messages..
         while (connection1.getDispatchQueue().poll(0, TimeUnit.MILLISECONDS) != null) {