You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/09/28 12:42:11 UTC

svn commit: r580293 [2/2] - in /incubator/qpid/branches/M2: ./ java/ java/broker/ java/broker/etc/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/queue/ java/broker/src/test/java/org/apache/qpid/serve...

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Fri Sep 28 03:41:49 2007
@@ -149,19 +149,21 @@
     {
         int port = details.getPort();
 
-        if (!_inVmPipeAddress.containsKey(port))
+        synchronized (_inVmPipeAddress)
         {
-            if (AutoCreate)
+            if (!_inVmPipeAddress.containsKey(port))
             {
-                createVMBroker(port);
-            }
-            else
-            {
-                throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
-                    + " does not exist. Auto create disabled.", null);
+                if (AutoCreate)
+                {
+                    createVMBroker(port);
+                }
+                else
+                {
+                    throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
+                        + " does not exist. Auto create disabled.", null);
+                }
             }
         }
-
         return new VmPipeTransportConnection(port);
     }
 
@@ -176,69 +178,71 @@
             config.setThreadModel(ReadWriteThreadModel.getInstance());
         }
 
-        if (!_inVmPipeAddress.containsKey(port))
+        synchronized (_inVmPipeAddress)
         {
-            _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
-            IoHandlerAdapter provider = null;
-            try
+            if (!_inVmPipeAddress.containsKey(port))
             {
-                VmPipeAddress pipe = new VmPipeAddress(port);
-
-                provider = createBrokerInstance(port);
-
-                _acceptor.bind(pipe, provider);
-
-                _inVmPipeAddress.put(port, pipe);
-                _logger.info("Created InVM Qpid.AMQP listening on port " + port);
-            }
-            catch (IOException e)
-            {
-                _logger.error("Got IOException.", e);
-
-                // Try and unbind provider
+                _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
+                IoHandlerAdapter provider = null;
                 try
                 {
                     VmPipeAddress pipe = new VmPipeAddress(port);
 
-                    try
-                    {
-                        _acceptor.unbind(pipe);
-                    }
-                    catch (Exception ignore)
-                    {
-                        // ignore
-                    }
-
-                    if (provider == null)
-                    {
-                        provider = createBrokerInstance(port);
-                    }
+                    provider = createBrokerInstance(port);
 
                     _acceptor.bind(pipe, provider);
+
                     _inVmPipeAddress.put(port, pipe);
                     _logger.info("Created InVM Qpid.AMQP listening on port " + port);
                 }
-                catch (IOException justUseFirstException)
+                catch (IOException e)
                 {
-                    String because;
-                    if (e.getCause() == null)
+                    _logger.error("Got IOException.", e);
+
+                    // Try and unbind provider
+                    try
                     {
-                        because = e.toString();
+                        VmPipeAddress pipe = new VmPipeAddress(port);
+
+                        try
+                        {
+                            _acceptor.unbind(pipe);
+                        }
+                        catch (Exception ignore)
+                        {
+                            // ignore
+                        }
+
+                        if (provider == null)
+                        {
+                            provider = createBrokerInstance(port);
+                        }
+
+                        _acceptor.bind(pipe, provider);
+                        _inVmPipeAddress.put(port, pipe);
+                        _logger.info("Created InVM Qpid.AMQP listening on port " + port);
                     }
-                    else
+                    catch (IOException justUseFirstException)
                     {
-                        because = e.getCause().toString();
-                    }
+                        String because;
+                        if (e.getCause() == null)
+                        {
+                            because = e.toString();
+                        }
+                        else
+                        {
+                            because = e.getCause().toString();
+                        }
 
-                    throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+                        throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+                    }
                 }
             }
+            else
+            {
+                _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
+            }
         }
-        else
-        {
-            _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
-        }
-
     }
 
     private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException
@@ -285,25 +289,29 @@
     {
         _logger.info("Killing all VM Brokers");
         _acceptor.unbindAll();
-
-        Iterator keys = _inVmPipeAddress.keySet().iterator();
-
-        while (keys.hasNext())
+        synchronized (_inVmPipeAddress)
         {
-            int id = (Integer) keys.next();
-            _inVmPipeAddress.remove(id);
-        }
+            Iterator keys = _inVmPipeAddress.keySet().iterator();
 
+            while (keys.hasNext())
+            {
+                int id = (Integer) keys.next();
+                _inVmPipeAddress.remove(id);
+            }
+        }
     }
 
     public static void killVMBroker(int port)
     {
-        VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
-        if (pipe != null)
+        synchronized (_inVmPipeAddress)
         {
-            _logger.info("Killing VM Broker:" + port);
-            _inVmPipeAddress.remove(port);
-            _acceptor.unbind(pipe);
+            VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
+            if (pipe != null)
+            {
+                _logger.info("Killing VM Broker:" + port);
+                _inVmPipeAddress.remove(port);
+                _acceptor.unbind(pipe);
+            }
         }
     }
 

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java Fri Sep 28 03:41:49 2007
@@ -68,6 +68,7 @@
     protected void tearDown() throws Exception
     {
         super.tearDown();
+        TransportConnection.killAllVMBrokers();
     }
 
     private void init(AMQConnection connection) throws Exception

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java Fri Sep 28 03:41:49 2007
@@ -48,6 +48,7 @@
     protected void tearDown() throws Exception
     {
         super.tearDown();
+        TransportConnection.killAllVMBrokers();
     }
 
     /**

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java Fri Sep 28 03:41:49 2007
@@ -125,6 +125,7 @@
     protected void tearDown() throws Exception
     {
         closeConnection();
+        TransportConnection.killAllVMBrokers();
         super.tearDown();
     }
 

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java Fri Sep 28 03:41:49 2007
@@ -20,8 +20,11 @@
  */
 package org.apache.qpid.test.unit.client.connection;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -30,14 +33,20 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
-import junit.framework.TestCase;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.transport.TransportConnection;
-
+/**
+ * ConnectionStartTest:
+ * This test verifies that a fresh connection is not started and no messages are delivered until the connection is
+ * started.
+ *
+ * Once the connection is started, the message should be available to the consumer and the connection should be started.
+ *
+ * This test verifies that neither receive() nor a MessageListener causes message delivery before start is called.
+ *
+ */
 public class ConnectionStartTest extends TestCase
 {
 
@@ -54,11 +63,18 @@
 
         try
         {
+            // Create Consumer Connection
+            _connection = new AMQConnection(_broker, "guest", "guest", "fred", "test");
 
+            _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
 
-            AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test");
+            Queue queue = _consumerSess.createQueue("ConnectionStartTest");
 
-            AMQQueue queue = new AMQQueue(pubCon,"ConnectionStartTest");
+            _consumer = _consumerSess.createConsumer(queue);
+
+
+            // Create Producer Connection to send message
+            AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test");
 
             Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
 
@@ -66,12 +82,6 @@
 
             pub.send(pubSess.createTextMessage("Initial Message"));
 
-            _connection = new AMQConnection(_broker, "guest", "guest", "fred", "test");
-
-            _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
-
-            _consumer = _consumerSess.createConsumer(queue);
-
             pubCon.close();
 
         }
@@ -85,6 +95,7 @@
     {
         _connection.close();
         TransportConnection.killVMBroker(1);
+        super.tearDown();
     }
 
     public void testSimpleReceiveConnection()
@@ -94,9 +105,9 @@
             assertTrue("Connection should not be started", !_connection.started());
             //Note that this next line will start the dispatcher in the session
             // should really not be called before _connection start
-            assertTrue("There should not be messages waiting for the consumer", _consumer.receiveNoWait() == null);
+            assertNull("There should not be messages waiting for the consumer", _consumer.receiveNoWait());
             _connection.start();
-            assertTrue("There should be messages waiting for the consumer", _consumer.receive(1000) == null);
+            assertNotNull("There should be messages waiting for the consumer", _consumer.receive(1000));
             assertTrue("Connection should be started", _connection.started());
 
         }
@@ -131,7 +142,11 @@
                 }
             });
 
+            // Ensure that setting a ML doesn't start the connection
             assertTrue("Connection should not be started", !_connection.started());
+            // Ensure that the message wasn't delivered while the connection was stopped.
+            assertEquals("Message latch should still be set",1,_gotMessage.getCount());
+
             _connection.start();
             assertTrue("Connection should be started", _connection.started());
 

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Fri Sep 28 03:41:49 2007
@@ -434,6 +434,13 @@
         verifyMessages(_consumer.receive(1000));
     }
 
+    /**
+     * This test sends two messages and receives one of them, but doesn't ack it.
+     * The consumer is then closed.
+     * The first message should be returned as redelivered.
+     * The second message should be delivered normally.
+     * @throws Exception
+     */
     public void testSend2ThenCloseAfter1andTryAgain() throws Exception
     {
         assertTrue("session is not transacted", _session.getTransacted());

Modified: incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Fri Sep 28 03:41:49 2007
@@ -711,7 +711,10 @@
         if (trace)
         {
             _logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "...");
-            _logger.trace(_properties.toString());
+            if (_properties != null)
+            {
+                _logger.trace(_properties.toString());
+            }
         }
 
         EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize());

Modified: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Fri Sep 28 03:41:49 2007
@@ -885,24 +885,8 @@
                         synchronized (_sendPauseMonitor)
                         {
                             if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
-                            // && (_sendPauseBarrier.getNumberWaiting() == 1))
                             {
-                                // log.debug("unreceived size estimate under limit = " + unreceivedSize);
-
-                                // Wait on the send pause barrier for the limit to be re-established.
-                                /*try
-                                {*/
-                                // _sendPauseBarrier.await();
                                 _sendPauseMonitor.notify();
-                                /*}
-                                catch (InterruptedException e)
-                                {
-                                    throw new RuntimeException(e);
-                                }
-                                catch (BrokenBarrierException e)
-                                {
-                                    throw new RuntimeException(e);
-                                }*/
                             }
                         }
 
@@ -1159,12 +1143,23 @@
         // If necessary, wait until the max pending message size comes within its limit.
         synchronized (_sendPauseMonitor)
         {
+            // Used to keep track of the number of times that send has to wait.
+            int numWaits = 0;
+
+            // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with
+            // the test timeout.
+            int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
+
             while ((_maxPendingSize > 0))
             {
                 // Get the size estimate of sent but not yet received messages.
                 int unreceived = _unreceived.get();
                 int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
 
+                // log.debug("unreceived = " + unreceived);
+                // log.debug("unreceivedSize = " + unreceivedSize);
+                // log.debug("_maxPendingSize = " + _maxPendingSize);
+
                 if (unreceivedSize > _maxPendingSize)
                 {
                     // log.debug("unreceived size estimate over limit = " + unreceivedSize);
@@ -1172,8 +1167,8 @@
                     // Wait on the send pause barrier for the limit to be re-established.
                     try
                     {
-                        // _sendPauseBarrier.await();
-                        _sendPauseMonitor.wait(1000);
+                        _sendPauseMonitor.wait(10000);
+                        numWaits++;
                     }
                     catch (InterruptedException e)
                     {
@@ -1181,10 +1176,17 @@
                         Thread.currentThread().interrupt();
                         throw new RuntimeException(e);
                     }
-                    /*catch (BrokenBarrierException e)
+
+                    // Fail the test if the send has had to wait more than the maximum allowed number of times.
+                    if (numWaits >= waitLimit)
                     {
-                        throw new RuntimeException(e);
-                    }*/
+                        String errorMessage =
+                            "Send has had to wait for the unreceivedSize (" + unreceivedSize
+                            + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit
+                            + " times.";
+                        log.warn(errorMessage);
+                        throw new RuntimeException(errorMessage);
+                    }
                 }
                 else
                 {

Modified: incubator/qpid/branches/M2/java/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/pom.xml?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/pom.xml (original)
+++ incubator/qpid/branches/M2/java/pom.xml Fri Sep 28 03:41:49 2007
@@ -506,7 +506,7 @@
             <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-simple</artifactId>
-                <version>1.0</version>
+                <version>1.4.3</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.mina</groupId>

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Sep 28 03:41:49 2007
@@ -33,6 +33,14 @@
  */
 public class TestableMemoryMessageStore extends MemoryMessageStore
 {
+
+    MemoryMessageStore _mms = null;
+
+    public TestableMemoryMessageStore(MemoryMessageStore mms)
+    {
+        _mms = mms;
+    }
+
     public TestableMemoryMessageStore()
     {
         _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
@@ -41,11 +49,25 @@
 
     public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
     {
-        return _metaDataMap;
+        if (_mms != null)
+        {
+            return _mms._metaDataMap;
+        }
+        else
+        {
+            return _metaDataMap;
+        }
     }
 
     public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
     {
-        return _contentBodyMap;
+        if (_mms != null)
+        {
+            return _mms._contentBodyMap;
+        }
+        else
+        {
+            return _contentBodyMap;
+        }
     }
 }

Modified: incubator/qpid/branches/M2/python/tests/basic.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/python/tests/basic.py?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/python/tests/basic.py (original)
+++ incubator/qpid/branches/M2/python/tests/basic.py Fri Sep 28 03:41:49 2007
@@ -339,9 +339,11 @@
         channel = self.channel
         channel.queue_declare(queue="test-get", exclusive=True)
         
-        #publish some messages (no_ack=True)
+        #publish some messages (no_ack=True) with persistent messaging
         for i in range(1, 11):
-            channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+            msg=Content("Message %d" % i)
+            msg["delivery mode"] = 2
+            channel.basic_publish(routing_key="test-get",content=msg )
 
         #use basic_get to read back the messages, and check that we get an empty at the end
         for i in range(1, 11):
@@ -354,18 +356,53 @@
         self.assertEqual(reply.method.klass.name, "basic")
         self.assertEqual(reply.method.name, "get-empty")
 
-        #repeat for no_ack=False
+
+        #publish some messages (no_ack=True) with transient messaging
         for i in range(11, 21):
             channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
 
+        #use basic_get to read back the messages, and check that we get an empty at the end
         for i in range(11, 21):
+            reply = channel.basic_get(no_ack=True)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get-ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get-empty")
+
+        #repeat for no_ack=False
+
+        #publish some messages (no_ack=False) with persistent messaging
+        for i in range(21, 31):
+            msg=Content("Message %d" % i)
+            msg["delivery mode"] = 2
+            channel.basic_publish(routing_key="test-get",content=msg )
+
+        #use basic_get to read back the messages, and check that we get an empty at the end
+        for i in range(21, 31):
+            reply = channel.basic_get(no_ack=False)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get-ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get-empty")
+
+        #publish some messages (no_ack=False) with transient messaging
+        for i in range(31, 41):
+            channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+        for i in range(31, 41):
             reply = channel.basic_get(no_ack=False)
             self.assertEqual(reply.method.klass.name, "basic")
             self.assertEqual(reply.method.name, "get-ok")
             self.assertEqual("Message %d" % i, reply.content.body)
-            if(i == 13):
+            if(i == 33):
                 channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
-            if(i in [15, 17, 19]):
+            if(i in [35, 37, 39]):
                 channel.basic_ack(delivery_tag=reply.delivery_tag)
 
         reply = channel.basic_get(no_ack=True)
@@ -375,8 +412,8 @@
         #recover(requeue=True)
         channel.basic_recover(requeue=True)
         
-        #get the unacked messages again (14, 16, 18, 20)
-        for i in [14, 16, 18, 20]:
+        #get the unacked messages again (34, 36, 38, 40)
+        for i in [34, 36, 38, 40]:
             reply = channel.basic_get(no_ack=False)
             self.assertEqual(reply.method.klass.name, "basic")
             self.assertEqual(reply.method.name, "get-ok")