You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/08/30 14:19:42 UTC

svn commit: r571129 [4/15] - in /incubator/qpid/trunk/qpid/java: ./ broker/ broker/bin/ broker/distribution/src/main/assembly/ broker/etc/ broker/src/main/java/org/apache/log4j/ broker/src/main/java/org/apache/qpid/configuration/ broker/src/main/java/o...

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu Aug 30 05:19:31 2007
@@ -87,6 +87,10 @@
     private final Object _queueHeadLock = new Object();
     private String _processingThreadName = "";
 
+
+    /** Used by any reaping thread to purge messages */
+    private StoreContext _reapingStoreContext = new StoreContext();
+
     ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
     {
 
@@ -453,12 +457,31 @@
         //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
         while (purgeMessage(message, sub))
         {
+            // if we are purging then ensure we mark this message taken for the current subscriber
+            // the current subscriber may be null in the case of a get or a purge but this is ok.
+//            boolean alreadyTaken = message.taken(_queue, sub);
+
             //remove the already taken message or expired
             AMQMessage removed = messages.poll();
 
             assert removed == message;
 
-            _totalMessageSize.addAndGet(-message.getSize());
+            // if the message expired then the _totalMessageSize needs adjusting
+            if (message.expired(_queue))
+            {
+                _totalMessageSize.addAndGet(-message.getSize());
+
+                // Use the reapingStoreContext as any sub(if we have one) may be in a tx.
+                message.dequeue(_reapingStoreContext, _queue);
+
+                if (_log.isInfoEnabled())
+                {
+                    _log.info(debugIdentity() + " Doing clean up of the main _message queue.");
+                }
+            }
+
+            //else the clean up is not required as the message has already been taken for this queue therefore
+            // it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated.
 
             if (_log.isTraceEnabled())
             {
@@ -473,7 +496,10 @@
     }
 
     /**
-     * 
+     *  This method will return true if the message is to be purged from the queue.
+     *
+     *
+     *  SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
      * @param message
      * @param sub
      * @return
@@ -493,15 +519,15 @@
         // if the message is null then don't purge as we have no messagse.
         if (message != null)
         {
+            // Check that the message hasn't expired.
+            if (message.expired(_queue))
+            {
+                return true;
+            }
+
             // if we have a subscriber perform message checks
             if (sub != null)
             {
-                // Check that the message hasn't expired.
-                if (message.expired(sub.getChannel().getStoreContext(), _queue))
-                {
-                    return true;
-                }
-
                 // if we have a queue browser(we don't purge) so check mark the message as taken
                 purge = ((!sub.isBrowser() || message.isTaken(_queue)));
             }
@@ -606,7 +632,10 @@
             {
                 if (_log.isInfoEnabled())
                 {
-                    _log.info(debugIdentity() + "We could do clean up of the main _message queue here");
+                    //fixme - we should do the clean up as the message remains on the _message queue
+                    // this is resulting in the next consumer receiving the message and then attempting to purge it
+                    //
+                    _log.info(debugIdentity() + "We should do clean up of the main _message queue here");
                 }
             }
 
@@ -617,7 +646,14 @@
         }
         catch (AMQException e)
         {
-            message.release(_queue);
+            if (message != null)
+            {
+                message.release(_queue);
+            }
+            else
+            {
+                _log.error(debugIdentity() + "Unable to release message as it is null. " + e, e);
+            }
             _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
         }
     }
@@ -696,25 +732,6 @@
 
     }
 
-//    private void sendNextMessage(Subscription sub)
-//    {
-//        if (sub.filtersMessages())
-//        {
-//            sendNextMessage(sub, sub.getPreDeliveryQueue());
-//            if (sub.isAutoClose())
-//            {
-//                if (sub.getPreDeliveryQueue().isEmpty())
-//                {
-//                    sub.close();
-//                }
-//            }
-//        }
-//        else
-//        {
-//            sendNextMessage(sub, _messages);
-//        }
-//    }
-
     public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
     {
 
@@ -723,8 +740,6 @@
         {
             _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
         }
-        // This shouldn't be done here.
-//        msg.release();
 
         //Check if we have someone to deliver the message to.
         _lock.lock();
@@ -800,7 +815,7 @@
                         if (debugEnabled)
                         {
                             _log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
-                                      "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
+                                       "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
                         }
                     }
                 }
@@ -810,7 +825,7 @@
                     if (debugEnabled)
                     {
                         _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
-                                  " Subscriber:" + System.identityHashCode(s));
+                                   " Subscriber:" + System.identityHashCode(s));
                     }
 
                     deliver(context, name, msg, deliverFirst);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Thu Aug 30 05:19:31 2007
@@ -20,13 +20,14 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 public class DefaultQueueRegistry implements QueueRegistry
 {
     private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
@@ -56,5 +57,15 @@
     public AMQQueue getQueue(AMQShortString name)
     {
         return _queueMap.get(name);
+    }
+
+    public Collection<AMQShortString> getQueueNames()
+    {
+        return _queueMap.keySet();
+    }
+
+    public Collection<AMQQueue> getQueues()
+    {
+        return _queueMap.values();
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Thu Aug 30 05:19:31 2007
@@ -80,7 +80,10 @@
 
         public boolean equals(Object o)
         {
-            if (!(o instanceof ExchangeBinding)) return false;
+            if (!(o instanceof ExchangeBinding))
+            {
+                return false;
+            }
             ExchangeBinding eb = (ExchangeBinding) o;
             return _exchange.equals(eb._exchange)
                    && _routingKey.equals(eb._routingKey)
@@ -104,16 +107,16 @@
      */
     void addBinding(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
     {
-        _bindings.add(new ExchangeBinding(routingKey, exchange, arguments ));
+        _bindings.add(new ExchangeBinding(routingKey, exchange, arguments));
     }
 
 
     public void remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
     {
-        _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments ));
+        _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments));
     }
 
-    
+
     /**
      * Deregisters this queue from any exchange it has been bound to
      */

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Thu Aug 30 05:19:31 2007
@@ -56,9 +56,7 @@
     {
     }
 
-    public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId)
-            throws
-            AMQException
+    public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
     {
         return _contentHeaderBody;
     }
@@ -68,36 +66,28 @@
         return _contentBodies.size();
     }
 
-    public long getBodySize(StoreContext context, Long messageId)
-            throws
-            AMQException
+    public long getBodySize(StoreContext context, Long messageId) throws AMQException
     {
         return getContentHeaderBody(context, messageId).bodySize;
     }
 
-    public ContentChunk getContentChunk(StoreContext context, Long messageId, int index)
-            throws
-            AMQException,
-            IllegalArgumentException
+    public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
     {
         if (index > _contentBodies.size() - 1)
         {
             throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
-                    (_contentBodies.size() - 1));
+                                               (_contentBodies.size() - 1));
         }
         return _contentBodies.get(index);
     }
 
     public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody)
-            throws
-            AMQException
+            throws AMQException
     {
         _contentBodies.add(contentBody);
     }
 
-    public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId)
-            throws
-            AMQException
+    public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException
     {
         return _messagePublishInfo;
     }
@@ -113,50 +103,40 @@
         _redelivered = redelivered;
     }
 
-    public boolean isPersistent(StoreContext context, Long messageId)
-            throws
-            AMQException
+    public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
     {
         //todo remove literal values to a constant file such as AMQConstants in common
         ContentHeaderBody chb = getContentHeaderBody(context, messageId);
         return chb.properties instanceof BasicContentHeaderProperties &&
-                ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+               ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
     }
 
     /**
      * This is called when all the content has been received.
-     *
      * @param messagePublishInfo
      * @param contentHeaderBody
      * @throws AMQException
      */
     public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo,
                                                ContentHeaderBody contentHeaderBody)
-            throws
-            AMQException
+            throws AMQException
     {
         _messagePublishInfo = messagePublishInfo;
         _contentHeaderBody = contentHeaderBody;
         _arrivalTime = System.currentTimeMillis();
     }
 
-    public void removeMessage(StoreContext storeContext, Long messageId)
-            throws
-            AMQException
+    public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
     {
         // NO OP
     }
 
-    public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue)
-            throws
-            AMQException
+    public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
     {
         // NO OP
     }
 
-    public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue)
-            throws
-            AMQException
+    public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
     {
         // NO OP
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java Thu Aug 30 05:19:31 2007
@@ -20,14 +20,13 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.messageStore.StorableMessage;
 
 /**
  * Constructs a message handle based on the publish body, the content header and the queue to which the message
  * has been routed.
  *
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
  */
 public class MessageHandleFactory
 {
@@ -37,8 +36,9 @@
         // just hardcoded for now
         if (persistent)
         {
-          //  return new WeakReferenceMessageHandle(store);
-          return new StorableMessageHandle(store, m);                             
+            return new WeakReferenceMessageHandle(store);
+             //DTX MessageStore
+//          return new StorableMessageHandle(store, m);
         }
         else
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java Thu Aug 30 05:19:31 2007
@@ -1,18 +1,22 @@
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
  */
 package org.apache.qpid.server.queue;
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Thu Aug 30 05:19:31 2007
@@ -1,18 +1,21 @@
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
  */
 package org.apache.qpid.server.queue;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java Thu Aug 30 05:19:31 2007
@@ -1,18 +1,21 @@
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
  */
 package org.apache.qpid.server.queue;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Thu Aug 30 05:19:31 2007
@@ -23,8 +23,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.messageStore.StorableQueue;
 
+import java.util.Collection;
 
 public interface QueueRegistry
 {
@@ -35,4 +35,9 @@
     void unregisterQueue(AMQShortString name) throws AMQException;
 
     AMQQueue getQueue(AMQShortString name);
+
+    Collection<AMQShortString> getQueueNames();
+
+    Collection<AMQQueue> getQueues();
+
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java Thu Aug 30 05:19:31 2007
@@ -17,24 +17,21 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.messageStore.StorableMessage;
-import org.apache.qpid.server.messageStore.JDBCStore;
-import org.apache.qpid.server.exception.InternalErrorException;
-import org.apache.qpid.server.exception.MessageDoesntExistException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.StoreContext;
 
 import javax.transaction.xa.Xid;
-import java.util.List;
-import java.util.LinkedList;
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * Created by Arnaud Simon
@@ -94,7 +91,8 @@
             try
             {
                 _contentHeaderBody = _messageStore.getContentHeaderBody(_message);
-            } catch (Exception e)
+            }
+            catch (Exception e)
             {
                 throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
             }
@@ -106,17 +104,17 @@
             throws
             AMQException
     {
-       if (_chunks == null )
-       {
-           if(_message.isStaged() )
-           {
-              loadChunks();
-           }
-           else
-           {
-               return 0;
-           }
-      }
+        if (_chunks == null)
+        {
+            if (_message.isStaged())
+            {
+                loadChunks();
+            }
+            else
+            {
+                return 0;
+            }
+        }
         return _chunks.size();
     }
 
@@ -144,8 +142,8 @@
             AMQException
     {
         try
-            {
-                _chunks = new LinkedList<ContentChunk>();
+        {
+            _chunks = new LinkedList<ContentChunk>();
             byte[] underlying = _messageStore.loadContent(_message, 1, 0);
             final int size = underlying.length;
             final org.apache.mina.common.ByteBuffer data =
@@ -169,7 +167,8 @@
                 }
             };
             _chunks.add(cb);
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
         }
@@ -198,8 +197,10 @@
             // read it from the store
             try
             {
+
                 _messagePublishInfo = _messageStore.getMessagePublishInfo(_message);
-            } catch (Exception e)
+            }
+            catch (Exception e)
             {
                 throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
             }
@@ -222,7 +223,7 @@
             AMQException
     {
         return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
-                ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+               ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
     }
 
     public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId,
@@ -254,7 +255,8 @@
             {
                 _messageStore.enqueue((Xid) storeContext.getPayload(), _message, queue);
             }
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new AMQException(null, "PRoblem during message enqueue", e);
         }
@@ -270,7 +272,8 @@
             {
                 _messageStore.dequeue((Xid) storeContext.getPayload(), _message, queue);
             }
-        } catch (Exception e)
+        }
+        catch (Exception e)
         {
             throw new AMQException(null, "PRoblem during message dequeue", e);
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java Thu Aug 30 05:19:31 2007
@@ -1,18 +1,21 @@
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
  */
 package org.apache.qpid.server.queue;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Thu Aug 30 05:19:31 2007
@@ -34,7 +34,6 @@
 import org.apache.qpid.server.store.StoreContext;
 
 /**
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
  */
 public class WeakReferenceMessageHandle implements AMQMessageHandle
 {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java Thu Aug 30 05:19:31 2007
@@ -107,7 +107,7 @@
     }
 
     private void initialisePrincipalDatabase(PrincipalDatabase principalDatabase, Configuration config, int index)
-        throws FileNotFoundException, ConfigurationException
+            throws FileNotFoundException, ConfigurationException
     {
         String baseName = _base + "(" + index + ").attributes.attribute.";
         List<String> argumentNames = config.getList(baseName + "name");
@@ -139,9 +139,9 @@
             if (method == null)
             {
                 throw new ConfigurationException("No method " + methodName + " found in class "
-                    + principalDatabase.getClass()
-                    + " hence unable to configure principal database. The method must be public and "
-                    + "have a single String argument with a void return type");
+                                                 + principalDatabase.getClass()
+                                                 + " hence unable to configure principal database. The method must be public and "
+                                                 + "have a single String argument with a void return type");
             }
 
             try
@@ -152,7 +152,7 @@
             {
                 if (ite instanceof ConfigurationException)
                 {
-                    throw (ConfigurationException) ite;
+                    throw(ConfigurationException) ite;
                 }
                 else
                 {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java Thu Aug 30 05:19:31 2007
@@ -26,8 +26,6 @@
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.exception.InternalErrorException;
-import org.apache.qpid.server.exception.InvalidXidException;
 import org.apache.log4j.Logger;
 
 import javax.transaction.xa.Xid;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Thu Aug 30 05:19:31 2007
@@ -1,18 +1,21 @@
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
  */
 package org.apache.qpid.server.txn;
@@ -31,7 +34,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.NoConsumersException;
-import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 
 /** @author Apache Software Foundation */
@@ -74,7 +77,7 @@
     {
         if (!_inTran)
         {
-          //  _messageStore.beginTran(_storeContext);
+            _messageStore.beginTran(_storeContext);
             _inTran = true;
         }
     }
@@ -93,10 +96,12 @@
     {
         try
         {
-            if( ! deliverFirst )
-            {
-                message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
-            }
+            //DTX removed  - deliverFirst is to do with the position on the Queue not enqueuing!!
+            // This should be done in routingComplete
+//            if( ! deliverFirst )
+//            {
+//                message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
+//            }
             queue.process(_storeContext, message, deliverFirst);
             //following check implements the functionality
             //required by the 'immediate' flag:
@@ -216,7 +221,8 @@
     {
         if (persistent)
         {
-           // _messageStore.commitTran(_storeContext);
+            //DTX removed this option.
+            _messageStore.commitTran(_storeContext);
             _inTran = false;
         }
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Thu Aug 30 05:19:31 2007
@@ -28,24 +28,144 @@
 import org.apache.qpid.server.store.StoreContext;
 
 /**
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ * TransactionalContext provides a context in which transactional operations on {@link AMQMessage}s are performed.
+ * Different levels of transactional support for the delivery of messages may be provided by different implementations
+ * of this interface.
+ *
+ * <p/>The fundamental transactional operations that can be performed on a message queue are 'enqueue' and 'dequeue'.
+ * In this interface, these have been recast as the {@link #messageFullyReceived} and {@link #acknowledgeMessage}
+ * operations. This interface essentially provides a way to make enqueueing and dequeuing transactional.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Explicitly accept a transaction start notification.
+ * <tr><td> Commit all pending operations in a transaction.
+ * <tr><td> Rollback all pending operations in a transaction.
+ * <tr><td> Deliver a message to a queue as part of a transaction.
+ * <tr><td> Redeliver a message to a queue as part of a transaction.
+ * <tr><td> Mark a message as acknowledged as part of a transaction.
+ * <tr><td> Accept notification that a message has been completely received as part of a transaction.
+ * <tr><td> Accept notification that a message has been fully processed as part of a transaction.
+ * <tr><td> Associate a message store context with this transaction context.
+ * </table>
+ *
+ * @todo The 'fullyReceived' and 'messageProcessed' events sit uncomfortably in the responsibilities of a transactional
+ *       context. They are non-transactional operations, used to trigger other side-effects. Consider moving them
+ *       somewhere else, a seperate interface for example.
+ *
+ * @todo This transactional context could be written as a wrapper extension to a Queue implementation, that provides
+ *       transactional management of the enqueue and dequeue operations, with added commit/rollback methods. Any
+ *       queue implementation could be made transactional by wrapping it as a transactional queue. This would mean
+ *       that the enqueue/dequeue operations do not need to be recast as deliver/acknowledge operations, which may be
+ *       conceptually neater.
+ *
+ * For example:
+ * <pre>
+ * public interface Transactional
+ * {
+ *    public void commit();
+ *    public void rollback();
+ * }
+ *
+ * public interface TransactionalQueue<E> extends Transactional, SizeableQueue<E>
+ * {}
+ *
+ * public class Queues
+ * {
+ *    ...
+ *    // For transactional messaging, take a transactional view onto the queue.
+ *    public static <E> TransactionalQueue<E> getTransactionalQueue(SizeableQueue<E> queue) { ... }
+ *
+ *    // For non-transactional messaging, take a non-transactional view onto the queue.
+ *    public static <E> TransactionalQueue<E> getNonTransactionalQueue(SizeableQueue<E> queue) { ... }
+ * }
+ * </pre>
  */
 public interface TransactionalContext
 {
+    /**
+     * Explicitly begins the transaction, if it has not already been started. {@link #commit} or {@link #rollback}
+     * should automatically begin the next transaction in the chain.
+     *
+     * @throws AMQException If the transaction cannot be started for any reason.
+     */
     void beginTranIfNecessary() throws AMQException;
 
+    /**
+     * Makes all pending operations on the transaction permanent and visible.
+     *
+     * @throws AMQException If the transaction cannot be committed for any reason.
+     */
     void commit() throws AMQException;
 
+    /**
+     * Erases all pending operations on the transaction.
+     *
+     * @throws AMQException If the transaction cannot be committed for any reason.
+     */
     void rollback() throws AMQException;
 
+    /**
+     * Delivers the specified message to the specified queue. A 'deliverFirst' flag may be set if the message is a
+     * redelivery, and should be placed on the front of the queue.
+     *
+     * <p/>This is an 'enqueue' operation.
+     *
+     * @param message      The message to deliver.
+     * @param queue        The queue to deliver the message to.
+     * @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt>
+     *                     for normal FIFO message ordering.
+     *
+     * @throws AMQException If the message cannot be delivered for any reason.
+     */
     void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException;
 
+    /**
+     * Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by
+     * setting the 'multiple' flag. It is also possible for the acknowledged message id to be zero, when the 'multiple'
+     * flag is set, in which case an acknowledgement up to the latest delivered message should be done.
+     *
+     * <p/>This is a 'dequeue' operation.
+     *
+     * @param deliveryTag              The id of the message to acknowledge, or zero, if using multiple acknowledgement
+     *                                 up to the latest message.
+     * @param lastDeliveryTag          The latest message delivered.
+     * @param multiple                 <tt>true</tt> if all message ids up the acknowledged one or latest delivered, are
+     *                                 to be acknowledged, <tt>false</tt> otherwise.
+     * @param unacknowledgedMessageMap The unacknowledged messages in the transaction, to remove the acknowledged message
+     *                                 from.
+     *
+     * @throws AMQException If the message cannot be acknowledged for any reason.
+     */
     void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
-                            UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
+        UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
 
+    /**
+     * Notifies the transactional context that a message has been fully received. The actual message that was received
+     * is not specified. This event may be used to trigger a process related to the receipt of the message, for example,
+     * flushing its data to disk.
+     *
+     * @param persistent <tt>true</tt> if the received message is persistent, <tt>false</tt> otherwise.
+     *
+     * @throws AMQException If the fully received event cannot be processed for any reason.
+     */
     void messageFullyReceived(boolean persistent) throws AMQException;
 
+    /**
+     * Notifies the transactional context that a message has been delivered, succesfully or otherwise. The actual
+     * message that was delivered is not specified. This event may be used to trigger a process related to the
+     * outcome of the delivery of the message, for example, cleaning up failed deliveries.
+     *
+     * @param protocolSession The protocol session of the deliverable message.
+     *
+     * @throws AMQException If the message processed event cannot be handled for any reason.
+     */
     void messageProcessed(AMQProtocolSession protocolSession) throws AMQException;
 
+    /**
+     * Gets the message store context associated with this transactional context.
+     *
+     * @return The message store context associated with this transactional context.
+     */
     StoreContext getStoreContext();
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java Thu Aug 30 05:19:31 2007
@@ -57,11 +57,11 @@
         super(new MapConfiguration(new HashMap()));
     }
 
-    public void initialise()
-            throws
-            Exception
+    public void initialise() throws Exception
     {
-        _configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.MemoryMessageStore");
+        //DTX MessageStore
+//        _configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.MemoryMessageStore");
+        _configuration.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore");        
         _configuration.addProperty("txn.class", "org.apache.qpid.server.txn.MemoryTransactionManager");
        // _configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.JDBCStore");
        // _configuration.addProperty("txn.class", "org.apache.qpid.server.txn.JDBCTransactionManager");

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=571129&r1=571128&r2=571129&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Thu Aug 30 05:19:31 2007
@@ -1,290 +1,289 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import javax.management.NotCompliantMBeanException;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.txn.TransactionManager;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AccessManagerImpl;
-import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.messageStore.MessageStore;
-
-public class VirtualHost implements Accessable
-{
-    private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
-
-    private final String _name;
-
-    private QueueRegistry _queueRegistry;
-
-    private ExchangeRegistry _exchangeRegistry;
-
-    private ExchangeFactory _exchangeFactory;
-
-    private MessageStore _messageStore;
-
-    private TransactionManager _transactionManager;
-
-    protected VirtualHostMBean _virtualHostMBean;
-
-    private AMQBrokerManagerMBean _brokerMBean;
-
-    private AuthenticationManager _authenticationManager;
-
-    private AccessManager _accessManager;
-
-
-    public void setAccessableName(String name)
-    {
-        _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
-                     + name + ") ignored remains :" + getAccessableName());
-    }
-
-    public String getAccessableName()
-    {
-        return _name;
-    }
-
-
-    /**
-     * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
-     * implementaion of an Exchange MBean should extend this class.
-     */
-    public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
-    {
-        public VirtualHostMBean() throws NotCompliantMBeanException
-        {
-            super(ManagedVirtualHost.class, "VirtualHost");
-        }
-
-        public String getObjectInstanceName()
-        {
-            return _name.toString();
-        }
-
-        public String getName()
-        {
-            return _name.toString();
-        }
-
-        public VirtualHost getVirtualHost()
-        {
-            return VirtualHost.this;
-        }
-
-
-    } // End of MBean class
-
-    /**
-     * Used for testing only
-     *
-     * @param name
-     * @param store
-     *
-     * @throws Exception
-     */
-    public VirtualHost(String name, MessageStore store) throws Exception
-    {
-        this(name, null, store);
-    }
-
-    /**
-     * Normal Constructor
-     *
-     * @param name
-     * @param hostConfig
-     *
-     * @throws Exception
-     */
-    public VirtualHost(String name, Configuration hostConfig) throws Exception
-    {
-        this(name, hostConfig, null);
-    }
-
-    private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
-    {
-        _name = name;
-
-        _virtualHostMBean = new VirtualHostMBean();
-        // This isn't needed to be registered
-        //_virtualHostMBean.register();
-
-        _queueRegistry = new DefaultQueueRegistry(this);
-        _exchangeFactory = new DefaultExchangeFactory(this);
-        _exchangeFactory.initialise(hostConfig);
-        _exchangeRegistry = new DefaultExchangeRegistry(this);
-
-        if (store != null)
-        {
-            _messageStore = store;
-        }
-        else
-        {
-            if (hostConfig == null)
-            {
-                throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
-            }
-            initialiseTransactionManager(hostConfig);
-            initialiseMessageStore(hostConfig);
-        }
-
-        _exchangeRegistry.initialise();
-
-
-        _logger.warn("VirtualHost authentication Managers require spec change to be operational.");
-        _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
-
-        _accessManager = new AccessManagerImpl(name, hostConfig);
-
-        _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
-        _brokerMBean.register();
-    }
-
-    private void initialiseMessageStore(Configuration config) throws Exception
-    {
-        String messageStoreClass = config.getString("store.class");
-
-        Class clazz = Class.forName(messageStoreClass);
-        Object o = clazz.newInstance();
-
-        if (!(o instanceof MessageStore))
-        {
-            throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
-                                         " does not.");
-        }
-        _messageStore = (MessageStore) o;
-        _messageStore.configure(this, _transactionManager, "store", config);
-    }
-
-    private void initialiseTransactionManager(Configuration config) throws Exception
-    {
-        String transactionManagerClass = config.getString("txn.class");
-        Class clazz = Class.forName(transactionManagerClass);
-        Object o = clazz.newInstance();
-
-        if (!(o instanceof TransactionManager))
-        {
-            throw new ClassCastException("Transaction Manager class must implement " + TransactionManager.class + ". Class " + clazz +
-                                         " does not.");
-        }
-        _transactionManager = (TransactionManager) o;
-    }
-
-
-    public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
-    {
-        T instance;
-        try
-        {
-            instance = instanceType.newInstance();
-        }
-        catch (Exception e)
-        {
-            _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
-            throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
-        }
-        Configurator.configure(instance);
-
-        return instance;
-    }
-
-
-    public String getName()
-    {
-        return _name;
-    }
-
-    public QueueRegistry getQueueRegistry()
-    {
-        return _queueRegistry;
-    }
-
-    public ExchangeRegistry getExchangeRegistry()
-    {
-        return _exchangeRegistry;
-    }
-
-    public ExchangeFactory getExchangeFactory()
-    {
-        return _exchangeFactory;
-    }
-
-    public ApplicationRegistry getApplicationRegistry()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public MessageStore getMessageStore()
-    {
-        return _messageStore;
-    }
-
-    public TransactionManager getTransactionManager()
-    {
-        return _transactionManager;
-    }
-
-    public AuthenticationManager getAuthenticationManager()
-    {
-        return _authenticationManager;
-    }
-
-    public AccessManager getAccessManager()
-    {
-        return _accessManager;
-    }
-
-    public void close() throws Exception
-    {
-        if (_messageStore != null)
-        {
-            _messageStore.close();
-        }
-    }
-
-    public ManagedObject getBrokerMBean()
-    {
-        return _brokerMBean;
-    }
-
-    public ManagedObject getManagedObject()
-    {
-        return _virtualHostMBean;
-    }
-}
-
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import javax.management.NotCompliantMBeanException;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.security.access.AccessManager;
+import org.apache.qpid.server.security.access.AccessManagerImpl;
+import org.apache.qpid.server.security.access.Accessable;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+
+public class VirtualHost implements Accessable
+{
+    private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+    private final String _name;
+
+    private QueueRegistry _queueRegistry;
+
+    private ExchangeRegistry _exchangeRegistry;
+
+    private ExchangeFactory _exchangeFactory;
+
+    private MessageStore _messageStore;
+
+    private TransactionManager _transactionManager;
+
+    protected VirtualHostMBean _virtualHostMBean;
+
+    private AMQBrokerManagerMBean _brokerMBean;
+
+    private AuthenticationManager _authenticationManager;
+
+    private AccessManager _accessManager;
+
+
+    public void setAccessableName(String name)
+    {
+        _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
+                     + name + ") ignored remains :" + getAccessableName());
+    }
+
+    public String getAccessableName()
+    {
+        return _name;
+    }
+
+
+    /**
+     * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
+     * implementaion of an Exchange MBean should extend this class.
+     */
+    public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+    {
+        public VirtualHostMBean() throws NotCompliantMBeanException
+        {
+            super(ManagedVirtualHost.class, "VirtualHost");
+        }
+
+        public String getObjectInstanceName()
+        {
+            return _name.toString();
+        }
+
+        public String getName()
+        {
+            return _name.toString();
+        }
+
+        public VirtualHost getVirtualHost()
+        {
+            return VirtualHost.this;
+        }
+
+
+    } // End of MBean class
+
+    /**
+     * Used for testing only
+     * @param name
+     * @param store
+     *
+     * @throws Exception
+     */
+    public VirtualHost(String name, MessageStore store) throws Exception
+    {
+        this(name, null, store);
+    }
+
+    /**
+     * Normal Constructor
+     *
+     * @param name
+     * @param hostConfig
+     *
+     * @throws Exception
+     */
+    public VirtualHost(String name, Configuration hostConfig) throws Exception
+    {
+        this(name, hostConfig, null);
+    }
+
+    private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
+    {
+        _name = name;
+
+        _virtualHostMBean = new VirtualHostMBean();
+        // This isn't needed to be registered
+        //_virtualHostMBean.register();
+
+        _queueRegistry = new DefaultQueueRegistry(this);
+        _exchangeFactory = new DefaultExchangeFactory(this);
+        _exchangeFactory.initialise(hostConfig);
+        _exchangeRegistry = new DefaultExchangeRegistry(this);
+
+        if (store != null)
+        {
+            _messageStore = store;
+        }
+        else
+        {
+            if (hostConfig == null)
+            {
+                throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
+            }
+            initialiseTransactionManager(hostConfig);
+            initialiseMessageStore(hostConfig);
+        }
+
+        _exchangeRegistry.initialise();
+
+        _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
+
+        _accessManager = new AccessManagerImpl(name, hostConfig);
+
+        _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+        _brokerMBean.register();
+    }
+
+    private void initialiseMessageStore(Configuration config) throws Exception
+    {
+        String messageStoreClass = config.getString("store.class");
+
+        Class clazz = Class.forName(messageStoreClass);
+        Object o = clazz.newInstance();
+
+        if (!(o instanceof MessageStore))
+        {
+            throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+                                         " does not.");
+        }
+        _messageStore = (MessageStore) o;
+          //DTX MessageStore
+//        _messageStore.configure(this, _transactionManager, "store", config);
+        _messageStore.configure(this, "store", config);
+    }
+
+    private void initialiseTransactionManager(Configuration config) throws Exception
+    {
+        String transactionManagerClass = config.getString("txn.class");
+        Class clazz = Class.forName(transactionManagerClass);
+        Object o = clazz.newInstance();
+
+        if (!(o instanceof TransactionManager))
+        {
+            throw new ClassCastException("Transaction Manager class must implement " + TransactionManager.class + ". Class " + clazz +
+                                         " does not.");
+        }
+        _transactionManager = (TransactionManager) o;
+    }
+
+
+    public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
+    {
+        T instance;
+        try
+        {
+            instance = instanceType.newInstance();
+        }
+        catch (Exception e)
+        {
+            _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+            throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
+        }
+        Configurator.configure(instance);
+
+        return instance;
+    }
+
+
+    public String getName()
+    {
+        return _name;
+    }
+
+    public QueueRegistry getQueueRegistry()
+    {
+        return _queueRegistry;
+    }
+
+    public ExchangeRegistry getExchangeRegistry()
+    {
+        return _exchangeRegistry;
+    }
+
+    public ExchangeFactory getExchangeFactory()
+    {
+        return _exchangeFactory;
+    }
+
+    public ApplicationRegistry getApplicationRegistry()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public MessageStore getMessageStore()
+    {
+        return _messageStore;
+    }
+
+    public TransactionManager getTransactionManager()
+    {
+        return _transactionManager;
+    }
+
+    public AuthenticationManager getAuthenticationManager()
+    {
+        return _authenticationManager;
+    }
+
+    public AccessManager getAccessManager()
+    {
+        return _accessManager;
+    }
+
+    public void close() throws Exception
+    {
+        if (_messageStore != null)
+        {
+            _messageStore.close();
+        }
+    }
+
+    public ManagedObject getBrokerMBean()
+    {
+        return _brokerMBean;
+    }
+
+    public ManagedObject getManagedObject()
+    {
+        return _virtualHostMBean;
+    }
+}
+

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java?rev=571129&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java Thu Aug 30 05:19:31 2007
@@ -0,0 +1,652 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.configuration.Configuration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.tools.messagestore.commands.Clear;
+import org.apache.qpid.tools.messagestore.commands.Command;
+import org.apache.qpid.tools.messagestore.commands.Copy;
+import org.apache.qpid.tools.messagestore.commands.Dump;
+import org.apache.qpid.tools.messagestore.commands.Help;
+import org.apache.qpid.tools.messagestore.commands.List;
+import org.apache.qpid.tools.messagestore.commands.Load;
+import org.apache.qpid.tools.messagestore.commands.Quit;
+import org.apache.qpid.tools.messagestore.commands.Select;
+import org.apache.qpid.tools.messagestore.commands.Show;
+import org.apache.qpid.tools.messagestore.commands.Move;
+import org.apache.qpid.tools.messagestore.commands.Purge;
+import org.apache.qpid.tools.utils.CommandParser;
+import org.apache.qpid.tools.utils.Console;
+import org.apache.qpid.tools.utils.SimpleCommandParser;
+import org.apache.qpid.tools.utils.SimpleConsole;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+/**
+ * MessageStoreTool.
+ */
+public class MessageStoreTool
+{
+    /** Text outputted at the start of each console.*/
+    private static final String BOILER_PLATE = "MessageStoreTool - for examining Persistent Qpid Broker MessageStore instances";
+
+    /** I/O Wrapper. */
+    protected Console _console;
+
+    /** Batch mode flag. */
+    protected boolean _batchMode;
+
+    /** Internal State object. */
+    private State _state = new State();
+
+    private HashMap<String, Command> _commands = new HashMap<String, Command>();
+
+    /** SLF4J Logger. */
+    private static Logger _devlog = LoggerFactory.getLogger(MessageStoreTool.class);
+
+    /** Loaded configuration file. */
+    private Configuration _config;
+
+    /** Control used for main run loop. */
+    private boolean _running = true;
+    private boolean _initialised = false;
+
+    //---------------------------------------------------------------------------------------------------/
+
+    public static void main(String[] args) throws Configuration.InitException
+    {
+
+        MessageStoreTool tool = new MessageStoreTool(args);
+
+        tool.start();
+    }
+
+
+    public MessageStoreTool(String[] args) throws Configuration.InitException
+    {
+        this(args, System.in, System.out);
+    }
+
+    public MessageStoreTool(String[] args, InputStream in, OutputStream out) throws Configuration.InitException
+    {
+        BufferedReader consoleReader = new BufferedReader(new InputStreamReader(in));
+        BufferedWriter consoleWriter = new BufferedWriter(new OutputStreamWriter(out));
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(this)));
+        _batchMode = false;
+
+        _console = new SimpleConsole(consoleWriter, consoleReader);
+
+        _config = new Configuration();
+
+        setOptions();
+        _config.processCommandline(args);
+    }
+
+
+    private void setOptions()
+    {
+        Option help = new Option("h", "help", false, "print this message");
+        Option version = new Option("v", "version", false, "print the version information and exit");
+        Option configFile =
+                OptionBuilder.withArgName("file").hasArg()
+                        .withDescription("use given configuration file By "
+                                         + "default looks for a file named "
+                                         + Configuration.DEFAULT_CONFIG_FILE + " in " + Configuration.QPID_HOME)
+                        .withLongOpt("config")
+                        .create("c");
+
+        _config.setOption(help);
+        _config.setOption(version);
+        _config.setOption(configFile);
+    }
+
+    public State getState()
+    {
+        return _state;
+    }
+
+    public Map<String, Command> getCommands()
+    {
+        return _commands;
+    }
+
+    public void setConfigurationFile(String configfile) throws Configuration.InitException
+    {
+        _config.loadConfig(new File(configfile));
+        setup();
+    }
+
+    public Console getConsole()
+    {
+        return _console;
+    }
+
+    public void setConsole(Console console)
+    {
+        _console = console;
+    }
+
+    /**
+     * Simple ShutdownHook to cleanly shutdown the databases
+     */
+    class ShutdownHook implements Runnable
+    {
+        MessageStoreTool _tool;
+
+        ShutdownHook(MessageStoreTool messageStoreTool)
+        {
+            _tool = messageStoreTool;
+        }
+
+        public void run()
+        {
+            _tool.quit();
+        }
+    }
+
+    public void quit()
+    {
+        _running = false;
+
+        if (_initialised)
+        {
+            ApplicationRegistry.remove(1);
+        }
+
+        _console.println("...exiting");
+
+        _console.close();
+    }
+
+    public void setBatchMode(boolean batchmode)
+    {
+        _batchMode = batchmode;
+    }
+
+    /**
+     * Main loop
+     */
+    protected void start()
+    {
+        setup();
+
+        if (!_initialised)
+        {
+            System.exit(1);
+        }
+
+        _console.println("");
+
+        _console.println(BOILER_PLATE);        
+
+        runCLI();
+    }
+
+    private void setup()
+    {
+        loadDefaultVirtualHosts();
+
+        loadCommands();
+
+        _state.clearAll();
+    }
+
+    private void loadCommands()
+    {
+        _commands.clear();
+        //todo Dynamically load the classes that exis in com.redhat.etp.qpid.commands
+        _commands.put("close", new Clear(this));
+        _commands.put("copy", new Copy(this));
+        _commands.put("dump", new Dump(this));
+        _commands.put("help", new Help(this));
+        _commands.put("list", new List(this));
+        _commands.put("load", new Load(this));
+        _commands.put("move", new Move(this));
+        _commands.put("purge", new Purge(this));
+        _commands.put("quit", new Quit(this));
+        _commands.put("select", new Select(this));
+        _commands.put("show", new Show(this));
+    }
+
+    private void loadDefaultVirtualHosts()
+    {
+        final File configFile = _config.getConfigFile();
+
+        loadVirtualHosts(configFile);
+    }
+
+    private void loadVirtualHosts(File configFile)
+    {
+
+        if (!configFile.exists())
+        {
+            _devlog.error("Config file not found:" + configFile.getAbsolutePath());
+            return;
+        }
+        else
+        {
+            _devlog.debug("using config file :" + configFile.getAbsolutePath());
+        }
+
+        try
+        {
+            ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile);
+
+            ApplicationRegistry.remove(1);
+
+            ApplicationRegistry.initialise(registry);
+
+            checkMessageStores();
+            _initialised = true;
+        }
+        catch (ConfigurationException e)
+        {
+            _console.println("Unable to load configuration due to configuration error: " + e.getMessage());
+            e.printStackTrace();
+        }
+        catch (Exception e)
+        {
+            _console.println("Unable to load configuration due to: " + e.getMessage());
+            e.printStackTrace();
+        }
+
+
+    }
+
+    private void checkMessageStores()
+    {
+        Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts();
+
+        boolean warning = false;
+        for (VirtualHost vhost : vhosts)
+        {
+            if (vhost.getMessageStore() instanceof MemoryMessageStore)
+            {
+                _console.println("WARNING: Virtualhost '" + vhost.getName() + "' is using a MemoryMessageStore. "
+                                 + "Changes will not persist.");
+                warning = true;
+            }
+        }
+
+        if (warning)
+        {
+            _console.println("");
+            _console.println("Please ensure you are using the correct config file currently using '"
+                             + _config.getConfigFile().getAbsolutePath() + "'");
+            _console.println("New config file can be specifed by 'load <config file>' or -c on the commandline.");
+            _console.println("");
+        }
+    }
+
+    private void runCLI()
+    {
+        while (_running)
+        {
+            if (!_batchMode)
+            {
+                printPrompt();
+            }
+
+            String[] args = _console.readCommand();
+
+            while (args != null)
+            {
+                exec(args);
+
+                if (_running)
+                {
+                    if (!_batchMode)
+                    {
+                        printPrompt();
+                    }
+
+                    args = _console.readCommand();
+                }
+            }
+        }
+    }
+
+    private void printPrompt()
+    {
+        _console.print(prompt());
+    }
+
+
+    /**
+     * Execute a script (batch mode).
+     *
+     * @param script The file script
+     */
+    protected void runScripts(String script)
+    {
+        //Store Current State
+        boolean oldBatch = _batchMode;
+        CommandParser oldParser = _console.getCommandParser();
+        setBatchMode(true);
+
+        try
+        {
+            _devlog.debug("Running script '" + script + "'");
+
+            _console.setCommandParser(new SimpleCommandParser(new BufferedReader(new FileReader(script))));
+
+            start();
+        }
+        catch (java.io.FileNotFoundException e)
+        {
+            _devlog.error("Script not found: '" + script + "' due to:" + e.getMessage());
+        }
+
+        //Restore previous state
+        _console.setCommandParser(oldParser);
+        setBatchMode(oldBatch);
+    }
+
+    public String prompt()
+    {
+        String state = _state.toString();
+        if (state != null && state.length() != 0)
+        {
+            return state + ":bdb$ ";
+        }
+        else
+        {
+            return "bdb$ ";
+        }
+    }
+
+    /**
+     * Execute the command.
+     *
+     * @param args [command, arg0, arg1...].
+     */
+    protected void exec(String[] args)
+    {
+        // Comment lines start with a #
+        if (args.length == 0 || args[0].startsWith("#"))
+        {
+            return;
+        }
+
+        final String command = args[0];
+
+        Command cmd = _commands.get(command);
+
+        if (cmd == null)
+        {
+            _console.println("Command not understood: " + command);
+        }
+        else
+        {
+            cmd.execute(args);
+        }
+    }
+
+
+    /**
+     * Displays usage info.
+     */
+    protected static void help()
+    {
+        System.out.println(BOILER_PLATE);
+        System.out.println("Usage: java " + MessageStoreTool.class + " [Options]");
+        System.out.println("       [-c <broker config file>] : Defaults to \"$QPID_HOME/etc/config.xml\"");
+    }
+
+
+    /**
+     * This class is used to store the current state of the tool.
+     *
+     * This is then interrogated by the various commands to augment their behaviour.
+     *
+     *
+     */
+    public class State
+    {
+        private VirtualHost _vhost = null;
+        private AMQQueue _queue = null;
+        private Exchange _exchange = null;
+        private java.util.List<Long> _msgids = null;
+
+        public State()
+        {
+        }
+
+        public void setQueue(AMQQueue queue)
+        {
+            _queue = queue;
+        }
+
+        public AMQQueue getQueue()
+        {
+            return _queue;
+        }
+
+        public void setVhost(VirtualHost vhost)
+        {
+            _vhost = vhost;
+        }
+
+        public VirtualHost getVhost()
+        {
+            return _vhost;
+        }
+
+        public Exchange getExchange()
+        {
+            return _exchange;
+        }
+
+        public void setExchange(Exchange exchange)
+        {
+            _exchange = exchange;
+        }
+
+        public String toString()
+        {
+            StringBuilder status = new StringBuilder();
+
+            if (_vhost != null)
+            {
+                status.append(_vhost.getName());
+
+                if (_exchange != null)
+                {
+                    status.append("[");
+                    status.append(_exchange.getName());
+                    status.append("]");
+
+                    if (_queue != null)
+                    {
+                        status.append("->'");
+                        status.append(_queue.getName());
+                        status.append("'");
+
+                        if (_msgids != null)
+                        {
+                            status.append(printMessages());
+                        }
+                    }
+                }
+            }
+
+            return status.toString();
+        }
+
+
+        public String printMessages()
+        {
+            StringBuilder sb = new StringBuilder();
+
+            Long previous = null;
+
+            Long start = null;
+            for (Long id : _msgids)
+            {
+                if (previous != null)
+                {
+                    if (id == previous + 1)
+                    {
+                        if (start == null)
+                        {
+                            start = previous;
+                        }
+                    }
+                    else
+                    {
+                        if (start != null)
+                        {
+                            sb.append(",");
+                            sb.append(start);
+                            sb.append("-");
+                            sb.append(id);
+                            start = null;
+                        }
+                        else
+                        {
+                            sb.append(",");
+                            sb.append(previous);
+                        }
+                    }
+                }
+
+                previous = id;
+            }
+
+            if (start != null)
+            {
+                sb.append(",");
+                sb.append(start);
+                sb.append("-");
+                sb.append(_msgids.get(_msgids.size() - 1));
+            }
+            else
+            {
+                sb.append(",");
+                sb.append(previous);
+            }
+
+            // surround list in ()
+            sb.replace(0, 1, "(");
+            sb.append(")");
+            return sb.toString();
+        }
+
+        public void clearAll()
+        {
+            _vhost = null;
+            clearExchange();
+        }
+
+        public void clearExchange()
+        {
+            _exchange = null;
+            clearQueue();
+        }
+
+        public void clearQueue()
+        {
+            _queue = null;
+            clearMessages();
+        }
+
+        public void clearMessages()
+        {
+            _msgids = null;
+        }
+
+        /**
+         * A common location to provide parsing of the message id string
+         * utilised by a number of the commands.
+         * The String is comma separated list of ids that can be individual ids
+         * or a range (4-10)
+         *
+         * @param msgString string of msg ids to parse 1,2,4-10
+         */
+        public void setMessages(String msgString)
+        {
+            StringTokenizer tok = new StringTokenizer(msgString, ",");
+
+            if (tok.hasMoreTokens())
+            {
+                _msgids = new LinkedList<Long>();
+            }
+
+            while (tok.hasMoreTokens())
+            {
+                String next = tok.nextToken();
+                if (next.contains("-"))
+                {
+                    Long start = Long.parseLong(next.substring(0, next.indexOf("-")));
+                    Long end = Long.parseLong(next.substring(next.indexOf("-") + 1));
+
+                    if (end >= start)
+                    {
+                        for (long l = start; l <= end; l++)
+                        {
+                            _msgids.add(l);
+                        }
+                    }
+                }
+                else
+                {
+                    _msgids.add(Long.parseLong(next));
+                }
+            }
+
+        }
+
+        public void setMessages(java.util.List<Long> msgids)
+        {
+            _msgids = msgids;
+        }
+
+        public java.util.List<Long> getMessages()
+        {
+            return _msgids;
+        }
+    }//Class State
+
+}//Class MessageStoreTool

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java?rev=571129&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java Thu Aug 30 05:19:31 2007
@@ -0,0 +1,66 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+public abstract class AbstractCommand implements Command
+{
+    protected Console _console;
+    protected MessageStoreTool _tool;
+
+    public AbstractCommand(MessageStoreTool tool)
+    {
+        _console = tool.getConsole();
+        _tool = tool;
+    }
+
+    public void setOutput(Console out)
+    {
+        _console = out;
+    }
+
+    protected void commandError(String message, String[] args)
+    {
+        _console.print(getCommand() + " : " + message);
+
+        if (args != null)
+        {
+            for (int i = 1; i < args.length; i++)
+            {
+                _console.print(args[i]);
+            }
+        }
+        _console.println("");
+        _console.println(help());
+    }
+
+
+    public abstract String help();
+
+    public abstract String usage();
+
+    public abstract String getCommand();
+
+
+    public abstract void execute(String... args);
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date