You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/20 18:23:11 UTC

svn commit: r827724 [6/8] - in /qpid/branches/java-broker-0-10/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/src/main/java/org/apach...

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java Tue Oct 20 16:23:01 2009
@@ -21,74 +21,67 @@
 package org.apache.qpid.server.store;
 
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.AMQException;
+import org.apache.commons.configuration.Configuration;
 
 public interface TransactionLog
 {
-    /**
-     * Places a message onto a specified queue, in a given transactional context.
-     *
-     * @param context   The transactional context for the operation.
-     * @param queue     The queue to place the message on.
-     * @param messageId The message to enqueue.
-     * @throws org.apache.qpid.AMQException If the operation fails for any reason.
-     */
-    void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException;
-
-    /**
-     * Extracts a message from a specified queue, in a given transactional context.
-     *
-     * @param context   The transactional context for the operation.
-     * @param queue     The queue to place the message on.
-     * @param messageId The message to dequeue.
-     * @throws org.apache.qpid.AMQException If the operation fails for any reason, or if the specified message does not exist.
-     */
-    void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException;
-
-    /**
-     * Begins a transactional context.
-     *
-     * @param context The transactional context to begin.
-     *
-     * @throws org.apache.qpid.AMQException If the operation fails for any reason.
-     */
-    void beginTran(StoreContext context) throws AMQException;
-
-    /**
-     * Commits all operations performed within a given transactional context.
-     *
-     * @param context The transactional context to commit all operations for.
-     *
-     * @throws org.apache.qpid.AMQException If the operation fails for any reason.
-     */
-    void commitTran(StoreContext context) throws AMQException;
-
-    /**
-     * Commits all operations performed within a given transactional context.
-     *
-     * @param context The transactional context to commit all operations for.
-     *
-     * @throws org.apache.qpid.AMQException If the operation fails for any reason.
-     */
-    StoreFuture commitTranAsync(StoreContext context) throws AMQException;
-
-    /**
-     * Abandons all operations performed within a given transactional context.
-     *
-     * @param context The transactional context to abandon.
-     *
-     * @throws org.apache.qpid.AMQException If the operation fails for any reason.
-     */
-    void abortTran(StoreContext context) throws AMQException;
-
-    /**
-     * Tests a transactional context to see if it has been begun but not yet committed or aborted.
-     *
-     * @param context The transactional context to test.
-     *
-     * @return <tt>true</tt> if the transactional context is live, <tt>false</tt> otherwise.
-     */
-    boolean inTran(StoreContext context);
+
+    public static interface Transaction
+    {
+        /**
+         * Places a message onto a specified queue, in a given transactional context.
+         *
+         * @param queue     The queue to place the message on.
+         * @param messageId The message to enqueue.
+         * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+         */
+        void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException;
+
+        /**
+         * Extracts a message from a specified queue, in a given transactional context.
+         *
+         * @param queue     The queue to place the message on.
+         * @param messageId The message to dequeue.
+         * @throws org.apache.qpid.AMQException If the operation fails for any reason, or if the specified message does not exist.
+         */
+        void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException;
+
+
+        /**
+         * Commits all operations performed within a given transactional context.
+         *
+         * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+         */
+        void commitTran() throws AMQException;
+
+        /**
+         * Commits all operations performed within a given transactional context.
+         *
+         * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+         */
+        StoreFuture commitTranAsync() throws AMQException;
+
+        /**
+         * Abandons all operations performed within a given transactional context.
+         *
+         * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+         */
+        void abortTran() throws AMQException;
+
+
+
+    }
+
+    public void configureTransactionLog(String name,
+                      TransactionLogRecoveryHandler recoveryHandler,
+                      Configuration storeConfiguration,
+                      LogSubject logSubject) throws Exception;
+
+    Transaction newTransaction();
+
+
 
     public static interface StoreFuture
     {

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,33 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+public interface TransactionLogRecoveryHandler
+{
+    QueueEntryRecoveryHandler begin(TransactionLog log);
+
+    public static interface QueueEntryRecoveryHandler
+    {
+        void queueEntry(String queuename, long messageId);
+
+        void completeQueueEntryRecovery();
+    }
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,26 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+public interface TransactionLogResource
+{
+    public String getResourceName();
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,93 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.log4j.Logger;
+
+
+class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
+{
+    private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class);
+
+
+    private final QueueEntry _entry;
+    private final Subscription_0_10 _sub;
+
+    public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+    {
+        _entry = entry;
+        _sub = subscription_0_10;
+    }
+
+    public void onAccept()
+    {
+        final Subscription_0_10 subscription = getSubscription();
+        if(subscription != null && _entry.isAcquiredBy(_sub))
+        {
+            subscription.getSession().acknowledge(subscription, _entry);
+        }
+        else
+        {
+            _logger.warn("MessageAccept received for message which has not been acquired (likely client error)");
+        }
+
+    }
+
+    public void onRelease()
+    {
+        final Subscription_0_10 subscription = getSubscription();
+        if(subscription != null && _entry.isAcquiredBy(_sub))
+        {
+            subscription.release(_entry);
+        }
+        else
+        {
+            _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
+        }
+    }
+
+    public void onReject()
+    {
+        final Subscription_0_10 subscription = getSubscription();
+        if(subscription != null && _entry.isAcquiredBy(_sub))
+        {
+            subscription.reject(_entry);
+        }
+        else
+        {
+            _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
+        }
+
+    }
+
+    public boolean acquire()
+    {
+        return _entry.acquire(getSubscription());
+    }
+
+
+    private Subscription_0_10 getSubscription()
+    {
+        return _sub;
+    }
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.log4j.Logger;
+
+class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
+{
+    private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class);
+
+
+    private final QueueEntry _entry;
+    private Subscription_0_10 _sub;
+
+    public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+    {
+        _entry = entry;
+        _sub = subscription_0_10;
+    }
+
+    public void onAccept()
+    {
+        _logger.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)");
+    }
+
+    public void onRelease()
+    {
+        if(_entry.isAcquiredBy(_sub))
+        {
+            getSubscription().release(_entry);
+        }
+        else
+        {
+            _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
+        }
+    }
+
+    public void onReject()
+    {
+        if(_entry.isAcquiredBy(_sub))
+        {
+            getSubscription().reject(_entry);
+        }
+        else
+        {
+            _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
+        }
+
+    }
+
+    public boolean acquire()
+    {
+        boolean acquired = _entry.acquire(getSubscription());
+        //TODO - why acknowledge here??? seems bizarre...
+      //  getSubscription().getSession().acknowledge(getSubscription(), _entry);
+        return acquired;
+
+    }
+
+    public Subscription_0_10 getSubscription()
+    {
+        return _sub;
+    }
+
+
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java Tue Oct 20 16:23:01 2009
@@ -43,11 +43,15 @@
 
     public void onComplete(Method method)
     {
-        _session.acknowledge(_sub, _entry);
         if(_restoreCredit)
         {
             _sub.restoreCredit(_entry);
         }
-        _session.removeDispositionListener(method);        
+        if(_entry.isAcquiredBy(_sub))
+        {
+            _session.acknowledge(_sub, _entry);
+        }
+
+        _session.removeDispositionListener(method);
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Tue Oct 20 16:23:01 2009
@@ -51,6 +51,8 @@
 
     void setQueue(AMQQueue queue, boolean exclusive);
 
+    void setNoLocal(boolean noLocal);
+
     AMQShortString getConsumerTag();
 
     long getSubscriptionID();
@@ -71,7 +73,7 @@
 
     void send(QueueEntry msg) throws AMQException;
 
-    void queueDeleted(AMQQueue queue); 
+    void queueDeleted(AMQQueue queue);
 
 
     boolean wouldSuspend(QueueEntry msg);
@@ -97,5 +99,8 @@
 
     void confirmAutoClose();
 
+    public void set(String key, Object value);
+
+    public Object get(String key);
 
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Tue Oct 20 16:23:01 2009
@@ -25,6 +25,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -33,6 +35,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.SubscriptionActor;
@@ -42,7 +45,6 @@
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -74,6 +76,8 @@
     private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
     private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
 
+    private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+
     private final Lock _stateChangeLock;
 
     private static final AtomicLong idGenerator = new AtomicLong(0);
@@ -254,7 +258,7 @@
     private final AMQShortString _consumerTag;
 
 
-    private final boolean _noLocal;
+    private boolean _noLocal;
 
     private final FlowCreditManager _creditManager;
 
@@ -410,11 +414,7 @@
     public boolean hasInterest(QueueEntry entry)
     {
 
-        // TODO 0-10 to 0-8 conversion
-        if(!(entry.getMessage() instanceof AMQMessage))
-        {
-            return false;
-        }
+        
 
 
         //check that the message hasn't been rejected
@@ -667,5 +667,21 @@
         return !isBrowser();
     }
 
+    public void set(String key, Object value)
+    {
+        _properties.put(key, value);
+    }
+
+    public Object get(String key)
+    {
+        return _properties.get(key);
+    }
+
+
+    public void setNoLocal(boolean noLocal)
+    {
+        _noLocal = noLocal;
+    }
+
     abstract boolean isBrowser();
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Tue Oct 20 16:23:01 2009
@@ -29,14 +29,18 @@
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.SubscriptionActor;
-import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.transport.ServerSession;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQTypedValue;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.transport.*;
 
@@ -47,6 +51,9 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.nio.ByteBuffer;
 
 public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener
 {
@@ -90,6 +97,7 @@
 
     private LogSubject _logSubject;
     private LogActor _logActor;
+    private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
 
 
     public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
@@ -111,6 +119,11 @@
 
     }
 
+    public void setNoLocal(boolean noLocal)
+    {
+        _noLocal = noLocal;
+    }
+
     public AMQQueue getQueue()
     {
         return _queue;
@@ -135,7 +148,7 @@
         _queue = queue;
         _logSubject = new SubscriptionLogSubject(this);
         _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
-        
+
     }
 
     public AMQShortString getConsumerTag()
@@ -151,11 +164,7 @@
     public boolean hasInterest(QueueEntry entry)
     {
 
-        //TODO 0-8/9 to 0-10 conversion
-        if(!(entry.getMessage() instanceof MessageTransferMessage))
-        {
-            return false;
-        }
+
 
         //check that the message hasn't been rejected
         if (entry.isRejectedBy(this))
@@ -261,70 +270,164 @@
     }
 
 
+    private class AddMessageDispositionListnerAction implements Runnable
+    {
+        public MessageTransfer _xfr;
+        public ServerSession.MessageDispositionChangeListener _action;
+
+        public void run()
+        {
+            _session.onMessageDispositionChange(_xfr, _action);
+        }
+    }
+
+    private final AddMessageDispositionListnerAction _postIdSettingAction = new AddMessageDispositionListnerAction();
+
     public void send(final QueueEntry entry) throws AMQException
     {
         ServerMessage serverMsg = entry.getMessage();
 
 
-        MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
+        MessageTransfer xfr;
 
-
-        Struct[] headers;
-        if(msg.getHeader() == null)
+        if(serverMsg instanceof MessageTransferMessage)
         {
-            headers = EMPTY_STRUCT_ARRAY;
-        }
-        else
-        {
-            headers = msg.getHeader().getStructs();
-        }
 
-        ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
-        DeliveryProperties origDeliveryProps = null;
-        for(Struct header : headers)
-        {
-            if(header instanceof DeliveryProperties)
+            MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
+
+
+            Struct[] headers;
+            if(msg.getHeader() == null)
             {
-                origDeliveryProps = (DeliveryProperties) header;
+                headers = EMPTY_STRUCT_ARRAY;
             }
             else
             {
-                newHeaders.add(header);
+                headers = msg.getHeader().getStructs();
             }
-        }
 
-        DeliveryProperties deliveryProps = new DeliveryProperties();
-        if(origDeliveryProps != null)
-        {
-            if(origDeliveryProps.hasDeliveryMode())
+            ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
+            DeliveryProperties origDeliveryProps = null;
+            for(Struct header : headers)
             {
-                deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
+                if(header instanceof DeliveryProperties)
+                {
+                    origDeliveryProps = (DeliveryProperties) header;
+                }
+                else
+                {
+                    newHeaders.add(header);
+                }
             }
-            if(origDeliveryProps.hasExchange())
+
+            DeliveryProperties deliveryProps = new DeliveryProperties();
+            if(origDeliveryProps != null)
             {
-                deliveryProps.setExchange(origDeliveryProps.getExchange());
+                if(origDeliveryProps.hasDeliveryMode())
+                {
+                    deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
+                }
+                if(origDeliveryProps.hasExchange())
+                {
+                    deliveryProps.setExchange(origDeliveryProps.getExchange());
+                }
+                if(origDeliveryProps.hasExpiration())
+                {
+                    deliveryProps.setExpiration(origDeliveryProps.getExpiration());
+                }
+                if(origDeliveryProps.hasPriority())
+                {
+                    deliveryProps.setPriority(origDeliveryProps.getPriority());
+                }
+                if(origDeliveryProps.hasRoutingKey())
+                {
+                    deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
+                }
+
             }
-            if(origDeliveryProps.hasExpiration())
+
+            deliveryProps.setRedelivered(entry.isRedelivered());
+
+            newHeaders.add(deliveryProps);
+            Header header = new Header(newHeaders);
+
+            xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
+        }
+        else
+        {
+            AMQMessage message_0_8 = (AMQMessage) serverMsg;
+            DeliveryProperties deliveryProps = new DeliveryProperties();
+            MessageProperties messageProps = new MessageProperties();
+
+            int size = (int) message_0_8.getSize();
+            ByteBuffer body = ByteBuffer.allocate(size);
+            message_0_8.getContent(body, 0);
+            body.flip();
+
+            Struct[] headers = new Struct[] { deliveryProps, messageProps };
+
+            BasicContentHeaderProperties properties =
+                    (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().properties;
+            final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
+            if(exchange != null)
             {
-                deliveryProps.setExpiration(origDeliveryProps.getExpiration());
+                deliveryProps.setExchange(exchange.toString());
             }
-            if(origDeliveryProps.hasPriority())
+            deliveryProps.setExpiration(message_0_8.getExpiration());
+            deliveryProps.setImmediate(message_0_8.isImmediate());
+            deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority()));
+            deliveryProps.setRedelivered(entry.isRedelivered());
+            deliveryProps.setRoutingKey(message_0_8.getRoutingKey());
+            deliveryProps.setTimestamp(properties.getTimestamp());
+
+            messageProps.setContentEncoding(properties.getEncodingAsString());
+            messageProps.setContentLength(size);
+            if(properties.getAppId() != null)
             {
-                deliveryProps.setPriority(origDeliveryProps.getPriority());
+                messageProps.setAppId(properties.getAppId().getBytes());
             }
-            if(origDeliveryProps.hasRoutingKey())
+            messageProps.setContentType(properties.getContentTypeAsString());
+            if(properties.getCorrelationId() != null)
             {
-                deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
+                messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
             }
 
-        }
+            // TODO - ReplyTo
+
+            if(properties.getUserId() != null)
+            {
+                messageProps.setUserId(properties.getUserId().getBytes());
+            }
+
+            final Map<String, Object> appHeaders = new HashMap<String, Object>();
+
+            properties.getHeaders().processOverElements(
+                    new FieldTable.FieldTableElementProcessor()
+                    {
 
-        deliveryProps.setRedelivered(entry.isRedelivered());
+                        public boolean processElement(String propertyName, AMQTypedValue value)
+                        {
+                            Object val = value.getValue();
+                            if(val instanceof AMQShortString)
+                            {
+                                val = val.toString();
+                            }
+                            appHeaders.put(propertyName, val);
+                            return true;
+                        }
 
-        newHeaders.add(deliveryProps);
-        Header header = new Header(newHeaders);
+                        public Object getResult()
+                        {
+                            return appHeaders;
+                        }
+                    });
 
-        MessageTransfer xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
+
+            messageProps.setApplicationHeaders(appHeaders);
+
+            Header header = new Header(headers);
+            xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
+        }
 
         if(_acceptMode == MessageAcceptMode.NONE)
         {
@@ -342,81 +445,30 @@
         }
 
 
-
-        _session.sendMessage(xfr);
-
+        _postIdSettingAction._xfr = xfr;
         if(_acceptMode == MessageAcceptMode.EXPLICIT)
         {
-            // potential race condition if incomming commands on this session can be processed on a different thread
-            // to this one (i.e. the message is only put in the map *after* it has been sent, theoretically we could get
-            // acknowledgement back before reaching the next line)
-            _session.onMessageDispositionChange(xfr, new ServerSession.MessageDispositionChangeListener()
-                                        {
-                                            public void onAccept()
-                                            {
-                                                _session.acknowledge(Subscription_0_10.this,entry);
-                                            }
-
-                                            public void onRelease()
-                                            {
-                                                release(entry);
-                                            }
-
-                                            public void onReject()
-                                            {
-                                                reject(entry);
-                                            }
-
-                                            public boolean acquire()
-                                            {
-                                                return entry.acquire(Subscription_0_10.this);
-                                            }
-            });
+            _postIdSettingAction._action = new ExplicitAcceptDispositionChangeListener(entry, this);
         }
         else
         {
-            _session.onMessageDispositionChange(xfr, new ServerSession.MessageDispositionChangeListener()
-                                        {
-                                            public void onAccept()
-                                            {
-                                                // TODO : should log error of explicit accept on non-explicit sub
-                                            }
-
-                                            public void onRelease()
-                                            {
-                                                release(entry);
-                                            }
-
-                                            public void onReject()
-                                            {
-                                                reject(entry);
-                                            }
-
-                                            public boolean acquire()
-                                            {
-                                                boolean acquired = entry.acquire(Subscription_0_10.this);
-                                                //TODO - why acknowledge here??? seems bizarre...
-                                                _session.acknowledge(Subscription_0_10.this,entry);
-                                                return acquired;
-
-                                            }
-
-            });
+            _postIdSettingAction._action = new ImplicitAcceptDispositionChangeListener(entry, this);
         }
 
+        _session.sendMessage(xfr, _postIdSettingAction);
 
     }
 
-    private void reject(QueueEntry entry)
+    void reject(QueueEntry entry)
     {
-        entry.setRedelivered(true);
+        entry.setRedelivered();
         entry.routeToAlternate();
 
     }
 
-    private void release(QueueEntry entry)
+    void release(QueueEntry entry)
     {
-        entry.setRedelivered(true);
+        entry.setRedelivered();
         entry.release();
     }
 
@@ -480,6 +532,16 @@
         //No such thing in 0-10
     }
 
+    public void set(String key, Object value)
+    {
+        _properties.put(key, value);
+    }
+
+    public Object get(String key)
+    {
+        return _properties.get(key);
+    }
+
 
     public FlowCreditManager_0_10 getCreditManager()
     {
@@ -565,8 +627,10 @@
     public void acknowledge(QueueEntry entry)
     {
         // TODO Fix Store Context / cleanup
-
-        entry.discard();
+        if(entry.isAcquiredBy(this))
+        {
+            entry.discard();
+        }
     }
 
     public void flush() throws AMQException
@@ -585,5 +649,10 @@
         return _logActor;
     }
 
+    ServerSession getSession()
+    {
+        return _session;
+    }
+
 
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Tue Oct 20 16:23:01 2009
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.transport;
 
 import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -36,7 +35,7 @@
     @Override
     protected void setState(State state)
     {
-        super.setState(state);    
+        super.setState(state);
     }
 
     @Override

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Tue Oct 20 16:23:01 2009
@@ -22,8 +22,6 @@
 
 import org.apache.qpid.transport.*;
 
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Tue Oct 20 16:23:01 2009
@@ -25,10 +25,10 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.subscription.Subscription_0_10;
-import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.AMQException;
 
@@ -46,7 +46,6 @@
 {
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
 
-
     public static interface MessageDispositionChangeListener
     {
         public void onAccept();
@@ -69,7 +68,7 @@
     private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
             new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
 
-    private Transaction _transaction;
+    private ServerTransaction _transaction;
 
     private Principal _principal;
 
@@ -97,10 +96,16 @@
         _reference = new WeakReference(this);
     }
 
+    @Override
+    protected boolean isFull(int id)
+    {
+        return isCommandsFull(id);
+    }
+
     public void enqueue(final ServerMessage message, final ArrayList<AMQQueue> queues)
     {
 
-            _transaction.enqueue(queues,message, new Transaction.Action()
+            _transaction.enqueue(queues,message, new ServerTransaction.Action()
             {
 
                 AMQQueue[] _queues = queues.toArray(new AMQQueue[queues.size()]);
@@ -117,6 +122,7 @@
                         {
                             // TODO
                             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                            throw new RuntimeException(e);
                         }
                     }
                 }
@@ -130,10 +136,11 @@
 
     }
 
-    
-    public void sendMessage(MessageTransfer xfr)
+
+    public void sendMessage(MessageTransfer xfr,
+                            Runnable postIdSettingAction)
     {
-        invoke(xfr);
+        invoke(xfr, postIdSettingAction);
     }
 
     public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
@@ -231,12 +238,12 @@
 
     public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
     {
-        if(!_messageDispositionListenerMap.isEmpty())
+        if(ranges != null && !_messageDispositionListenerMap.isEmpty())
         {
             Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
             Iterator<Range> rangeIter = ranges.iterator();
 
-            if(rangeIter.hasNext())
+            if(rangeIter.hasNext())                               
             {
                 Range range = rangeIter.next();
 
@@ -266,7 +273,6 @@
 
             }
 
-
         }
     }
 
@@ -287,14 +293,14 @@
         for (Task task : _taskList)
         {
             task.doTask(this);
-        }                
+        }
 
     }
 
     public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
     {
         _transaction.dequeue(entry.getQueue(), entry.getMessage(),
-                             new Transaction.Action()
+                             new ServerTransaction.Action()
                              {
 
                                  public void postCommit()
@@ -307,7 +313,6 @@
                                      entry.release();
                                  }
                              });
-
     }
 
     public Collection<Subscription_0_10> getSubscriptions()

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Tue Oct 20 16:23:01 2009
@@ -29,15 +29,19 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.flow.*;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUnknownExchangeType;
 import org.apache.qpid.framing.*;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Map;
 
 public class ServerSessionDelegate extends SessionDelegate
 {
@@ -205,20 +209,19 @@
         }
 
 
-
-
-        MessageTransferMessage message = new MessageTransferMessage(xfr, ((ServerSession)ssn).getReference());
-        final MessageStore store = getVirtualHost(ssn).getMessageStore();
-
-        store.storeMessageHeader(message.getMessageNumber(),message);
-        store.storeContent(message.getMessageNumber(), 0, xfr.getBody());
-
         DeliveryProperties delvProps = null;
-        if(message.getHeader() != null && (delvProps = message.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+        if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
         {
             delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
         }
 
+        MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+        final MessageStore store = getVirtualHost(ssn).getMessageStore();
+        StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
+        storeMessage.addContent(0,xfr.getBody());
+        storeMessage.flushToStore();
+        MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
+
         ArrayList<AMQQueue> queues = exchange.route(message);
 
 
@@ -267,8 +270,6 @@
 
         ssn.processed(xfr);
 
-
-        super.messageTransfer(ssn, xfr);    //To change body of overridden methods use File | Settings | File Templates.
     }
 
     @Override
@@ -397,6 +398,11 @@
                         exchange.setAlternateExchange(alternate);
                     }
 
+                    if (exchange.isDurable() && !exchange.isAutoDelete())
+                    {
+                        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+                        store.createExchange(exchange);
+                    }
 
                     exchangeRegistry.registerExchange(exchange);
                 }
@@ -407,7 +413,6 @@
                 catch (AMQException e)
                 {
                     //TODO
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                     throw new RuntimeException(e);
                 }
 
@@ -431,7 +436,7 @@
         ex.setDescription(description);
 
         session.invoke(ex);
-        //session.close();
+
     }
 
     private Exchange getExchange(Session session, String exchangeName)
@@ -487,6 +492,13 @@
                 else
                 {
                     exchangeRegistry.unregisterExchange(method.getExchange(), method.getIfUnused());
+
+                    if (exchange.isDurable() && !exchange.isAutoDelete())
+                    {
+                        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+                        store.removeExchange(exchange);
+                    }
+
                 }
             }
             catch (ExchangeInUseException e)
@@ -496,7 +508,6 @@
             catch (AMQException e)
             {
                 // TODO
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                 throw new RuntimeException(e);
             }
         }
@@ -585,6 +596,7 @@
                     if (!exchange.isBound(routingKey, fieldTable, queue))
                     {
                         queue.bind(exchange, routingKey, fieldTable);
+
                     }
                     else
                     {
@@ -607,7 +619,7 @@
     @Override
     public void exchangeUnbind(Session session, ExchangeUnbind method)
     {
-                VirtualHost virtualHost = getVirtualHost(session);
+        VirtualHost virtualHost = getVirtualHost(session);
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
@@ -643,7 +655,6 @@
                 }
                 catch (AMQException e)
                 {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                     throw new RuntimeException(e);
                 }
             }
@@ -761,6 +772,7 @@
     {
 
         VirtualHost virtualHost = getVirtualHost(session);
+        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
 
         String queueName = method.getQueue();
 
@@ -818,10 +830,35 @@
                             queue.setAlternateExchange(alternate);
                         }
 
+                        if(method.hasArguments()  && method.getArguments() != null)
+                        {
+                            if(method.getArguments().containsKey("no-local"))
+                            {
+                                Object no_local = method.getArguments().get("no-local");
+                                if(no_local instanceof Boolean && ((Boolean)no_local))
+                                {
+                                    queue.setNoLocal(true);
+                                }
+                            }
+                        }
+
 
                         if (queue.isDurable() && !queue.isAutoDelete())
                         {
-                            //store.createQueue(queue, body.getArguments());
+                            if(method.hasArguments() && method.getArguments() != null)
+                            {
+                                Map<String,Object> args = method.getArguments();
+                                FieldTable ftArgs = new FieldTable();
+                                for(Map.Entry<String, Object> entry : args.entrySet())
+                                {
+                                    ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue());
+                                }
+                                store.createQueue(queue, ftArgs);
+                            }
+                            else
+                            {
+                                store.createQueue(queue);
+                            }
                         }
                         queueRegistry.registerQueue(queue);
                         boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister();
@@ -853,8 +890,7 @@
                                     }
                                     catch (AMQException e)
                                     {
-                                        //TODO
-                                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                                        throw new RuntimeException(e);
                                     }
                                 }
                             };
@@ -896,7 +932,6 @@
                     }
                     catch (AMQException e)
                     {
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                         throw new RuntimeException(e);
                     }
                 }
@@ -948,7 +983,6 @@
                                 catch (AMQException e)
                                 {
                                     //TODO
-                                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                                     throw new RuntimeException(e);
                                 }
                             }
@@ -1018,19 +1052,19 @@
                         try
                         {
                             int purged = queue.delete();
+                            if (queue.isDurable() && !queue.isAutoDelete())
+                            {
+                                DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+                                store.removeQueue(queue);
+                            }
+
                         }
                         catch (AMQException e)
                         {
                             //TODO
-                            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                             throw new RuntimeException(e);
                         }
 
-
-                        /*    if (queue.isDurable())
-                        {
-                            store.removeQueue(queue);
-                        }*/
                     }
 
                 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Tue Oct 20 16:23:01 2009
@@ -24,14 +24,13 @@
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TransactionLog;
 import org.apache.qpid.AMQException;
 
 import java.util.List;
 import java.util.Collection;
 
-public class AutoCommitTransaction implements Transaction
+public class AutoCommitTransaction implements ServerTransaction
 {
 
     private final TransactionLog _transactionLog;
@@ -55,13 +54,11 @@
             if(message.isPersistent() && queue.isDurable())
             {
 
-                StoreContext context = new StoreContext();
-
-                _transactionLog.beginTran(context);
-                _transactionLog.dequeueMessage(context, queue, message.getMessageNumber());
+                TransactionLog.Transaction txn = _transactionLog.newTransaction();
+                txn.dequeueMessage(queue, message.getMessageNumber());
                 // store.remove enqueue
                 // store.commit
-                _transactionLog.commitTran(context);
+                txn.commitTran();
             }
             postCommitAction.postCommit();
         }
@@ -77,7 +74,7 @@
     {
         try
         {
-            StoreContext context = null;
+            TransactionLog.Transaction txn = null;
             for(QueueEntry entry : ackedMessages)
             {
                 ServerMessage message = entry.getMessage();
@@ -85,18 +82,17 @@
 
                 if(message.isPersistent() && queue.isDurable())
                 {
-                    if(context == null)
+                    if(txn == null)
                     {
-                        context = new StoreContext();
-                        _transactionLog.beginTran(context);
+                        txn = _transactionLog.newTransaction();
                     }
-                    _transactionLog.dequeueMessage(context, queue, message.getMessageNumber());
+                    txn.dequeueMessage(queue, message.getMessageNumber());
                 }
 
             }
-            if(context != null)
+            if(txn != null)
             {
-                _transactionLog.commitTran(context);
+                txn.commitTran();
             }
             postCommitAction.postCommit();
         }
@@ -115,11 +111,10 @@
         {
             if(message.isPersistent() && queue.isDurable())
             {
-                StoreContext context = new StoreContext();
 
-                _transactionLog.beginTran(context);
-                _transactionLog.enqueueMessage(context, queue, message.getMessageNumber());
-                _transactionLog.commitTran(context);
+                TransactionLog.Transaction txn = _transactionLog.newTransaction();
+                txn.enqueueMessage(queue, message.getMessageNumber());
+                txn.commitTran();
             }
             postCommitAction.postCommit();
         }
@@ -140,19 +135,16 @@
 
             if(message.isPersistent())
             {
-                StoreContext context = new StoreContext();
-
-                _transactionLog.beginTran(context);
-
+                TransactionLog.Transaction txn = _transactionLog.newTransaction();
                 Long id = message.getMessageNumber();
                 for(AMQQueue q : queues)
                 {
                     if(q.isDurable())
                     {
-                        _transactionLog.enqueueMessage(context, q, id);
+                        txn.enqueueMessage(q, id);
                     }
                 }
-                _transactionLog.commitTran(context);
+                txn.commitTran();
 
             }
             postCommitAction.postCommit();

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Tue Oct 20 16:23:01 2009
@@ -4,7 +4,6 @@
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TransactionLog;
 import org.apache.qpid.AMQException;
 
@@ -12,11 +11,11 @@
 import java.util.ArrayList;
 import java.util.Collection;
 
-public class LocalTransaction implements Transaction
+public class LocalTransaction implements ServerTransaction
 {
     private final List<Action> _postCommitActions = new ArrayList<Action>();
 
-    private volatile StoreContext _storeContext;
+    private volatile TransactionLog.Transaction _transaction;
     private TransactionLog _transactionLog;
 
     public LocalTransaction(TransactionLog transactionLog)
@@ -37,7 +36,7 @@
             {
 
                 beginTranIfNecessary();
-                _transactionLog.dequeueMessage(_storeContext, queue, message.getMessageNumber());
+                _transaction.dequeueMessage(queue, message.getMessageNumber());
 
             }
             catch(AMQException e)
@@ -60,7 +59,7 @@
                 if(message.isPersistent() && queue.isDurable())
                 {
                     beginTranIfNecessary();
-                    _transactionLog.dequeueMessage(_storeContext, queue, message.getMessageNumber());
+                    _transaction.dequeueMessage(queue, message.getMessageNumber());
                 }
 
             }
@@ -70,10 +69,10 @@
             tidyUpOnError(e);
         }
         _postCommitActions.add(postCommitAction);
-            
+
     }
 
-    private void tidyUpOnError(AMQException e)
+    private void tidyUpOnError(Exception e)
     {
         try
         {
@@ -86,13 +85,13 @@
         {
             try
             {
-                _transactionLog.abortTran(_storeContext);
+                _transaction.abortTran();
             }
-            catch (AMQException e1)
+            catch (Exception e1)
             {
                 // TODO could try to chain the information to the original error
             }
-            _storeContext = null;
+            _transaction = null;
             _postCommitActions.clear();
         }
 
@@ -101,14 +100,13 @@
 
     private void beginTranIfNecessary()
     {
-        if(_storeContext == null)
+        if(_transaction == null)
         {
-            _storeContext = new StoreContext();
             try
             {
-                _transactionLog.beginTran(_storeContext);
+                _transaction = _transactionLog.newTransaction();
             }
-            catch (AMQException e)
+            catch (Exception e)
             {
                 tidyUpOnError(e);
             }
@@ -122,9 +120,9 @@
             beginTranIfNecessary();
             try
             {
-                _transactionLog.enqueueMessage(_storeContext, queue, message.getMessageNumber());
+                _transaction.enqueueMessage(queue, message.getMessageNumber());
             }
-            catch (AMQException e)
+            catch (Exception e)
             {
                 tidyUpOnError(e);
             }
@@ -137,10 +135,10 @@
     public void enqueue(List<AMQQueue> queues, EnqueableMessage message, Action postCommitAction)
     {
 
-        
+
         if(message.isPersistent())
         {
-            if(_storeContext == null)
+            if(_transaction == null)
             {
                 for(AMQQueue queue : queues)
                 {
@@ -161,12 +159,12 @@
                 {
                     if(queue.isDurable())
                     {
-                        _transactionLog.enqueueMessage(_storeContext, queue, message.getMessageNumber());
+                        _transaction.enqueueMessage(queue, message.getMessageNumber());
                     }
                 }
 
             }
-            catch (AMQException e)
+            catch (Exception e)
             {
                 tidyUpOnError(e);
             }
@@ -180,10 +178,10 @@
     {
         try
         {
-            if(_storeContext != null)
+            if(_transaction != null)
             {
 
-                _transactionLog.commitTran(_storeContext);
+                _transaction.commitTran();
             }
 
             for(Action action : _postCommitActions)
@@ -191,7 +189,7 @@
                 action.postCommit();
             }
         }
-        catch (AMQException e)
+        catch (Exception e)
         {
             for(Action action : _postCommitActions)
             {
@@ -202,7 +200,7 @@
         }
         finally
         {
-            _storeContext = null;
+            _transaction = null;
             _postCommitActions.clear();
         }
 
@@ -214,10 +212,10 @@
         try
         {
 
-            if(_storeContext != null)
+            if(_transaction != null)
             {
 
-                _transactionLog.abortTran(_storeContext);
+                _transaction.abortTran();
             }
         }
         catch (AMQException e)
@@ -237,7 +235,7 @@
             }
             finally
             {
-                _storeContext = null;
+                _transaction = null;
                 _postCommitActions.clear();
             }
         }

Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (from r824084, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java&r1=824084&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Tue Oct 20 16:23:01 2009
@@ -30,7 +30,7 @@
 import java.util.SortedSet;
 import java.util.Collection;
 
-public interface Transaction
+public interface ServerTransaction
 {
 
     void addPostCommitAction(Action postCommitAction);

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,62 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.access.ACLManager;
+import org.apache.qpid.server.management.ManagedObject;
+
+public interface VirtualHost
+{
+    IConnectionRegistry getConnectionRegistry();
+
+    VirtualHostConfiguration getConfiguration();
+
+    String getName();
+
+    QueueRegistry getQueueRegistry();
+
+    ExchangeRegistry getExchangeRegistry();
+
+    ExchangeFactory getExchangeFactory();
+
+    MessageStore getMessageStore();
+
+    TransactionLog getTransactionLog();
+
+    DurableConfigurationStore getDurableConfigurationStore();
+
+    AuthenticationManager getAuthenticationManager();
+
+    ACLManager getAccessManager();
+
+    void close() throws Exception;
+
+    ManagedObject getManagedObject();
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,340 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.TreeMap;
+
+public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
+                                                        ConfigurationRecoveryHandler.QueueRecoveryHandler,
+                                                        ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
+                                                        ConfigurationRecoveryHandler.BindingRecoveryHandler,
+                                                        MessageStoreRecoveryHandler,
+                                                        MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
+                                                        TransactionLogRecoveryHandler,
+                                                        TransactionLogRecoveryHandler.QueueEntryRecoveryHandler
+{
+    private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
+
+
+    private final VirtualHost _virtualHost;
+
+    private MessageStoreLogSubject _logSubject;
+    private List<ProcessAction> _actions;
+
+    private MessageStore _store;
+    private TransactionLog _transactionLog;
+
+    private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
+    private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
+    private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
+
+
+
+    public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost)
+    {
+        _virtualHost = virtualHost;
+    }
+
+    public QueueRecoveryHandler begin(MessageStore store)
+    {
+        _logSubject = new MessageStoreLogSubject(_virtualHost,store);
+        _store = store;
+        CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1004(null, false));
+
+        return this;
+    }
+
+    public void queue(String queueName, String owner, FieldTable arguments)
+    {
+        AMQShortString queueNameShortString = new AMQShortString(queueName);
+
+        AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+
+        if (q == null)
+        {
+            q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost,
+                                                   arguments);
+            _virtualHost.getQueueRegistry().registerQueue(q);
+        }
+
+        CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1004(queueName, true));
+
+        //Record that we have a queue for recovery
+        _queueRecoveries.put(queueName, 0);
+    }
+
+    public ExchangeRecoveryHandler completeQueueRecovery()
+    {
+        return this;
+    }
+
+    public void exchange(String exchangeName, String type, boolean autoDelete)
+    {
+        try
+        {
+            Exchange exchange;
+            AMQShortString exchangeNameSS = new AMQShortString(exchangeName);
+            exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS);
+            if (exchange == null)
+            {
+                exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
+                _virtualHost.getExchangeRegistry().registerExchange(exchange);
+            }
+        }
+        catch (AMQException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public BindingRecoveryHandler completeExchangeRecovery()
+    {
+        return this;
+    }
+
+    public StoredMessageRecoveryHandler begin()
+    {
+        // TODO - log begin
+        return this;
+    }
+
+    public void message(StoredMessage message)
+    {
+        ServerMessage serverMessage;
+        switch(message.getMetaData().getType())
+        {
+            case META_DATA_0_8:
+                serverMessage = new AMQMessage(message);
+                break;
+            case META_DATA_0_10:
+                serverMessage = new MessageTransferMessage(message, null);
+                break;
+            default:
+                throw new RuntimeException("Unknown message type retreived from store " + message.getMetaData().getClass());
+        }
+
+
+        _recoveredMessages.put(message.getMessageNumber(), serverMessage);
+        _unusedMessages.put(message.getMessageNumber(), message);
+    }
+
+    public void completeMessageRecovery()
+    {
+        //TODO - log end
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public TransactionLogRecoveryHandler.QueueEntryRecoveryHandler begin(TransactionLog log)
+    {
+        _transactionLog = log;
+        return this;
+    }
+
+    private static final class ProcessAction
+    {
+        private final AMQQueue _queue;
+        private final AMQMessage _message;
+
+        public ProcessAction(AMQQueue queue, AMQMessage message)
+        {
+            _queue = queue;
+            _message = message;
+        }
+
+        public void process()
+        {
+            try
+            {
+                _queue.enqueue(_message);
+            }
+            catch(AMQException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+    }
+
+    public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf)
+    {
+        _actions = new ArrayList<ProcessAction>();
+        try
+        {
+            QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+            Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName);
+            AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName));
+            if (queue == null)
+            {
+                _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
+                    + exchange.getName());
+            }
+            else
+            {
+
+
+                FieldTable argumentsFT = null;
+                if(buf != null)
+                {
+                    argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit());
+                }
+
+                _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName
+                    + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
+
+                queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
+
+            }
+        }
+        catch (AMQException e)
+        {
+             throw new RuntimeException(e);
+        }
+
+    }
+
+    public void completeBindingRecovery()
+    {
+        //return this;
+    }
+
+    public void complete()
+    {
+
+
+    }
+
+    public void queueEntry(final String queueName, long messageId)
+    {
+        AMQShortString queueNameShortString = new AMQShortString(queueName);
+
+        AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+
+        try
+        {
+            if(queue != null)
+            {
+                ServerMessage message = _recoveredMessages.get(messageId);
+                _unusedMessages.remove(messageId);
+
+                if(message != null)
+                {
+
+
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getName());
+                    }
+
+                    Integer count = _queueRecoveries.get(queueName);
+                    if (count == null)
+                    {
+                        count = 0;
+                    }
+
+                    queue.enqueue(message);
+
+                    _queueRecoveries.put(queueName, ++count);
+                }
+                else
+                {
+                    _logger.warn("Message id " + messageId + " referenced in log as enqueue in queue " + queue.getName() + " is unknwon, entry will be discarded");
+                    TransactionLog.Transaction txn = _transactionLog.newTransaction();
+                    txn.dequeueMessage(queue, messageId);
+                    txn.commitTranAsync();
+                }
+            }
+            else
+            {
+                _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
+                TransactionLog.Transaction txn = _transactionLog.newTransaction();
+                TransactionLogResource mockQueue =
+                        new TransactionLogResource()
+                        {
+
+                            public String getResourceName()
+                            {
+                                return queueName;
+                            }
+                        };
+                txn.dequeueMessage(mockQueue, messageId);
+                txn.commitTranAsync();
+            }
+
+        }
+        catch(AMQException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+
+
+    }
+
+    public void completeQueueEntryRecovery()
+    {
+
+        for(StoredMessage m : _unusedMessages.values())
+        {
+            _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
+            m.remove();
+        }
+
+        for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
+        {
+            CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1005(entry.getValue(), entry.getKey()));
+
+            CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1006(entry.getKey(), true));
+        }        
+
+        CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1006(null, false));
+    }
+
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org