You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/05/09 15:47:01 UTC

svn commit: r536528 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/messageStore/ systests/src/main/java/org/apache/qpid/server/messageStore/ systests/src/main/java/org/apache/qpid/server/queue/

Author: gsim
Date: Wed May  9 06:47:00 2007
New Revision: 536528

URL: http://svn.apache.org/viewvc?view=rev&rev=536528
Log:
Applied patch from Arnaud Simon (asimon@redhat.com) to add back the AckTest and get it working again.


Added:
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/messageStore/
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/messageStore/TestableMemoryMessageStore.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java?view=diff&rev=536528&r1=536527&r2=536528
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java Wed May  9 06:47:00 2007
@@ -1,275 +1,275 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.messageStore;
-
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.txn.TransactionManager;
-import org.apache.qpid.server.txn.TransactionRecord;
-import org.apache.qpid.server.txn.MemoryEnqueueRecord;
-import org.apache.qpid.server.txn.MemoryDequeueRecord;
-import org.apache.qpid.server.exception.*;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-
-import javax.transaction.xa.Xid;
-import java.util.*;
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-
-/**
- * This a simple in-memory implementation of a message store i.e. nothing is persisted
- * <p/>
- * Created by Arnaud Simon
- * Date: 26-Apr-2007
- * Time: 08:23:45
- */
-public class MemoryMessageStore implements MessageStore
-{
-    //========================================================================
-    // Static Constants
-    //========================================================================
-    // The logger for this class
-    private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
-
-    // The table of message with its corresponding stream containing the message body
-    private Map<StorableMessage, ByteArrayOutputStream> _stagedMessages;
-    // The queue/messages association
-    private Map<StorableQueue, List<StorableMessage>> _queueMap;
-    // the message ID
-    private long _messageID = 0;
-    // The transaction manager
-    private TransactionManager _txm;
-
-    //========================================================================
-    // Interface MessageStore
-    //========================================================================
-
-    public void removeExchange(Exchange exchange)
-            throws
-            InternalErrorException
-    {
-        // do nothing this is inmemory 
-    }
-
-    public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
-            throws
-            InternalErrorException
-    {
-        // do nothing this is inmemory
-    }
-
-    public void createExchange(Exchange exchange)
-            throws
-            InternalErrorException
-    {
-        // do nothing this is inmemory
-    }
-
-    public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
-            throws
-            InternalErrorException
-    {
-        // do nothing this is inmemory
-    }
-
-    public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
-            throws
-            InternalErrorException,
-            IllegalArgumentException
-    {
-        _log.info("Configuring memory message store");
-        // Initialise the maps
-        _stagedMessages = new HashMap<StorableMessage, ByteArrayOutputStream>();
-        _queueMap = new HashMap<StorableQueue, List<StorableMessage>>();
-        _txm = tm;
-        _txm.configure(this, "txn", config);
-    }
-
-    public void close()
-            throws
-            InternalErrorException
-    {
-        _log.info("Closing memory message store");
-        _stagedMessages.clear();
-        _queueMap.clear();
-    }
-
-    public void createQueue(StorableQueue queue)
-            throws
-            InternalErrorException,
-            QueueAlreadyExistsException
-    {
-        if (_queueMap.containsKey(queue))
-        {
-            throw new QueueAlreadyExistsException("queue " + queue + " already exists");
-        }
-        // add this queue into the map
-        _queueMap.put(queue, new LinkedList<StorableMessage>());
-    }
-
-    public void destroyQueue(StorableQueue queue)
-            throws
-            InternalErrorException,
-            QueueDoesntExistException
-    {
-        if (!_queueMap.containsKey(queue))
-        {
-            throw new QueueDoesntExistException("queue " + queue + " does not exist");
-        }
-        // remove this queue from the map
-        _queueMap.remove(queue);
-    }
-
-    public void stage(StorableMessage m)
-            throws
-            InternalErrorException,
-            MessageAlreadyStagedException
-    {
-        if (_stagedMessages.containsKey(m))
-        {
-            throw new MessageAlreadyStagedException("message " + m + " already staged");
-        }
-        _stagedMessages.put(m, new ByteArrayOutputStream());
-        m.staged();
-    }
-
-    public void appendContent(StorableMessage m, byte[] data, int offset, int size)
-            throws
-            InternalErrorException,
-            MessageDoesntExistException
-    {
-        if (!_stagedMessages.containsKey(m))
-        {
-            throw new MessageDoesntExistException("message " + m + " has not been staged");
-        }
-        _stagedMessages.get(m).write(data, offset, size);
-    }
-
-    public byte[] loadContent(StorableMessage m, int offset, int size)
-            throws
-            InternalErrorException,
-            MessageDoesntExistException
-    {
-        if (!_stagedMessages.containsKey(m))
-        {
-            throw new MessageDoesntExistException("message " + m + " has not been staged");
-        }
-        byte[] result = new byte[size];
-        ByteBuffer buf = ByteBuffer.allocate(size);
-        buf.put(_stagedMessages.get(m).toByteArray(), offset, size);
-        buf.get(result);
-        return result;
-    }
-
-    public void destroy(StorableMessage m)
-            throws
-            InternalErrorException,
-            MessageDoesntExistException
-    {
-        if (!_stagedMessages.containsKey(m))
-        {
-            throw new MessageDoesntExistException("message " + m + " has not been staged");
-        }
-        _stagedMessages.remove(m);
-    }
-
-    public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
-            throws
-            InternalErrorException,
-            QueueDoesntExistException,
-            InvalidXidException,
-            UnknownXidException,
-            MessageDoesntExistException
-    {
-        if (xid != null)
-        {
-            // this is a tx operation
-            TransactionRecord enqueueRecord = new MemoryEnqueueRecord(m, queue);
-            _txm.getTransaction(xid).addRecord(enqueueRecord);
-        } else
-        {
-            if (!_stagedMessages.containsKey(m))
-            {
-                try
-                {
-                    stage(m);
-                } catch (MessageAlreadyStagedException e)
-                {
-                    throw new InternalErrorException(e);
-                }
-                appendContent(m, m.getData(), 0, m.getPayloadSize());
-            }
-            if (!_queueMap.containsKey(queue))
-            {
-                throw new QueueDoesntExistException("queue " + queue + " dos not exist");
-            }
-            _queueMap.get(queue).add(m);
-            m.enqueue(queue);
-        }
-    }
-
-    public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
-            throws
-            InternalErrorException,
-            QueueDoesntExistException,
-            InvalidXidException,
-            UnknownXidException
-    {
-        if (xid != null)
-        {
-            // this is a tx operation
-            TransactionRecord dequeueRecord = new MemoryDequeueRecord(m, queue);
-            _txm.getTransaction(xid).addRecord(dequeueRecord);
-        } else
-        {
-            if (!_queueMap.containsKey(queue))
-            {
-                throw new QueueDoesntExistException("queue " + queue + " dos not exist");
-            }
-            m.dequeue(queue);
-            _queueMap.get(queue).remove(m);
-            if (!m.isEnqueued())
-            {
-                // we can delete this message
-                _stagedMessages.remove(m);
-            }
-        }
-    }
-
-    public Collection<StorableQueue> getAllQueues()
-            throws
-            InternalErrorException
-    {
-        return _queueMap.keySet();
-    }
-
-    public Collection<StorableMessage> getAllMessages(StorableQueue queue)
-            throws
-            InternalErrorException
-    {
-        return _queueMap.get(queue);
-    }
-
-    public long getNewMessageId()
-    {
-        return _messageID++;
-    }
-}
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.txn.TransactionRecord;
+import org.apache.qpid.server.txn.MemoryEnqueueRecord;
+import org.apache.qpid.server.txn.MemoryDequeueRecord;
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+
+import javax.transaction.xa.Xid;
+import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This is a simple in-memory implementation of a message store, i.e. nothing is persisted
+ * <p/>
+ * Created by Arnaud Simon
+ * Date: 26-Apr-2007
+ * Time: 08:23:45
+ */
+public class MemoryMessageStore implements MessageStore
+{
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class
+    private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
+
+    // The table of message with its corresponding stream containing the message body
+    private Map<StorableMessage, ByteArrayOutputStream> _stagedMessages;
+    // The queue/messages association
+    protected Map<StorableQueue, List<StorableMessage>> _queueMap;
+    // the message ID
+    private long _messageID = 0;
+    // The transaction manager
+    private TransactionManager _txm;
+
+    //========================================================================
+    // Interface MessageStore
+    //========================================================================
+
+    public void removeExchange(Exchange exchange)
+            throws
+            InternalErrorException
+    {
+        // do nothing this is inmemory 
+    }
+
+    public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+            throws
+            InternalErrorException
+    {
+        // do nothing this is inmemory
+    }
+
+    public void createExchange(Exchange exchange)
+            throws
+            InternalErrorException
+    {
+        // do nothing this is inmemory
+    }
+
+    public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+            throws
+            InternalErrorException
+    {
+        // do nothing this is inmemory
+    }
+
+    public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+            throws
+            InternalErrorException,
+            IllegalArgumentException
+    {
+        _log.info("Configuring memory message store");
+        // Initialise the maps
+        _stagedMessages = new HashMap<StorableMessage, ByteArrayOutputStream>();
+        _queueMap = new HashMap<StorableQueue, List<StorableMessage>>();
+        _txm = tm;
+        _txm.configure(this, "txn", config);
+    }
+
+    /**
+     * Closes the store by discarding every staged message and every queue/message association.
+     * <p/>
+     * The two maps are only created in configure(), so they are checked for null here: closing a
+     * store that was never configured is a no-op instead of a NullPointerException.
+     *
+     * @throws InternalErrorException declared by the MessageStore contract; not thrown by this implementation
+     */
+    public void close()
+            throws
+            InternalErrorException
+    {
+        _log.info("Closing memory message store");
+        if (_stagedMessages != null)
+        {
+            _stagedMessages.clear();
+        }
+        if (_queueMap != null)
+        {
+            _queueMap.clear();
+        }
+    }
+
+    public void createQueue(StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueAlreadyExistsException
+    {
+        if (_queueMap.containsKey(queue))
+        {
+            throw new QueueAlreadyExistsException("queue " + queue + " already exists");
+        }
+        // add this queue into the map
+        _queueMap.put(queue, new LinkedList<StorableMessage>());
+    }
+
+    public void destroyQueue(StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException
+    {
+        if (!_queueMap.containsKey(queue))
+        {
+            throw new QueueDoesntExistException("queue " + queue + " does not exist");
+        }
+        // remove this queue from the map
+        _queueMap.remove(queue);
+    }
+
+    public void stage(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageAlreadyStagedException
+    {
+        if (_stagedMessages.containsKey(m))
+        {
+            throw new MessageAlreadyStagedException("message " + m + " already staged");
+        }
+        _stagedMessages.put(m, new ByteArrayOutputStream());
+        m.staged();
+    }
+
+    public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException
+    {
+        if (!_stagedMessages.containsKey(m))
+        {
+            throw new MessageDoesntExistException("message " + m + " has not been staged");
+        }
+        _stagedMessages.get(m).write(data, offset, size);
+    }
+
+    /**
+     * Returns a copy of a slice of the content that has been staged for a message.
+     *
+     * @param m      the message whose staged content is read
+     * @param offset index, within the staged content, of the first byte to copy
+     * @param size   number of bytes to copy
+     * @return a newly allocated array of length size holding the requested bytes
+     * @throws MessageDoesntExistException if the message has not been staged
+     * @throws IndexOutOfBoundsException   if offset and size do not designate a range inside the staged content
+     * @throws InternalErrorException      declared by the MessageStore contract; not thrown by this implementation
+     */
+    public byte[] loadContent(StorableMessage m, int offset, int size)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException
+    {
+        ByteArrayOutputStream content = _stagedMessages.get(m);
+        if (content == null)
+        {
+            throw new MessageDoesntExistException("message " + m + " has not been staged");
+        }
+        byte[] staged = content.toByteArray();
+        byte[] result = new byte[size];
+        // Copy the slice directly. The previous version filled a ByteBuffer and then read it back
+        // without flipping it, which raises BufferUnderflowException for any size greater than 0.
+        System.arraycopy(staged, offset, result, 0, size);
+        return result;
+    }
+
+    public void destroy(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException
+    {
+        if (!_stagedMessages.containsKey(m))
+        {
+            throw new MessageDoesntExistException("message " + m + " has not been staged");
+        }
+        _stagedMessages.remove(m);
+    }
+
+    /**
+     * Enqueues a message on a queue, either immediately or as part of a transaction.
+     * <p/>
+     * With a non-null xid, an enqueue record is added to the transaction obtained from the
+     * transaction manager and nothing else is changed here. With a null xid, the message is
+     * staged together with its data if it is not staged yet, appended to the queue's message
+     * list, and then notified through m.enqueue(queue).
+     *
+     * @param xid   the transaction to record the operation in, or null for an immediate enqueue
+     * @param m     the message to enqueue
+     * @param queue the destination queue
+     * @throws QueueDoesntExistException   if the queue is unknown to this store (immediate enqueue only)
+     * @throws MessageDoesntExistException declared by appendContent, which is used to store the message data
+     * @throws InternalErrorException      if staging unexpectedly reports the message as already staged
+     * @throws InvalidXidException         declared; presumably raised while resolving the transaction
+     * @throws UnknownXidException         declared; presumably raised while resolving the transaction
+     */
+    public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException,
+            MessageDoesntExistException
+    {
+        if (xid != null)
+        {
+            // this is a tx operation
+            TransactionRecord enqueueRecord = new MemoryEnqueueRecord(m, queue);
+            _txm.getTransaction(xid).addRecord(enqueueRecord);
+        } else
+        {
+            // Check the queue before touching any state, so that a failed enqueue does not
+            // leave behind a staged message that belongs to no queue.
+            List<StorableMessage> messages = _queueMap.get(queue);
+            if (messages == null)
+            {
+                throw new QueueDoesntExistException("queue " + queue + " does not exist");
+            }
+            if (!_stagedMessages.containsKey(m))
+            {
+                try
+                {
+                    stage(m);
+                } catch (MessageAlreadyStagedException e)
+                {
+                    throw new InternalErrorException(e);
+                }
+                appendContent(m, m.getData(), 0, m.getPayloadSize());
+            }
+            messages.add(m);
+            m.enqueue(queue);
+        }
+    }
+
+    /**
+     * Dequeues a message from a queue, either immediately or as part of a transaction.
+     * <p/>
+     * With a non-null xid, a dequeue record is added to the transaction obtained from the
+     * transaction manager and nothing else is changed here. With a null xid, the message is
+     * notified through m.dequeue(queue) and removed from the queue's message list; its staged
+     * content is dropped once the message reports that it is no longer enqueued.
+     *
+     * @param xid   the transaction to record the operation in, or null for an immediate dequeue
+     * @param m     the message to dequeue
+     * @param queue the queue the message is removed from
+     * @throws QueueDoesntExistException if the queue is unknown to this store (immediate dequeue only)
+     * @throws InternalErrorException    declared by the MessageStore contract
+     * @throws InvalidXidException       declared; presumably raised while resolving the transaction
+     * @throws UnknownXidException       declared; presumably raised while resolving the transaction
+     */
+    public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException
+    {
+        if (xid != null)
+        {
+            // this is a tx operation
+            TransactionRecord dequeueRecord = new MemoryDequeueRecord(m, queue);
+            _txm.getTransaction(xid).addRecord(dequeueRecord);
+        } else
+        {
+            List<StorableMessage> messages = _queueMap.get(queue);
+            if (messages == null)
+            {
+                throw new QueueDoesntExistException("queue " + queue + " does not exist");
+            }
+            m.dequeue(queue);
+            messages.remove(m);
+            if (!m.isEnqueued())
+            {
+                // the message reports that it is no longer enqueued, so we can delete it
+                _stagedMessages.remove(m);
+            }
+        }
+    }
+
+    public Collection<StorableQueue> getAllQueues()
+            throws
+            InternalErrorException
+    {
+        return _queueMap.keySet();
+    }
+
+    public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+            throws
+            InternalErrorException
+    {
+        return _queueMap.get(queue);
+    }
+
+    public long getNewMessageId()
+    {
+        return _messageID++;
+    }
+}

Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/messageStore/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/messageStore/TestableMemoryMessageStore.java?view=auto&rev=536528
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/messageStore/TestableMemoryMessageStore.java (added)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/messageStore/TestableMemoryMessageStore.java Wed May  9 06:47:00 2007
@@ -0,0 +1,41 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import java.util.List;
+
+/**
+ * A MemoryMessageStore that additionally reports how many messages its queues currently hold.
+ * <p/>
+ * Created by Arnaud Simon
+ * Date: 09-May-2007
+ * Time: 11:45:18
+ */
+public class TestableMemoryMessageStore extends MemoryMessageStore
+{
+    /**
+     * Adds up the sizes of all per-queue message lists.
+     *
+     * @return the total number of queued messages, or 0 while the queue map has not been created
+     */
+    public int getNumberStoredMessages()
+    {
+        if (_queueMap == null)
+        {
+            return 0;
+        }
+        int total = 0;
+        for (List<StorableMessage> queued : _queueMap.values())
+        {
+            total += queued.size();
+        }
+        return total;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/messageStore/TestableMemoryMessageStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/messageStore/TestableMemoryMessageStore.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=536528&r1=536527&r2=536528
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Wed May  9 06:47:00 2007
@@ -0,0 +1,359 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.messageStore.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.MemoryTransactionManager;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.HashSet;
+
+/**
+ * Tests that acknowledgements are handled correctly.
+ */
+public class AckTest extends TestCase
+{
+    private static final Logger _log = Logger.getLogger(AckTest.class);
+
+    private SubscriptionImpl _subscription;
+
+    private MockProtocolSession _protocolSession;
+
+    private TestableMemoryMessageStore _messageStore;
+
+    private MemoryTransactionManager _txm;
+
+    private StoreContext _storeContext = new StoreContext();
+
+    private AMQChannel _channel;
+
+    private SubscriptionSet _subscriptionManager;
+
+    private AMQQueue _queue;
+
+    private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag");
+
+    public AckTest() throws Exception
+    {
+        ApplicationRegistry.initialise(new NullApplicationRegistry());
+    }
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _messageStore = new TestableMemoryMessageStore();
+        _txm = new MemoryTransactionManager();
+        _protocolSession = new MockProtocolSession(_messageStore);
+        _channel = new AMQChannel(_protocolSession,5,_txm, _messageStore, null/*dont need exchange registry*/);
+
+        _protocolSession.addChannel(_channel);
+        _subscriptionManager = new SubscriptionSet();
+        _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager);
+    }
+
+    private void publishMessages(int count) throws AMQException
+    {
+        publishMessages(count, false);
+    }
+
+    private void publishMessages(int count, boolean persistent) throws AMQException
+    {
+        TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
+                                                                      new LinkedList<RequiredDeliveryException>(),
+                                                                      new HashSet<Long>());
+        MessageHandleFactory factory = new MessageHandleFactory();
+        for (int i = 1; i <= count; i++)
+        {
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Establish some way to determine the version for the test.
+            MessagePublishInfo publishBody = new MessagePublishInfo()
+            {
+
+                public AMQShortString getExchange()
+                {
+                    return new AMQShortString("someExchange");
+                }
+
+                public boolean isImmediate()
+                {
+                    return false;
+                }
+
+                public boolean isMandatory()
+                {
+                    return false;
+                }
+
+                public AMQShortString getRoutingKey()
+                {
+                    return new AMQShortString("rk");
+                }
+            };
+            AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext);
+            if (persistent)
+            {
+                BasicContentHeaderProperties b = new BasicContentHeaderProperties();
+                //This is DeliveryMode.PERSISTENT
+                b.setDeliveryMode((byte) 2);
+                ContentHeaderBody cb = new ContentHeaderBody();
+                cb.properties = b;
+                msg.setContentHeaderBody(cb);
+            }
+            else
+            {
+                msg.setContentHeaderBody(new ContentHeaderBody());
+            }
+            // we increment the reference here since we are not delivering the messaging to any queues, which is where
+            // the reference is normally incremented. The test is easier to construct if we have direct access to the
+            // subscription
+            msg.incrementReference();
+            msg.routingComplete(_messageStore, _storeContext, factory);
+            // we manually send the message to the subscription
+            _subscription.send(msg, _queue);
+        }
+    }
+
+    /**
+     * Tests that the acknowledgements are correctly associated with a channel and
+     * order is preserved when acks are enabled
+     */
+    public void testAckChannelAssociationTest() throws AMQException
+    {
+        _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+        final int msgCount = 10;
+        publishMessages(msgCount, true);
+
+        UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+        assertTrue(map.size() == msgCount);
+ //       assertTrue(_messageStore.getNumberStoredMessages() == msgCount);
+
+        Set<Long> deliveryTagSet = map.getDeliveryTags();
+        int i = 1;
+        for (long deliveryTag : deliveryTagSet)
+        {
+            assertTrue(deliveryTag == i);
+            i++;
+            UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.queue == _queue);
+        }
+
+        assertTrue(map.size() == msgCount);
+//        assertTrue(_messageStore.getNumberStoredMessages() == msgCount);
+    }
+
+    /**
+     * Tests that in no-ack mode no messages are retained
+     */
+    public void testNoAckMode() throws AMQException
+    {
+        // false arg means no acks expected
+        _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, false);
+        final int msgCount = 10;
+        publishMessages(msgCount);
+
+        UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+        assertTrue(map.size() == 0);
+        assertTrue(_messageStore.getNumberStoredMessages() == 0);
+    }
+
+    /**
+     * Verifies the handling of a single acknowledgement (multiple flag not set).
+     * <p>
+     * Ten messages are published and delivery tag 5 alone is acknowledged. The remaining
+     * entries are asserted to carry consecutive tags starting at 1 with tag 5 missing,
+     * and each is asserted to reference the test queue.
+     */
+    public void testSingleAckReceivedTest() throws AMQException
+    {
+        _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+        final int published = 10;
+        publishMessages(published);
+
+        final int ackedTag = 5;
+        _channel.acknowledgeMessage(ackedTag, false);
+        final UnacknowledgedMessageMap unacked = _channel.getUnacknowledgedMessageMap();
+        assertTrue(unacked.size() == published - 1);
+
+        long expectedTag = 1;
+        for (long tag : unacked.getDeliveryTags())
+        {
+            assertTrue(tag == expectedTag);
+            assertTrue(unacked.get(tag).queue == _queue);
+
+            // advance, stepping over the tag of the message that *should* have been removed
+            expectedTag++;
+            if (expectedTag == ackedTag)
+            {
+                expectedTag++;
+            }
+        }
+    }
+
+    /**
+     * Verifies the handling of an acknowledgement sent with the multiple flag set.
+     * <p>
+     * Ten messages are published and delivery tag 5 is acknowledged with multiple = true.
+     * Five entries are asserted to remain, carrying consecutive tags starting at 6, and
+     * each is asserted to reference the test queue.
+     */
+    public void testMultiAckReceivedTest() throws AMQException
+    {
+        _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+        publishMessages(10);
+
+        _channel.acknowledgeMessage(5, true);
+        final UnacknowledgedMessageMap unacked = _channel.getUnacknowledgedMessageMap();
+        assertTrue(unacked.size() == 5);
+
+        // first tag expected to survive the multiple ack up to and including 5
+        long expectedTag = 6;
+        for (long tag : unacked.getDeliveryTags())
+        {
+            assertTrue(tag == expectedTag);
+            assertTrue(unacked.get(tag).queue == _queue);
+            expectedTag++;
+        }
+    }
+
+    /**
+     * Tests that a multiple acknowledgement is handled correctly when ack'ing all pending msgs.
+     * <p>
+     * Ten messages are published and then acknowledged with delivery tag 0 and the multiple
+     * flag set. Afterwards the unacknowledged message map is asserted to be empty, both by
+     * size and by its set of delivery tags.
+     */
+    public void testMultiAckAllReceivedTest() throws AMQException
+    {
+        _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+        final int msgCount = 10;
+        publishMessages(msgCount);
+
+        _channel.acknowledgeMessage(0, true);
+        UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+        assertTrue(map.size() == 0);
+
+        // With nothing left unacknowledged there are no per-message checks to make; assert
+        // explicitly that no delivery tags remain rather than looping over an empty set.
+        Set<Long> deliveryTagSet = map.getDeliveryTags();
+        assertTrue(deliveryTagSet.isEmpty());
+    }
+
+    /**
+     * Tests suspension of the subscription against the channel's low and high prefetch marks.
+     * <p>
+     * Publishes highMark messages and asserts the subscription is suspended, acknowledges down
+     * to one message above lowMark and asserts it is still suspended, then acknowledges one more
+     * and asserts the suspension is lifted. Finally everything is acknowledged and the
+     * unacknowledged message map is asserted to be empty.
+     */
+    public void testPrefetchHighLow() throws AMQException
+    {
+        int lowMark = 5;
+        int highMark = 10;
+
+        _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+        _channel.setPrefetchLowMarkCount(lowMark);
+        _channel.setPrefetchHighMarkCount(highMark);
+
+        assertTrue(_channel.getPrefetchLowMarkCount() == lowMark);
+        assertTrue(_channel.getPrefetchHighMarkCount() == highMark);
+
+        publishMessages(highMark);
+
+        // at this point we should have sent out only highMark messages
+        // which have not been received so will be queued up in the channel
+        // which should be suspended
+        assertTrue(_subscription.isSuspended());
+        UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+        assertTrue(map.size() == highMark);
+
+        // acknowledge messages so we are just above lowMark
+        _channel.acknowledgeMessage(lowMark - 1, true);
+
+        // we should still be suspended
+        assertTrue(_subscription.isSuspended());
+        assertTrue(map.size() == lowMark + 1);
+
+        // acknowledge one more message
+        _channel.acknowledgeMessage(lowMark, true);
+
+        // and suspension should be lifted
+        assertTrue(!_subscription.isSuspended());
+
+        // publish more msgs so we are just below the limit
+        publishMessages(lowMark - 1);
+
+        // we should not be suspended
+        assertTrue(!_subscription.isSuspended());
+
+        // acknowledge all messages
+        _channel.acknowledgeMessage(0, true);
+        try
+        {
+            // NOTE(review): presumably gives pending processing time to settle - confirm
+            Thread.sleep(3000);
+        }
+        catch (InterruptedException e)
+        {
+            _log.error("Error: " + e, e);
+            // restore the interrupt status so callers further up can still observe it
+            Thread.currentThread().interrupt();
+        }
+        // map will be empty
+        assertTrue(map.size() == 0);
+    }
+
+    /**
+     * Tests suspension of the subscription against the channel's prefetch count.
+     * <p>
+     * Sets a prefetch count of 5, publishes 5 messages and asserts the subscription is
+     * suspended with 5 unacknowledged messages. Acknowledging up to delivery tag 5 with the
+     * multiple flag set is then asserted to lift the suspension and, after a pause, to leave
+     * the unacknowledged message map empty.
+     */
+    public void testPrefetch() throws AMQException
+    {
+        _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true);
+        _channel.setPrefetchCount(5);
+
+        assertTrue(_channel.getPrefetchCount() == 5);
+
+        final int msgCount = 5;
+        publishMessages(msgCount);
+
+        // at this point we should have sent out 5 messages, reaching the prefetch count,
+        // so the subscription should now be suspended
+        // NOTE(review): an earlier comment mentioned "a further 5 queued", but only
+        // msgCount (5) messages are published here - confirm the intended count
+        assertTrue(_subscription.isSuspended());
+        UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
+        assertTrue(map.size() == 5);
+        _channel.acknowledgeMessage(5, true);
+        assertTrue(!_subscription.isSuspended());
+        try
+        {
+            // NOTE(review): presumably gives pending processing time to settle - confirm
+            Thread.sleep(3000);
+        }
+        catch (InterruptedException e)
+        {
+            _log.error("Error: " + e, e);
+            // restore the interrupt status so callers further up can still observe it
+            Thread.currentThread().interrupt();
+        }
+        assertTrue(map.size() == 0);
+    }
+
+    /**
+     * Builds the JUnit suite for this class.
+     *
+     * @return a suite constructed from AckTest.class
+     */
+    public static junit.framework.Test suite()
+    {
+        final junit.framework.TestSuite ackSuite = new junit.framework.TestSuite(AckTest.class);
+        return ackSuite;
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=536528&r1=536527&r2=536528
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Wed May  9 06:47:00 2007
@@ -30,7 +30,7 @@
 import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
 
 import javax.security.sasl.SaslServer;
 import java.util.HashMap;