You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/08/07 17:37:36 UTC

svn commit: r683632 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid...

Author: ritchiem
Date: Thu Aug  7 08:37:36 2008
New Revision: 683632

URL: http://svn.apache.org/viewvc?rev=683632&view=rev
Log:
QPID-1195 , QPID-1193 Initial changes to allow bind and queue arguments to be stored and recovered from the MessageStore. Created a test to validate that the stored values can be recovered. DerbyStore hasn't fully been implemented. Surrounding work has been done and tested with BDBMessageStore.

Added:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Thu Aug  7 08:37:36 2008
@@ -180,7 +180,7 @@
                                                        null);
             if (queue.isDurable() && !queue.isAutoDelete())
             {
-                _messageStore.createQueue(queue);
+                _messageStore.createQueue(queue, null);
             }
 
             Configuration virtualHostDefaultQueueConfiguration =

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Thu Aug  7 08:37:36 2008
@@ -197,7 +197,7 @@
 
                 if (queue.isDurable())
                 {
-                    messageStore.createQueue(queue);
+                    messageStore.createQueue(queue, null);
                 }
 
                 queueRegistry.registerQueue(queue);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Thu Aug  7 08:37:36 2008
@@ -166,11 +166,19 @@
         assert routingKey != null;
         if (!_index.add(routingKey, queue))
         {
-            _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Queue (" + queue.getName() + ")" + queue + " is already registered with routing key " + routingKey);
+            }
         }
         else
         {
-            _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this);
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Binding queue(" + queue.getName() + ") " + queue + " with routing key " + routingKey
+                              + (args == null ? "" : " and arguments " + args.toString())
+                              + " to exchange " + this);
+            }
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Thu Aug  7 08:37:36 2008
@@ -117,7 +117,7 @@
                     queue = createQueue(queueName, body, virtualHost, session);
                     if (queue.isDurable() && !queue.isAutoDelete())
                     {
-                        store.createQueue(queue);
+                        store.createQueue(queue, body.getArguments());
                     }
                     queueRegistry.registerQueue(queue);
                     if (autoRegister)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Thu Aug  7 08:37:36 2008
@@ -39,6 +39,11 @@
         super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
     }
 
+    public int getPriorities()
+    {
+        return ((PriorityQueueList) _entries).getPriorities();
+    }
+
     @Override
     protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
     {
@@ -63,5 +68,4 @@
         }
     }
 
-    
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Aug  7 08:37:36 2008
@@ -52,6 +52,7 @@
 
     void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException;
 
+    List<ExchangeBinding> getExchangeBindings();
 
 
     void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Thu Aug  7 08:37:36 2008
@@ -28,7 +28,7 @@
 
 public class AMQQueueFactory
 {
-    private static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+    public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
 
     public static AMQQueue createAMQQueueImpl(AMQShortString name,
                                               boolean durable,

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java?rev=683632&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java Thu Aug  7 08:37:36 2008
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+
+public class ExchangeBinding
+{
+    private final Exchange _exchange;
+    private final AMQShortString _routingKey;
+    private final FieldTable _arguments;
+
+    private static final FieldTable EMPTY_ARGUMENTS = new FieldTable();    
+
+    ExchangeBinding(AMQShortString routingKey, Exchange exchange)
+    {
+        this(routingKey, exchange, EMPTY_ARGUMENTS);
+    }
+
+    ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
+    {
+        _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey;
+        _exchange = exchange;
+        _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
+    }
+
+    void unbind(AMQQueue queue) throws AMQException
+    {
+        _exchange.deregisterQueue(_routingKey, queue, _arguments);
+    }
+
+    public Exchange getExchange()
+    {
+        return _exchange;
+    }
+
+    public AMQShortString getRoutingKey()
+    {
+        return _routingKey;
+    }
+
+    public FieldTable getArguments()
+    {
+        return _arguments;
+    }
+
+    public int hashCode()
+    {
+        return (_exchange == null ? 0 : _exchange.hashCode())
+               + (_routingKey == null ? 0 : _routingKey.hashCode());
+    }
+
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof ExchangeBinding))
+        {
+            return false;
+        }
+        ExchangeBinding eb = (ExchangeBinding) o;
+        return _exchange.equals(eb._exchange)
+               && _routingKey.equals(eb._routingKey);
+    }
+}
\ No newline at end of file

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Thu Aug  7 08:37:36 2008
@@ -36,59 +36,6 @@
  */
 class ExchangeBindings
 {
-    private static final FieldTable EMPTY_ARGUMENTS = new FieldTable();
-
-    static class ExchangeBinding
-    {
-        private final Exchange _exchange;
-        private final AMQShortString _routingKey;
-        private final FieldTable _arguments;
-
-        ExchangeBinding(AMQShortString routingKey, Exchange exchange)
-        {
-            this(routingKey, exchange, EMPTY_ARGUMENTS);
-        }
-
-        ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
-        {
-            _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey;
-            _exchange = exchange;
-            _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
-        }
-
-        void unbind(AMQQueue queue) throws AMQException
-        {
-            _exchange.deregisterQueue(_routingKey, queue, _arguments);
-        }
-
-        public Exchange getExchange()
-        {
-            return _exchange;
-        }
-
-        public AMQShortString getRoutingKey()
-        {
-            return _routingKey;
-        }
-
-        public int hashCode()
-        {
-            return (_exchange == null ? 0 : _exchange.hashCode())
-                   + (_routingKey == null ? 0 : _routingKey.hashCode());
-        }
-
-        public boolean equals(Object o)
-        {
-            if (!(o instanceof ExchangeBinding))
-            {
-                return false;
-            }
-            ExchangeBinding eb = (ExchangeBinding) o;
-            return _exchange.equals(eb._exchange)
-                   && _routingKey.equals(eb._routingKey);
-        }
-    }
-
     private final List<ExchangeBinding> _bindings = new CopyOnWriteArrayList<ExchangeBinding>();
     private final AMQQueue _queue;
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Thu Aug  7 08:37:36 2008
@@ -42,6 +42,11 @@
         }
     }
 
+    public int getPriorities()
+    {
+        return _priorities;
+    }
+
     public AMQQueue getQueue()
     {
         return _queue;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Aug  7 08:37:36 2008
@@ -82,7 +82,7 @@
 
     private volatile Subscription _exclusiveSubscriber;
 
-    private final QueueEntryList _entries;
+    protected final QueueEntryList _entries;
 
     private final AMQQueueMBean _managedObject;
     private final Executor _asyncDelivery;
@@ -223,6 +223,11 @@
         }
     }
 
+    public List<ExchangeBinding> getExchangeBindings()
+    {
+        return new ArrayList<ExchangeBinding>(_bindings.getExchangeBindings());
+    }
+
     // ------ Manage Subscriptions
 
     public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Thu Aug  7 08:37:36 2008
@@ -728,7 +728,7 @@
 
     }
 
-    public void createQueue(AMQQueue queue) throws AMQException
+    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
     {
         _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
 
@@ -1281,6 +1281,11 @@
 
     }
 
+    public boolean isPersistent()
+    {
+        return true;
+    }
+
     private void checkNotClosed() throws MessageStoreClosedException
     {
         if (_closed.get())

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Thu Aug  7 08:37:36 2008
@@ -121,7 +121,7 @@
 
     }
 
-    public void createQueue(AMQQueue queue) throws AMQException
+    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
     {
         // Not required to do anything
     }
@@ -213,7 +213,12 @@
         return bodyList.get(index);
     }
 
-     private void checkNotClosed() throws MessageStoreClosedException
+    public boolean isPersistent()
+    {
+        return false;
+    }
+
+    private void checkNotClosed() throws MessageStoreClosedException
      {
         if (_closed.get())
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Thu Aug  7 08:37:36 2008
@@ -131,9 +131,10 @@
      *
      * @param queue The queue to store.
      *
+     * @param arguments The additional arguments the queue was declared with (may be null)
      * @throws AMQException If the operation fails for any reason.
      */
-    void createQueue(AMQQueue queue) throws AMQException;
+    void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException;
 
     /**
      * Removes the specified queue from the persistent store.
@@ -255,4 +256,12 @@
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
     ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+
+    /**
+     * Is this store capable of persisting the data
+     * 
+     * @return true if this store is capable of persisting data
+     */
+    boolean isPersistent();
+
 }

Added: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=683632&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Thu Aug  7 08:37:36 2008
@@ -0,0 +1,632 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeType;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.ExchangeBinding;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * This tests the MessageStores by using the available interfaces.
+ *
+ * This test validates that Exchanges, Queues, Bindings and Messages are persisted correctly.
+ */
+public class MessageStoreTest extends TestCase
+{
+
+    private static final int DEFAULT_PRIORTY_LEVEL = 5;
+    private static final Logger _logger = LoggerFactory.getLogger(MessageStoreTest.class);
+
+    public void testMemoryMessageStore()
+    {
+
+        PropertiesConfiguration config = new PropertiesConfiguration();
+
+        config.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore");
+
+        runTestWithStore(config);
+    }
+
+    public void DISABLE_testDerbyMessageStore()
+    {
+        PropertiesConfiguration config = new PropertiesConfiguration();
+
+        config.addProperty("store.environment-path", "derbyDB_MST");
+        config.addProperty("store.class", "org.apache.qpid.server.store.DerbyMessageStore");
+
+        runTestWithStore(config);
+    }
+
+    private void reload(Configuration configuration)
+    {
+        if (_virtualHost != null)
+        {
+            try
+            {
+                _virtualHost.close();
+            }
+            catch (Exception e)
+            {
+                fail(e.getMessage());
+            }
+        }
+
+        try
+        {
+            _virtualHost = new VirtualHost(virtualHostName, configuration, null);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    VirtualHost _virtualHost = null;
+    String virtualHostName = "MessageStoreTest";
+
+    AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange");
+    AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange");
+    AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange");
+    AMQShortString queueOwner = new AMQShortString("MST");
+
+    AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable");
+    AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable");
+    AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue");
+    AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue");
+
+    AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
+    AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
+    AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
+    AMQShortString queueName = new AMQShortString("MST-Queue");
+
+    AMQShortString directRouting = new AMQShortString("MST-direct");
+    AMQShortString topicRouting = new AMQShortString("MST-topic");
+
+    protected void runTestWithStore(Configuration configuration)
+    {
+        //Ensure Environment Path is empty
+        cleanup(configuration);
+
+        //Load the Virtualhost with the required MessageStore
+        reload(configuration);
+
+        MessageStore messageStore = _virtualHost.getMessageStore();
+
+        createAllQueues();
+        createAllTopicQueues();
+
+        //Register Non-Durable DirectExchange
+        Exchange nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false);
+        bindAllQueuesToExchange(nonDurableExchange, directRouting);
+
+        //Register DirectExchange
+        Exchange directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true);
+        bindAllQueuesToExchange(directExchange, directRouting);
+
+        //Register TopicExchange
+        Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
+        bindAllTopicQueuesToExchange(topicExchange, topicRouting);
+
+        //Send Message To NonDurable direct Exchange = persistent        
+        sendMessageOnExchange(nonDurableExchange, directRouting, true);
+        // and non-persistent
+        sendMessageOnExchange(nonDurableExchange, directRouting, false);
+
+        //Send Message To direct Exchange = persistent
+        sendMessageOnExchange(directExchange, directRouting, true);
+        // and non-persistent
+        sendMessageOnExchange(directExchange, directRouting, false);
+
+        //Send Message To topic Exchange = persistent
+        sendMessageOnExchange(topicExchange, topicRouting, true);
+        // and non-persistent
+        sendMessageOnExchange(topicExchange, topicRouting, false);
+
+        //Ensure all the Queues have four messages (one transient, one persistent) x 2 exchange routings
+        validateMessageOnQueues(4, true);
+        //Ensure all the topics have two messages (one transient, one persistent)
+        validateMessageOnTopics(2, true);
+
+        assertEquals("Not all queues correctly registered", 8, _virtualHost.getQueueRegistry().getQueues().size());
+
+        if (!messageStore.isPersistent())
+        {
+            _logger.warn("Unable to test Persistent capabilities of messages store(" + messageStore.getClass() + ") as it is not capable of peristence.");
+            return;
+        }
+
+        //Reload the Virtualhost to test persistence
+        _logger.info("Reloading Virtualhost");
+
+        VirtualHost original = _virtualHost;
+
+        reload(configuration);
+
+        assertTrue("Virtualhost has not been reloaded", original != _virtualHost);
+
+        validateExchanges();
+
+        //Validate Durable Queues still have the persistent message
+        validateMessageOnQueues(2, false);
+        //Validate Durable Topic Queues still have the persistent message
+        validateMessageOnTopics(1, false);
+
+        //Validate Properties of Binding
+        validateBindingProperties();
+
+        //Validate Properties of Queues
+        validateQueueProperties();
+
+        //Validate Non-Durable Queues are gone.
+        assertNull("Non-Durable queue still registered:" + priorityQueueName, _virtualHost.getQueueRegistry().getQueue(priorityQueueName));
+        assertNull("Non-Durable queue still registered:" + queueName, _virtualHost.getQueueRegistry().getQueue(queueName));
+        assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, _virtualHost.getQueueRegistry().getQueue(priorityTopicQueueName));
+        assertNull("Non-Durable queue still registered:" + topicQueueName, _virtualHost.getQueueRegistry().getQueue(topicQueueName));
+
+        assertEquals("Not all queues correctly registered", 4, _virtualHost.getQueueRegistry().getQueues().size());
+    }
+
+    private void validateExchanges()
+    {
+        ExchangeRegistry registry = _virtualHost.getExchangeRegistry();
+
+        assertTrue(directExchangeName + " exchange NOT reloaded after failover",
+                   registry.getExchangeNames().contains(directExchangeName));
+        assertTrue(topicExchangeName + " exchange NOT reloaded after failover",
+                   registry.getExchangeNames().contains(topicExchangeName));
+        assertTrue(nonDurableExchangeName + " exchange reloaded after failover",
+                   !registry.getExchangeNames().contains(nonDurableExchangeName));
+
+        // There are 5 required exchanges + our 2 durable exchanges
+        assertEquals("Incorrect number of exchanges available", 5 + 2, registry.getExchangeNames().size());
+    }
+
+    /** Validates the exchange bindings of each of the durable queues. */
+    private void validateBindingProperties()
+    {
+        QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+
+        validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getExchangeBindings(), false);
+        validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getExchangeBindings(), true);
+        validateBindingProperties(queueRegistry.getQueue(durableQueueName).getExchangeBindings(), false);
+        validateBindingProperties(queueRegistry.getQueue(durableTopicQueueName).getExchangeBindings(), true);
+    }
+
+    /**
+     * Asserts that exactly one binding is present and, when requested, that this binding
+     * carries a JMS_SELECTOR argument.
+     *
+     * @param bindings     the bindings of a single queue
+     * @param useSelectors if true the binding's arguments must contain the JMS_SELECTOR key
+     */
+    private void validateBindingProperties(List<ExchangeBinding> bindings, boolean useSelectors)
+    {
+        assertEquals("Each queue should only be bound once.", 1, bindings.size());
+
+        ExchangeBinding soleBinding = bindings.get(0);
+
+        if (!useSelectors)
+        {
+            // Nothing further to check for bindings created without a selector.
+            return;
+        }
+
+        boolean hasSelector = soleBinding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue());
+        assertTrue("Binding does not contain a Selector argument.", hasSelector);
+    }
+
+    /**
+     * Validates the implementation class of the four durable queues: the two priority queues
+     * are checked as priority queues, the other two as simple queues.
+     */
+    private void validateQueueProperties()
+    {
+        QueueRegistry registry = _virtualHost.getQueueRegistry();
+
+        AMQQueue durablePriorityQueue = registry.getQueue(durablePriorityQueueName);
+        validateQueueProperties(durablePriorityQueue, true);
+
+        AMQQueue durablePriorityTopicQueue = registry.getQueue(durablePriorityTopicQueueName);
+        validateQueueProperties(durablePriorityTopicQueue, true);
+
+        AMQQueue durableQueue = registry.getQueue(durableQueueName);
+        validateQueueProperties(durableQueue, false);
+
+        AMQQueue durableTopicQueue = registry.getQueue(durableTopicQueueName);
+        validateQueueProperties(durableTopicQueue, false);
+    }
+
+    /**
+     * Asserts that the queue has the expected implementation class.
+     *
+     * @param queue       the queue to inspect
+     * @param usePriority if true the queue must be an {@link AMQPriorityQueue} configured with
+     *                    DEFAULT_PRIORTY_LEVEL priorities, otherwise it must be a {@link SimpleAMQQueue}
+     */
+    private void validateQueueProperties(AMQQueue queue, boolean usePriority)
+    {
+        if (usePriority)
+        {
+            assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass());
+            assertEquals("Priority Queue does not have set priorities", DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities());
+        }
+        else
+        {
+            // The failure message previously repeated the priority-queue text, which
+            // misreported what had actually gone wrong for simple queues.
+            assertEquals("Queue is no longer a Simple Queue", SimpleAMQQueue.class, queue.getClass());
+        }
+    }
+
+    /**
+     * Removes the store environment directory named by the configuration, if one is
+     * configured and it exists on disk.
+     *
+     * @param configuration The configuration that contains the "store.environment-path" entry.
+     */
+    private void cleanup(Configuration configuration)
+    {
+        String storePath = configuration.getString("store.environment-path");
+
+        if (storePath == null)
+        {
+            // No environment path configured, so there is nothing to remove.
+            return;
+        }
+
+        File storeDirectory = new File(storePath);
+
+        if (storeDirectory.exists())
+        {
+            deleteDirectory(storeDirectory);
+        }
+    }
+
+    /**
+     * Recursively deletes the given path. For a directory the contents are removed first and
+     * then the directory itself; previously only plain files were deleted and the emptied
+     * directories were left behind.
+     *
+     * Deletion is best effort: the result of {@link File#delete()} is not checked.
+     *
+     * @param path the file or directory to delete
+     */
+    private void deleteDirectory(File path)
+    {
+        if (path.isDirectory())
+        {
+            // listFiles() returns null if the directory cannot be read.
+            File[] children = path.listFiles();
+
+            if (children != null)
+            {
+                for (File child : children)
+                {
+                    deleteDirectory(child);
+                }
+            }
+        }
+
+        // Applies to plain files and to directories whose contents have just been removed.
+        path.delete();
+    }
+
+    /**
+     * Builds a message with an empty body and a "Test" = "MST" header, then routes and
+     * delivers it through the given exchange. Any AMQException raised on the way fails the test.
+     *
+     * @param directExchange the exchange the message is published to
+     * @param routingKey     the routing key of the publish
+     * @param deliveryMode   if true the message is sent with delivery mode 2, otherwise with delivery mode 1
+     */
+    private void sendMessageOnExchange(Exchange directExchange, AMQShortString routingKey, boolean deliveryMode)
+    {
+        //Set MessagePersistence
+        byte mode = deliveryMode ? (byte) 2 : (byte) 1;
+
+        BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+        properties.setDeliveryMode(mode);
+        FieldTable headers = properties.getHeaders();
+        headers.setString("Test", "MST");
+        properties.setHeaders(headers);
+
+        MessagePublishInfo publishInfo = new TestMessagePublishInfo(directExchange, false, false, routingKey);
+
+        try
+        {
+            IncomingMessage message = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(),
+                                                          publishInfo,
+                                                          new NonTransactionalContext(_virtualHost.getMessageStore(),
+                                                                                      new StoreContext(), null, null),
+                                                          new InternalTestProtocolSession());
+
+            message.setMessageStore(_virtualHost.getMessageStore());
+            message.setExchange(directExchange);
+
+            // Header announcing a zero length body, so no content frames are required.
+            ContentHeaderBody header = new ContentHeaderBody();
+            header.classId = BasicConsumeBodyImpl.CLASS_ID;
+            header.bodySize = 0;
+            header.properties = properties;
+
+            message.setContentHeaderBody(header);
+            message.setExpiration();
+
+            message.route();
+            message.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory());
+
+            // check and deliver if header says body length is zero
+            if (message.allContentReceived())
+            {
+                message.deliverToQueues();
+            }
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    private void createAllQueues()
+    {
+        //Register Durable Priority Queue
+        createQueue(durablePriorityQueueName, true, true);
+
+        //Register Durable Simple Queue
+        createQueue(durableQueueName, false, true);
+
+        //Register NON-Durable Priority Queue
+        createQueue(priorityQueueName, true, false);
+
+        //Register NON-Durable Simple Queue
+        createQueue(queueName, false, false);
+    }
+
+    private void createAllTopicQueues()
+    {
+        //Register Durable Priority Queue
+        createQueue(durablePriorityTopicQueueName, true, true);
+
+        //Register Durable Simple Queue
+        createQueue(durableTopicQueueName, false, true);
+
+        //Register NON-Durable Priority Queue
+        createQueue(priorityTopicQueueName, true, false);
+
+        //Register NON-Durable Simple Queue
+        createQueue(topicQueueName, false, false);
+    }
+
+    /**
+     * Instantiates an exchange of the given type and registers it with the exchange registry
+     * of the current virtual host. An AMQException from either step fails the test.
+     *
+     * @param type    the exchange type used to create the instance
+     * @param name    the name of the new exchange
+     * @param durable the durability flag passed to the exchange type
+     *
+     * @return the exchange that was created
+     */
+    private Exchange createExchange(ExchangeType type, AMQShortString name, boolean durable)
+    {
+        Exchange created = null;
+
+        try
+        {
+            created = type.newInstance(_virtualHost, name, durable, 0, false);
+            _virtualHost.getExchangeRegistry().registerExchange(created);
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+
+        return created;
+    }
+
+    /**
+     * Creates a queue through the AMQQueueFactory, checks its implementation class, writes it
+     * to the message store when it is durable and not auto-delete, and registers it with the
+     * queue registry. An AMQException from any step fails the test.
+     *
+     * @param queueName   the name of the queue to create
+     * @param usePriority if true the queue is declared with the X_QPID_PRIORITIES argument
+     * @param durable     the durability flag passed to the factory
+     */
+    private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable)
+    {
+        // Only priority queues are declared with arguments; other queues get none.
+        FieldTable queueArguments = null;
+
+        if (usePriority)
+        {
+            queueArguments = new FieldTable();
+            queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
+        }
+
+        //Ideally we would be able to use the QueueDeclareHandler here.
+        try
+        {
+            AMQQueue created = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, _virtualHost,
+                                                                  queueArguments);
+
+            validateQueueProperties(created, usePriority);
+
+            boolean needsStoring = created.isDurable() && !created.isAutoDelete();
+
+            if (needsStoring)
+            {
+                _virtualHost.getMessageStore().createQueue(created, queueArguments);
+            }
+
+            _virtualHost.getQueueRegistry().registerQueue(created);
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Binds the four (non-topic) test queues to the exchange without a selector. The two
+     * priority queues are bound with the X_QPID_PRIORITIES argument, the other two with none.
+     *
+     * @param exchange   the exchange to bind to
+     * @param routingKey the routing key used for every binding
+     */
+    private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey)
+    {
+        QueueRegistry registry = _virtualHost.getQueueRegistry();
+
+        FieldTable priorityArguments = new FieldTable();
+        priorityArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
+
+        AMQQueue durablePriorityQueue = registry.getQueue(durablePriorityQueueName);
+        bindQueueToExchange(exchange, routingKey, durablePriorityQueue, false, priorityArguments);
+
+        AMQQueue durableQueue = registry.getQueue(durableQueueName);
+        bindQueueToExchange(exchange, routingKey, durableQueue, false, null);
+
+        AMQQueue priorityQueue = registry.getQueue(priorityQueueName);
+        bindQueueToExchange(exchange, routingKey, priorityQueue, false, priorityArguments);
+
+        AMQQueue simpleQueue = registry.getQueue(queueName);
+        bindQueueToExchange(exchange, routingKey, simpleQueue, false, null);
+    }
+
+    /**
+     * Binds the four topic test queues to the exchange, each with a selector. The two
+     * priority queues are bound with the X_QPID_PRIORITIES argument, the other two with none.
+     *
+     * @param exchange   the exchange to bind to
+     * @param routingKey the routing key used for every binding
+     */
+    private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey)
+    {
+        QueueRegistry registry = _virtualHost.getQueueRegistry();
+
+        FieldTable priorityArguments = new FieldTable();
+        priorityArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
+
+        AMQQueue durablePriorityTopicQueue = registry.getQueue(durablePriorityTopicQueueName);
+        bindQueueToExchange(exchange, routingKey, durablePriorityTopicQueue, true, priorityArguments);
+
+        AMQQueue durableTopicQueue = registry.getQueue(durableTopicQueueName);
+        bindQueueToExchange(exchange, routingKey, durableTopicQueue, true, null);
+
+        AMQQueue priorityTopicQueue = registry.getQueue(priorityTopicQueueName);
+        bindQueueToExchange(exchange, routingKey, priorityTopicQueue, true, priorityArguments);
+
+        AMQQueue topicQueue = registry.getQueue(topicQueueName);
+        bindQueueToExchange(exchange, routingKey, topicQueue, true, null);
+    }
+
+
+    /**
+     * Registers the queue with the exchange and then binds the queue to the exchange under
+     * the given routing key. An AMQException from either step fails the test.
+     *
+     * @param exchange       the exchange to bind to
+     * @param routingKey     the routing key used for the bind
+     * @param queue          the queue being bound
+     * @param useSelector    if true the bind carries a JMS_SELECTOR argument of "Test = 'MST'"
+     * @param queueArguments the arguments passed when registering the queue with the exchange, may be null
+     */
+    protected void bindQueueToExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments)
+    {
+        // Bind arguments are only needed when a selector is requested.
+        FieldTable selectorArguments = null;
+
+        if (useSelector)
+        {
+            selectorArguments = new FieldTable();
+            selectorArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'");
+        }
+
+        try
+        {
+            // NOTE(review): this registers under the queueName field of the test rather than
+            // the routingKey parameter - confirm whether that is intended.
+            exchange.registerQueue(queueName, queue, queueArguments);
+
+            queue.bind(exchange, routingKey, selectorArguments);
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Validates the message count on the topic queues and then on the plain queues.
+     *
+     * @param messageCount the number of messages expected on each checked queue
+     * @param allQueues    passed through to both checks; if true the non-durable queues are checked as well
+     */
+    private void validateMessage(long messageCount, boolean allQueues)
+    {
+        validateMessageOnTopics(messageCount, allQueues);
+        validateMessageOnQueues(messageCount, allQueues);
+    }
+
+    /**
+     * Validates the message count on the durable topic queues and, optionally, on the
+     * non-durable topic queues.
+     *
+     * @param messageCount the number of messages expected on each checked queue
+     * @param allQueues    if true the non-durable topic queues are checked as well
+     */
+    private void validateMessageOnTopics(long messageCount, boolean allQueues)
+    {
+        // The durable topic queues are always checked.
+        validateMessageOnQueue(durablePriorityTopicQueueName, messageCount);
+        validateMessageOnQueue(durableTopicQueueName, messageCount);
+
+        if (!allQueues)
+        {
+            return;
+        }
+
+        validateMessageOnQueue(priorityTopicQueueName, messageCount);
+        validateMessageOnQueue(topicQueueName, messageCount);
+    }
+
+    /**
+     * Validates the message count on the durable queues and, optionally, on the
+     * non-durable queues.
+     *
+     * @param messageCount the number of messages expected on each checked queue
+     * @param allQueues    if true the non-durable queues are checked as well
+     */
+    private void validateMessageOnQueues(long messageCount, boolean allQueues)
+    {
+        // The durable queues are always checked.
+        validateMessageOnQueue(durablePriorityQueueName, messageCount);
+        validateMessageOnQueue(durableQueueName, messageCount);
+
+        if (!allQueues)
+        {
+            return;
+        }
+
+        validateMessageOnQueue(priorityQueueName, messageCount);
+        validateMessageOnQueue(queueName, messageCount);
+    }
+
+    /**
+     * Asserts that the named queue is registered with the current virtual host and holds the
+     * expected number of messages.
+     *
+     * @param queueName    the name of the queue to look up
+     * @param messageCount the number of messages expected on the queue
+     */
+    private void validateMessageOnQueue(AMQShortString queueName, long messageCount)
+    {
+        QueueRegistry registry = _virtualHost.getQueueRegistry();
+        AMQQueue registered = registry.getQueue(queueName);
+
+        assertNotNull("Queue(" + queueName + ") not correctly registered:", registered);
+
+        long actualCount = registered.getMessageCount();
+        assertEquals("Incorrect Message count on queue:" + queueName, messageCount, actualCount);
+    }
+
+    /**
+     * Minimal MessagePublishInfo used by this test. All values are fixed at construction
+     * time; {@link #setExchange(AMQShortString)} is deliberately a no-op.
+     */
+    private class TestMessagePublishInfo implements MessagePublishInfo
+    {
+
+        // Assigned once in the constructor and never changed afterwards.
+        private final Exchange _exchange;
+        private final boolean _immediate;
+        private final boolean _mandatory;
+        private final AMQShortString _routingKey;
+
+        /**
+         * @param exchange   the exchange whose name is reported by {@link #getExchange()}
+         * @param immediate  the value reported by {@link #isImmediate()}
+         * @param mandatory  the value reported by {@link #isMandatory()}
+         * @param routingKey the value reported by {@link #getRoutingKey()}
+         */
+        TestMessagePublishInfo(Exchange exchange, boolean immediate, boolean mandatory, AMQShortString routingKey)
+        {
+            _exchange = exchange;
+            _immediate = immediate;
+            _mandatory = mandatory;
+            _routingKey = routingKey;
+        }
+
+        /** @return the name of the exchange supplied at construction */
+        public AMQShortString getExchange()
+        {
+            return _exchange.getName();
+        }
+
+        /** Ignored: the exchange is fixed at construction. */
+        public void setExchange(AMQShortString exchange)
+        {
+            //no-op
+        }
+
+        public boolean isImmediate()
+        {
+            return _immediate;
+        }
+
+        public boolean isMandatory()
+        {
+            return _mandatory;
+        }
+
+        public AMQShortString getRoutingKey()
+        {
+            return _routingKey;
+        }
+    }
+}
\ No newline at end of file

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=683632&r1=683631&r2=683632&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Thu Aug  7 08:37:36 2008
@@ -78,7 +78,7 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void createQueue(AMQQueue queue) throws AMQException
+    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
     {
     }        
 
@@ -129,6 +129,11 @@
         return null;
     }
 
+    /** Always returns {@code false}. */
+    public boolean isPersistent()
+    {
+        return false;
+    }
+
     public void removeQueue(final AMQQueue queue) throws AMQException
     {