You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/28 14:02:48 UTC

svn commit: r1225178 [4/8] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/ bdbstore/src/test/ bdbstore/src/test/jav...

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java Wed Dec 28 13:02:41 2011
@@ -18,10 +18,7 @@
  */
 package org.apache.qpid.server.security.access;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.qpid.framing.AMQShortString;
@@ -35,7 +32,7 @@ import org.apache.qpid.server.queue.AMQQ
  * {@link #equals(Object)} and {@link #hashCode()} are intended for use in maps. This is due to the wildcard matching
  * described above.
  */
-public class ObjectProperties extends HashMap<ObjectProperties.Property, String>
+public class ObjectProperties
 {
     /** serialVersionUID */
     private static final long serialVersionUID = -1356019341374170495L;
@@ -93,7 +90,9 @@ public class ObjectProperties extends Ha
 			return properties;
 		}
     }
-	
+
+    private final EnumMap<Property, String> _properties = new EnumMap<Property, String>(Property.class);
+
 	public static List<String> getAllPropertyNames()
     {
 		List<String> properties = new ArrayList<String>();		
@@ -113,7 +112,7 @@ public class ObjectProperties extends Ha
     {
         super();
         
-        putAll(copy);
+        _properties.putAll(copy._properties);
     }
     
     public ObjectProperties(String name)
@@ -231,7 +230,7 @@ public class ObjectProperties extends Ha
 	public List<String> getPropertyNames()
     {
 		List<String> properties = new ArrayList<String>();		
-		for (Property property : keySet())
+		for (Property property : _properties.keySet())
 		{
 			properties.add(property.getName());
 		}
@@ -240,17 +239,22 @@ public class ObjectProperties extends Ha
     
     public Boolean isSet(Property key)
     {
-        return containsKey(key) && Boolean.valueOf(get(key));
+        return _properties.containsKey(key) && Boolean.valueOf(_properties.get(key));
     }
-    
+
+    public String get(Property key)
+    {
+        return _properties.get(key);
+    }
+
     public String getName()
     {
-        return get(Property.NAME);
+        return _properties.get(Property.NAME);
     }
     
     public void setName(String name)
     {
-        put(Property.NAME, name);
+        _properties.put(Property.NAME, name);
     }
     
     public void setName(AMQShortString name)
@@ -262,39 +266,38 @@ public class ObjectProperties extends Ha
     {
         return put(key, value == null ? "" : value.asString());
     }
-    
-    @Override
+
     public String put(Property key, String value)
     {
-        return super.put(key, value == null ? "" : value.trim());
+        return _properties.put(key, value == null ? "" : value.trim());
     }
     
     public void put(Property key, Boolean value)
     {
         if (value != null)
         {
-            super.put(key, Boolean.toString(value));
+            _properties.put(key, Boolean.toString(value));
         }
     }
     
     public boolean matches(ObjectProperties properties)
     {
-        if (properties.keySet().isEmpty())
+        if (properties._properties.keySet().isEmpty())
         {
             return true;
         }
         
-        if (!keySet().containsAll(properties.keySet()))
+        if (!_properties.keySet().containsAll(properties._properties.keySet()))
         {
             return false;
         }
         
-        for (Map.Entry<Property,String> entry : properties.entrySet())
+        for (Map.Entry<Property,String> entry : properties._properties.entrySet())
         {
             Property key = entry.getKey();
             String ruleValue = entry.getValue();
             
-            String thisValue = get(key);
+            String thisValue = _properties.get(key);
 
             if (!valueMatches(thisValue, ruleValue)) 
             {
@@ -315,4 +318,29 @@ public class ObjectProperties extends Ha
                         && thisValue.length() > ruleValue.length()
                         && thisValue.startsWith(ruleValue.substring(0, ruleValue.length() - 2)));
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ObjectProperties that = (ObjectProperties) o;
+
+        if (_properties != null ? !_properties.equals(that._properties) : that._properties != null) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return _properties != null ? _properties.hashCode() : 0;
+    }
+
+    @Override
+    public String toString()
+    {
+        return _properties.toString();
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Wed Dec 28 13:02:41 2011
@@ -52,6 +52,8 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
 import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 
 /**
@@ -60,7 +62,7 @@ import org.apache.qpid.server.queue.AMQQ
  * 
  * TODO extract the SQL statements into a generic JDBC store
  */
-public class DerbyMessageStore implements MessageStore
+public class DerbyMessageStore implements MessageStore, DurableConfigurationStore
 {
 
     private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
@@ -197,12 +199,16 @@ public class DerbyMessageStore implement
                           Configuration storeConfiguration,
                           LogSubject logSubject) throws Exception
     {
-        CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
-
         if(!_configured)
         {
 
             _logSubject = logSubject;
+        }
+
+        CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+
+        if(!_configured)
+        {
 
             commonConfiguration(name, storeConfiguration, logSubject);
             _configured = true;
@@ -219,6 +225,11 @@ public class DerbyMessageStore implement
                           Configuration storeConfiguration,
                           LogSubject logSubject) throws Exception
     {
+
+        if(!_configured)
+        {
+            _logSubject = logSubject;
+        }
         CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
 
         if(!_configured)
@@ -697,7 +708,7 @@ public class DerbyMessageStore implement
 
                     if (results == 0)
                     {
-                        throw new RuntimeException("Message metadata not found for message id " + messageId);
+                        _logger.warn("Message metadata not found for message id " + messageId);
                     }
 
                     if (_logger.isDebugEnabled())
@@ -1678,14 +1689,26 @@ public class DerbyMessageStore implement
             }
         }
 
-        public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+        public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
-            DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId);
+            if(message.getStoredMessage() instanceof StoredDerbyMessage)
+            {
+                try
+                {
+                    ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection());
+                }
+                catch (SQLException e)
+                {
+                    throw new AMQStoreException("Exception on enqueuing message " + _messageId, e);
+                }
+            }
+
+            DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
         }
 
-        public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+        public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
-            DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId);
+            DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
 
         }
 
@@ -1709,8 +1732,11 @@ public class DerbyMessageStore implement
     {
 
         private final long _messageId;
+        private StorableMessageMetaData _metaData;
         private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
-        private Connection _conn;
+        private byte[] _data;
+        private volatile SoftReference<byte[]> _dataRef;
+        
 
         StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
         {
@@ -1721,27 +1747,19 @@ public class DerbyMessageStore implement
         StoredDerbyMessage(long messageId,
                            StorableMessageMetaData metaData, boolean persist)
         {
-            try
-            {
-                _messageId = messageId;
+            _messageId = messageId;
+            
 
-                _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
-                if(persist)
-                {
-                    _conn = newConnection();
-                    storeMetaData(_conn, messageId, metaData);
-                }
-            }
-            catch (SQLException e)
+            _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+            if(persist)
             {
-                throw new RuntimeException(e);
+                _metaData = metaData;    
             }
-
         }
 
         public StorableMessageMetaData getMetaData()
         {
-            StorableMessageMetaData metaData = _metaDataRef.get();
+            StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
             if(metaData == null)
             {
                 try
@@ -1765,27 +1783,62 @@ public class DerbyMessageStore implement
 
         public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
         {
-            DerbyMessageStore.this.addContent(_conn, _messageId, offsetInMessage, src);
+            src = src.slice();
+
+            if(_data == null)
+            {
+                _data = new byte[src.remaining()];
+                _dataRef = new SoftReference<byte[]>(_data);
+                src.duplicate().get(_data);
+            }
+            else
+            {
+                byte[] oldData = _data;
+                _data = new byte[oldData.length + src.remaining()];
+                _dataRef = new SoftReference<byte[]>(_data);
+
+                System.arraycopy(oldData,0,_data,0,oldData.length);
+                src.duplicate().get(_data, oldData.length, src.remaining());
+            }
+            
         }
 
         public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
         {
-            return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+            byte[] data = _dataRef == null ? null : _dataRef.get();
+            if(data != null)
+            {
+                int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+                dst.put(data, offsetInMessage, length);
+                return length;
+            }
+            else
+            {
+                return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+            }
+        }
+
+
+        public ByteBuffer getContent(int offsetInMessage, int size)
+        {
+            ByteBuffer buf = ByteBuffer.allocate(size);
+            getContent(offsetInMessage, buf);
+            buf.position(0);
+            return  buf;
         }
 
-        public StoreFuture flushToStore()
+        public synchronized StoreFuture flushToStore()
         {
             try
             {
-                if(_conn != null)
+                if(_metaData != null)
                 {
-                    if(_logger.isDebugEnabled())
-                    {
-                        _logger.debug("Flushing message " + _messageId + " to store");
-                    }
+                    Connection conn = newConnection();
+
+                    store(conn);
                     
-                    _conn.commit();
-                    _conn.close();
+                    conn.commit();
+                    conn.close();
                 }
             }
             catch (SQLException e)
@@ -1796,16 +1849,34 @@ public class DerbyMessageStore implement
                 }
                 throw new RuntimeException(e);
             }
-            finally
+            return IMMEDIATE_FUTURE;
+        }
+
+        private synchronized void store(final Connection conn) throws SQLException
+        {
+            if(_metaData != null)
             {
-                _conn = null;
+                try
+                {
+                    storeMetaData(conn, _messageId, _metaData);
+                    DerbyMessageStore.this.addContent(conn, _messageId, 0,
+                                                      _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+                }
+                finally
+                {
+                    _metaData = null;
+                    _data = null;
+                }
+            }
+
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("Storing message " + _messageId + " to store");
             }
-            return IMMEDIATE_FUTURE;
         }
 
         public void remove()
         {
-            flushToStore();
             DerbyMessageStore.this.removeMessage(_messageId);
         }
     }
@@ -1839,4 +1910,5 @@ public class DerbyMessageStore implement
             }
         }
     }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Wed Dec 28 13:02:41 2011
@@ -35,10 +35,12 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
 import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 
 /** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore
+public class MemoryMessageStore implements MessageStore, DurableConfigurationStore
 {
     private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
 
@@ -53,11 +55,11 @@ public class MemoryMessageStore implemen
 
     private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
     {
-        public void enqueueMessage(TransactionLogResource  queue, Long messageId) throws AMQStoreException
+        public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
         }
 
-        public void dequeueMessage(TransactionLogResource  queue, Long messageId) throws AMQStoreException
+        public void dequeueMessage(TransactionLogResource  queue, EnqueableMessage message) throws AMQStoreException
         {
         }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Wed Dec 28 13:02:41 2011
@@ -20,14 +20,16 @@
  */
 package org.apache.qpid.server.store;
 
+import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.message.EnqueableMessage;
 
 /**
  * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
  *
  */
-public interface MessageStore extends DurableConfigurationStore, TransactionLog
+public interface MessageStore
 {
     StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
         {
@@ -77,4 +79,69 @@ public interface MessageStore extends Du
     boolean isPersistent();
 
 
+
+    public static interface Transaction
+    {
+        /**
+         * Places a message onto a specified queue, in a given transactional context.
+         *
+         *
+         *
+         * @param queue     The queue to place the message on.
+         * @param message
+         * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
+         */
+        void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
+
+        /**
+         * Extracts a message from a specified queue, in a given transactional context.
+         *
+         * @param queue     The queue to place the message on.
+         * @param message The message to dequeue.
+         * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
+         */
+        void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
+
+
+        /**
+         * Commits all operations performed within a given transactional context.
+         *
+         * @throws AMQStoreException If the operation fails for any reason.
+         */
+        void commitTran() throws AMQStoreException;
+
+        /**
+         * Commits all operations performed within a given transactional context.
+         *
+         * @throws AMQStoreException If the operation fails for any reason.
+         */
+        StoreFuture commitTranAsync() throws AMQStoreException;
+
+        /**
+         * Abandons all operations performed within a given transactional context.
+         *
+         * @throws AMQStoreException If the operation fails for any reason.
+         */
+        void abortTran() throws AMQStoreException;
+
+
+
+    }
+
+    public void configureTransactionLog(String name,
+                      TransactionLogRecoveryHandler recoveryHandler,
+                      Configuration storeConfiguration,
+                      LogSubject logSubject) throws Exception;
+
+    Transaction newTransaction();
+
+
+
+    public static interface StoreFuture
+    {
+        boolean isComplete();
+
+        void waitForCompletion();
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Wed Dec 28 13:02:41 2011
@@ -26,15 +26,13 @@ import java.nio.ByteBuffer;
 public class StoredMemoryMessage implements StoredMessage
 {
     private final long _messageNumber;
-    private final ByteBuffer _content;
+    private ByteBuffer _content;
     private final StorableMessageMetaData _metaData;
 
     public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData)
     {
         _messageNumber = messageNumber;
         _metaData = metaData;
-        _content = ByteBuffer.allocate(metaData.getContentSize());
-
     }
 
     public long getMessageNumber()
@@ -44,26 +42,79 @@ public class StoredMemoryMessage impleme
 
     public void addContent(int offsetInMessage, ByteBuffer src)
     {
-        src = src.duplicate();
-        ByteBuffer dst = _content.duplicate();
-        dst.position(offsetInMessage);
-        dst.put(src);
+        if(_content == null)
+        {
+            if(offsetInMessage == 0)
+            {
+                _content = src.slice();
+            }
+            else
+            {
+                final int contentSize = _metaData.getContentSize();
+                int size = (contentSize < offsetInMessage + src.remaining())
+                        ? offsetInMessage + src.remaining()
+                        : contentSize;
+                _content = ByteBuffer.allocate(size);
+                addContent(offsetInMessage, src);
+            }
+        }
+        else
+        {
+            if(_content.limit() >= offsetInMessage + src.remaining())
+            {
+                _content.position(offsetInMessage);
+                _content.put(src);
+                _content.position(0);
+            }
+            else
+            {
+                final int contentSize = _metaData.getContentSize();
+                int size = (contentSize < offsetInMessage + src.remaining())
+                        ? offsetInMessage + src.remaining()
+                        : contentSize;
+                ByteBuffer oldContent = _content;
+                _content = ByteBuffer.allocate(size);
+                _content.put(oldContent);
+                _content.position(0);
+                addContent(offsetInMessage, src);
+            }
+
+        }
     }
 
     public int getContent(int offset, ByteBuffer dst)
     {
         ByteBuffer src = _content.duplicate();
-        src.position(offset);
-        src = src.slice();
-        if(dst.remaining() < src.limit())
+
+        int oldPosition = src.position();
+
+        src.position(oldPosition + offset);
+
+        int length = dst.remaining() < src.remaining() ? dst.remaining() : src.remaining();
+        src.limit(oldPosition + length);
+
+        dst.put(src);
+
+
+        return length;
+    }
+
+
+    public ByteBuffer getContent(int offsetInMessage, int size)
+    {
+        ByteBuffer buf = _content.duplicate();
+
+        if(offsetInMessage != 0)
         {
-            src.limit(dst.remaining());
+            buf.position(offsetInMessage);
+            buf = buf.slice();
         }
-        dst.put(src);
-        return src.limit();
+
+        buf.limit(size);
+        return buf;
     }
 
-    public TransactionLog.StoreFuture flushToStore()
+    public MessageStore.StoreFuture flushToStore()
     {
         return MessageStore.IMMEDIATE_FUTURE;
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java Wed Dec 28 13:02:41 2011
@@ -32,7 +32,9 @@ public interface StoredMessage<M extends
 
     int getContent(int offsetInMessage, ByteBuffer dst);
 
-    TransactionLog.StoreFuture flushToStore();
+    ByteBuffer getContent(int offsetInMessage, int size);
+
+    MessageStore.StoreFuture flushToStore();
 
     void remove();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java Wed Dec 28 13:02:41 2011
@@ -20,72 +20,7 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.AMQStoreException;
-import org.apache.commons.configuration.Configuration;
-
 public interface TransactionLog
 {
 
-    public static interface Transaction
-    {
-        /**
-         * Places a message onto a specified queue, in a given transactional context.
-         *
-         * @param queue     The queue to place the message on.
-         * @param messageId The message to enqueue.
-         * @throws AMQStoreException If the operation fails for any reason.
-         */
-        void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException;
-
-        /**
-         * Extracts a message from a specified queue, in a given transactional context.
-         *
-         * @param queue     The queue to place the message on.
-         * @param messageId The message to dequeue.
-         * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
-         */
-        void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException;
-
-
-        /**
-         * Commits all operations performed within a given transactional context.
-         *
-         * @throws AMQStoreException If the operation fails for any reason.
-         */
-        void commitTran() throws AMQStoreException;
-
-        /**
-         * Commits all operations performed within a given transactional context.
-         *
-         * @throws AMQStoreException If the operation fails for any reason.
-         */
-        StoreFuture commitTranAsync() throws AMQStoreException;
-
-        /**
-         * Abandons all operations performed within a given transactional context.
-         *
-         * @throws AMQStoreException If the operation fails for any reason.
-         */
-        void abortTran() throws AMQStoreException;
-
-
-
-    }
-
-    public void configureTransactionLog(String name,
-                      TransactionLogRecoveryHandler recoveryHandler,
-                      Configuration storeConfiguration,
-                      LogSubject logSubject) throws Exception;
-
-    Transaction newTransaction();
-
-
-
-    public static interface StoreFuture
-    {
-        boolean isComplete();
-
-        void waitForCompletion();
-    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java Wed Dec 28 13:02:41 2011
@@ -22,7 +22,7 @@ package org.apache.qpid.server.store;
 
 public interface TransactionLogRecoveryHandler
 {
-    QueueEntryRecoveryHandler begin(TransactionLog log);
+    QueueEntryRecoveryHandler begin(MessageStore log);
 
     public static interface QueueEntryRecoveryHandler
     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Wed Dec 28 13:02:41 2011
@@ -62,7 +62,6 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageTransfer;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.Struct;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -70,10 +69,10 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.AMQException;
 
 import java.text.MessageFormat;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.locks.Lock;
@@ -183,6 +182,7 @@ public class Subscription_0_10 implement
             throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
         }
         _queue = queue;
+
         Map<String, Object> arguments = queue.getArguments() == null ? Collections.EMPTY_MAP : queue.getArguments();
         _traceExclude = (String) arguments.get("qpid.trace.exclude");
         _trace = (String) arguments.get("qpid.trace.id");
@@ -224,8 +224,8 @@ public class Subscription_0_10 implement
 
         if (_noLocal && entry.getMessage() instanceof MessageTransferMessage)
         {
-            Session messageSession= ((MessageTransferMessage)entry.getMessage()).getSession();
-            if (messageSession != null && messageSession.getConnection() == _session.getConnection())
+            Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
+            if (connectionRef != null && connectionRef == _session.getReference())
             {
                 return false;
             }
@@ -377,35 +377,8 @@ public class Subscription_0_10 implement
         {
 
             MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
-
-
-            Struct[] headers;
-            if(msg.getHeader() == null)
-            {
-                headers = EMPTY_STRUCT_ARRAY;
-            }
-            else
-            {
-                headers = msg.getHeader().getStructs();
-            }
-
-            ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
-            DeliveryProperties origDeliveryProps = null;
-            for(Struct header : headers)
-            {
-                if(header instanceof DeliveryProperties)
-                {
-                    origDeliveryProps = (DeliveryProperties) header;
-                }
-                else
-                {
-                    if(header instanceof MessageProperties)
-                    {
-                        messageProps = (MessageProperties) header;
-                    }
-                    newHeaders.add(header);
-                }
-            }
+            DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
+            messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
 
             deliveryProps = new DeliveryProperties();
             if(origDeliveryProps != null)
@@ -440,17 +413,16 @@ public class Subscription_0_10 implement
 
             deliveryProps.setRedelivered(entry.isRedelivered());
 
-            newHeaders.add(deliveryProps);
-
             if(_trace != null && messageProps == null)
             {
                 messageProps = new MessageProperties();
-                newHeaders.add(messageProps);
             }
 
-            Header header = new Header(newHeaders);
+            Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
+
 
-            xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
+            xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
+                        : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
         }
         else if(serverMsg instanceof AMQMessage)
         {
@@ -463,8 +435,6 @@ public class Subscription_0_10 implement
             message_0_8.getContent(body, 0);
             body.flip();
 
-            Struct[] headers = new Struct[] { deliveryProps, messageProps };
-
             BasicContentHeaderProperties properties =
                     (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
             final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
@@ -505,8 +475,9 @@ public class Subscription_0_10 implement
 
             messageProps.setApplicationHeaders(appHeaders);
 
-            Header header = new Header(headers);
-            xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
+            Header header = new Header(deliveryProps, messageProps, null);
+            xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
+                        : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
         }
         else
         {
@@ -519,8 +490,6 @@ public class Subscription_0_10 implement
             serverMsg.getContent(body, 0);
             body.flip();
 
-            Struct[] headers = new Struct[] { deliveryProps, messageProps };
-
 
             deliveryProps.setExpiration(serverMsg.getExpiration());
             deliveryProps.setImmediate(serverMsg.isImmediate());
@@ -567,8 +536,9 @@ public class Subscription_0_10 implement
 
             messageProps.setApplicationHeaders(appHeaders);
 */
-            Header header = new Header(headers);
-            xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
+            Header header = new Header(deliveryProps, messageProps, null);
+            xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
+                        : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
         }
 
         boolean excludeDueToFederation = false;
@@ -644,28 +614,51 @@ public class Subscription_0_10 implement
         }
     }
 
-    private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
+    private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
     {
-        ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
-        txn.dequeue(entry.getQueue(),entry.getMessage(),
-                                new ServerTransaction.Action()
-                            {
-                                public void postCommit()
-                                {
-                                    if(restoreCredit)
-                                    {
-                                        restoreCredit(entry);
-                                    }
-                                    entry.discard();
-                                }
+        _deferredMessageCredit += deferredMessageCredit;
+        _deferredSizeCredit += deferredSizeCredit;
 
-                                public void onRollback()
-                                {
+    }
 
-                                }
-                            });
+    public void flushCreditState()
+    {
+        flushCreditState(false);
+    }
+    public void flushCreditState(boolean strict)
+    {
+        if(strict || !isSuspended() || _deferredMessageCredit >= 200
+          || !(_creditManager instanceof WindowCreditManager)
+          || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
+        {
+            _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
+            _deferredMessageCredit = 0;
+            _deferredSizeCredit = 0l;
+        }
     }
 
+    private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
+    {
+        AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); 
+        dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
+                           new ServerTransaction.Action()
+                           {
+                               public void postCommit()
+                               {
+                                   if (restoreCredit)
+                                   {
+                                       restoreCredit(entry);
+                                   }
+                                   entry.discard();
+                               }
+
+                               public void onRollback()
+                               {
+
+                               }
+                           });
+   }
+
     void reject(final QueueEntry entry)
     {
         entry.setRedelivered();
@@ -704,7 +697,7 @@ public class Subscription_0_10 implement
         {
             final InboundMessage m = new InboundMessageAdapter(entry);
 
-            final ArrayList<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
+            final List<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
 
             if (destinationQueues == null || destinationQueues.isEmpty())
             {
@@ -751,6 +744,7 @@ public class Subscription_0_10 implement
         return _stateChangeLock.tryLock();
     }
 
+
     public void getSendLock()
     {
         _stateChangeLock.lock();
@@ -816,28 +810,6 @@ public class Subscription_0_10 implement
         return _properties.get(key);
     }
 
-    private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
-    {
-        _deferredMessageCredit += deferredMessageCredit;
-        _deferredSizeCredit += deferredSizeCredit;
-
-    }
-
-    public void flushCreditState()
-    {
-        flushCreditState(false);
-    }
-    public void flushCreditState(boolean strict)
-    {
-        if(strict || !isSuspended() || _deferredMessageCredit >= 200
-          || !(_creditManager instanceof WindowCreditManager)
-          || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
-        {
-            _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
-            _deferredMessageCredit = 0;
-            _deferredSizeCredit = 0l;
-        }
-    }
 
     public FlowCreditManager_0_10 getCreditManager()
     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Wed Dec 28 13:02:41 2011
@@ -75,7 +75,7 @@ public class ServerConnection extends Co
     private boolean _statisticsEnabled = false;
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
     private final long _connectionId;
-
+    private final Object _reference = new Object();
     private ServerConnectionMBean _mBean;
     private VirtualHost _virtualHost;
     private AtomicLong _lastIoTime = new AtomicLong();
@@ -90,6 +90,11 @@ public class ServerConnection extends Co
         return _config.getId();
     }
 
+    public Object getReference()
+    {
+        return _reference;
+    }
+
     @Override
     protected void invoke(Method method)
     {
@@ -414,13 +419,11 @@ public class ServerConnection extends Co
         return _connectionId;
     }
 
-    @Override
     public boolean isSessionNameUnique(byte[] name)
     {
         return !super.hasSessionWithName(name);
     }
 
-    @Override
     public String getUserName()
     {
         return _authorizedPrincipal.getName();
@@ -450,11 +453,11 @@ public class ServerConnection extends Co
     {
         for (Session ssn : getChannels())
         {
-            ((ServerSession)ssn).flushCreditState();
+            ((ServerSession)ssn).receivedComplete();
         }
     }
 
-    @Override
+
     public ManagedObject getManagedObject()
     {
         return _mBean;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Wed Dec 28 13:02:41 2011
@@ -23,7 +23,6 @@ package org.apache.qpid.server.transport
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
 import static org.apache.qpid.util.Serial.gt;
 
-import java.lang.ref.WeakReference;
 import java.security.Principal;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -71,6 +70,7 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.Range;
 import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.RangeSetFactory;
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.SessionDelegate;
 import org.slf4j.Logger;
@@ -86,6 +86,7 @@ public class ServerSession extends Sessi
     private ConnectionConfig _connectionConfig;
     private long _createTime = System.currentTimeMillis();
     private LogActor _actor = GenericActor.getInstance(this);
+    private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
 
     public static interface MessageDispositionChangeListener
     {
@@ -121,8 +122,6 @@ public class ServerSession extends Sessi
 
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
 
-    private final WeakReference<Session> _reference;
-
     ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
     {
         this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
@@ -134,7 +133,6 @@ public class ServerSession extends Sessi
         _connectionConfig = connConfig;        
         _transaction = new AutoCommitTransaction(this.getMessageStore());
 
-        _reference = new WeakReference<Session>(this);
         _id = getConfigStore().createId();
         getConfigStore().addConfiguredObject(this);
     }
@@ -161,40 +159,22 @@ public class ServerSession extends Sessi
         return isCommandsFull(id);
     }
 
-    public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
+    public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
     {
         getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
-        _transaction.enqueue(queues,message, new ServerTransaction.Action()
-            {
-
-                BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
-
-                public void postCommit()
-                {
-                    MessageReference<?> ref = message.newReference();
-                    for(int i = 0; i < _queues.length; i++)
-                    {
-                        try
-                        {
-                            _queues[i].enqueue(message);
-                        }
-                        catch (AMQException e)
-                        {
-                            // TODO
-                            throw new RuntimeException(e);
-                        }
-                    }
-                    ref.release();
-                }
-
-                public void onRollback()
-                {
-                    // NO-OP
-                }
-            });
-
-            incrementOutstandingTxnsIfNecessary();
-            updateTransactionalActivity();
+        PostEnqueueAction postTransactionAction;
+        if(isTransactional())
+        {
+           postTransactionAction = new PostEnqueueAction(queues, message) ;
+        }
+        else
+        {
+            postTransactionAction = _postEnqueueAction;
+            postTransactionAction.setState(queues, message);
+        }
+        _transaction.enqueue(queues,message, postTransactionAction, 0L);
+        incrementOutstandingTxnsIfNecessary();
+        updateTransactionalActivity();
     }
 
 
@@ -252,7 +232,7 @@ public class ServerSession extends Sessi
 
     public RangeSet acquire(RangeSet transfers)
     {
-        RangeSet acquired = new RangeSet();
+        RangeSet acquired = RangeSetFactory.createRangeSet();
 
         if(!_messageDispositionListenerMap.isEmpty())
         {
@@ -300,41 +280,56 @@ public class ServerSession extends Sessi
 
     public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
     {
-        if(ranges != null && !_messageDispositionListenerMap.isEmpty())
+        if(ranges != null)
         {
-            Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
-            Iterator<Range> rangeIter = ranges.iterator();
 
-            if(rangeIter.hasNext())
+            if(ranges.size() == 1)
             {
-                Range range = rangeIter.next();
+                Range r = ranges.getFirst();
+                for(int i = r.getLower(); i <= r.getUpper(); i++)
+                {
+                    MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(i);
+                    if(changeListener != null)
+                    {
+                        action.performAction(changeListener);
+                    }
+                }
+            }
+            else if(!_messageDispositionListenerMap.isEmpty())
+            {
+                Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
+                Iterator<Range> rangeIter = ranges.iterator();
 
-                while(range != null && unacceptedMessages.hasNext())
+                if(rangeIter.hasNext())
                 {
-                    int next = unacceptedMessages.next();
-                    while(gt(next, range.getUpper()))
+                    Range range = rangeIter.next();
+
+                    while(range != null && unacceptedMessages.hasNext())
                     {
-                        if(rangeIter.hasNext())
+                        int next = unacceptedMessages.next();
+                        while(gt(next, range.getUpper()))
                         {
-                            range = rangeIter.next();
+                            if(rangeIter.hasNext())
+                            {
+                                range = rangeIter.next();
+                            }
+                            else
+                            {
+                                range = null;
+                                break;
+                            }
                         }
-                        else
+                        if(range != null && range.includes(next))
                         {
-                            range = null;
-                            break;
+                            MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
+                            action.performAction(changeListener);
                         }
-                    }
-                    if(range != null && range.includes(next))
-                    {
-                        MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
-                        action.performAction(changeListener);
-                    }
 
 
-                }
+                    }
 
+                }
             }
-
         }
     }
 
@@ -534,10 +529,10 @@ public class ServerSession extends Sessi
         _taskList.remove(task);
     }
 
-    public WeakReference<Session> getReference()
-     {
-         return _reference;
-     }
+    public Object getReference()
+    {
+        return ((ServerConnection) getConnection()).getReference();
+    }
 
     public MessageStore getMessageStore()
     {
@@ -697,7 +692,7 @@ public class ServerSession extends Sessi
         }
     }
 
-    public void flushCreditState()
+    public void receivedComplete()
     {
         final Collection<Subscription_0_10> subscriptions = getSubscriptions();
         for (Subscription_0_10 subscription_0_10 : subscriptions)
@@ -706,6 +701,54 @@ public class ServerSession extends Sessi
         }
     }
 
+    private static class PostEnqueueAction implements ServerTransaction.Action
+    {
+
+        private List<? extends BaseQueue> _queues;
+        private ServerMessage _message;
+        private final boolean _transactional;
+
+        public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message)
+        {
+            _transactional = true;
+            setState(queues, message);
+        }
+
+        public PostEnqueueAction()
+        {
+            _transactional = false;
+        }
+
+        public void setState(List<? extends BaseQueue> queues, ServerMessage message)
+        {
+            _message = message;
+            _queues = queues;
+        }
+
+        public void postCommit()
+        {
+            MessageReference<?> ref = _message.newReference();
+            for(int i = 0; i < _queues.size(); i++)
+            {
+                try
+                {
+                    _queues.get(i).enqueue(_message, _transactional, null);
+                }
+                catch (AMQException e)
+                {
+                    // TODO
+                    throw new RuntimeException(e);
+                }
+            }
+            ref.release();
+        }
+
+        public void onRollback()
+        {
+            // NO-OP
+        }
+    }
+
     public int getUnacknowledgedMessageCount()
     {
         return _messageDispositionListenerMap.size();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Wed Dec 28 13:02:41 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
@@ -55,46 +56,7 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Acquired;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.ExchangeBind;
-import org.apache.qpid.transport.ExchangeBound;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.ExchangeDeclare;
-import org.apache.qpid.transport.ExchangeDelete;
-import org.apache.qpid.transport.ExchangeQuery;
-import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExchangeUnbind;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.ExecutionException;
-import org.apache.qpid.transport.MessageAccept;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquire;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCancel;
-import org.apache.qpid.transport.MessageFlow;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageFlush;
-import org.apache.qpid.transport.MessageReject;
-import org.apache.qpid.transport.MessageRejectCode;
-import org.apache.qpid.transport.MessageRelease;
-import org.apache.qpid.transport.MessageResume;
-import org.apache.qpid.transport.MessageSetFlowMode;
-import org.apache.qpid.transport.MessageStop;
-import org.apache.qpid.transport.MessageSubscribe;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.QueueDeclare;
-import org.apache.qpid.transport.QueueDelete;
-import org.apache.qpid.transport.QueuePurge;
-import org.apache.qpid.transport.QueueQuery;
-import org.apache.qpid.transport.QueueQueryResult;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.TxCommit;
-import org.apache.qpid.transport.TxRollback;
-import org.apache.qpid.transport.TxSelect;
+import org.apache.qpid.transport.*;
 
 public class ServerSessionDelegate extends SessionDelegate
 {
@@ -295,7 +257,8 @@ public class ServerSessionDelegate exten
         final Exchange exchange = getExchangeForMessage(ssn, xfr);
 
         DeliveryProperties delvProps = null;
-        if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+        if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && !delvProps
+                .hasExpiration())
         {
             delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
         }
@@ -312,7 +275,7 @@ public class ServerSessionDelegate exten
         }
 
         final Exchange exchangeInUse;
-        ArrayList<? extends BaseQueue> queues = exchange.route(messageMetaData);
+        List<? extends BaseQueue> queues = exchange.route(messageMetaData);
         if(queues.isEmpty() && exchange.getAlternateExchange() != null)
         {
             final Exchange alternateExchange = exchange.getAlternateExchange();
@@ -334,15 +297,16 @@ public class ServerSessionDelegate exten
         if(!queues.isEmpty())
         {
             final MessageStore store = getVirtualHost(ssn).getMessageStore();
-            final StoredMessage<MessageMetaData_0_10> storeMessage = createAndFlushStoreMessage(xfr, messageMetaData, store);
+            final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
             MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
             ((ServerSession) ssn).enqueue(message, queues);
+            storeMessage.flushToStore();
         }
         else
         {
             if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
             {
-                RangeSet rejects = new RangeSet();
+                RangeSet rejects = RangeSetFactory.createRangeSet();
                 rejects.add(xfr.getId());
                 MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
                 ssn.invoke(reject);
@@ -353,11 +317,13 @@ public class ServerSessionDelegate exten
             }
         }
 
+
+
         ssn.processed(xfr);
     }
 
-    private StoredMessage<MessageMetaData_0_10> createAndFlushStoreMessage(final MessageTransfer xfr,
-            final MessageMetaData_0_10 messageMetaData, final MessageStore store)
+    private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
+                                                                   final MessageMetaData_0_10 messageMetaData, final MessageStore store)
     {
         final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
         ByteBuffer body = xfr.getBody();
@@ -365,7 +331,6 @@ public class ServerSessionDelegate exten
         {
             storeMessage.addContent(0, body);
         }
-        storeMessage.flushToStore();
         return storeMessage;
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.txn;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
@@ -30,7 +31,7 @@ import org.apache.qpid.server.message.En
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
 
 /**
  * An implementation of ServerTransaction where each enqueue/dequeue
@@ -43,11 +44,11 @@ public class AutoCommitTransaction imple
 {
     protected static final Logger _logger = Logger.getLogger(AutoCommitTransaction.class);
 
-    private final TransactionLog _transactionLog;
+    private final MessageStore _messageStore;
 
-    public AutoCommitTransaction(TransactionLog transactionLog)
+    public AutoCommitTransaction(MessageStore transactionLog)
     {
-        _transactionLog = transactionLog;
+        _messageStore = transactionLog;
     }
 
     public long getTransactionStartTime()
@@ -59,14 +60,14 @@ public class AutoCommitTransaction imple
      * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
      * by the caller are executed immediately.
      */
-    public void addPostTransactionAction(Action immediateAction)
+    public void addPostTransactionAction(final Action immediateAction)
     {
         immediateAction.postCommit();
     }
 
     public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
     {
-        TransactionLog.Transaction txn = null;
+        MessageStore.Transaction txn = null;
         try
         {
             if(message.isPersistent() && queue.isDurable())
@@ -76,8 +77,8 @@ public class AutoCommitTransaction imple
                     _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
                 }
 
-                txn = _transactionLog.newTransaction();
-                txn.dequeueMessage(queue, message.getMessageNumber());
+                txn = _messageStore.newTransaction();
+                txn.dequeueMessage(queue, message);
                 txn.commitTran();
                 txn = null;
             }
@@ -98,7 +99,7 @@ public class AutoCommitTransaction imple
 
     public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
     {
-        TransactionLog.Transaction txn = null;
+        MessageStore.Transaction txn = null;
         try
         {
             for(QueueEntry entry : queueEntries)
@@ -115,10 +116,10 @@ public class AutoCommitTransaction imple
 
                     if(txn == null)
                     {
-                        txn = _transactionLog.newTransaction();
+                        txn = _messageStore.newTransaction();
                     }
 
-                    txn.dequeueMessage(queue, message.getMessageNumber());
+                    txn.dequeueMessage(queue, message);
                 }
 
             }
@@ -145,7 +146,7 @@ public class AutoCommitTransaction imple
 
     public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
     {
-        TransactionLog.Transaction txn = null;
+        MessageStore.Transaction txn = null;
         try
         {
             if(message.isPersistent() && queue.isDurable())
@@ -155,8 +156,8 @@ public class AutoCommitTransaction imple
                     _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
                 }
 
-                txn = _transactionLog.newTransaction();
-                txn.enqueueMessage(queue, message.getMessageNumber());
+                txn = _messageStore.newTransaction();
+                txn.enqueueMessage(queue, message);
                 txn.commitTran();
                 txn = null;
             }
@@ -176,15 +177,14 @@ public class AutoCommitTransaction imple
 
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
     {
-        TransactionLog.Transaction txn = null;
+        MessageStore.Transaction txn = null;
         try
         {
 
             if(message.isPersistent())
             {
-                Long id = message.getMessageNumber();
                 for(BaseQueue queue : queues)
                 {
                     if (queue.isDurable())
@@ -195,22 +195,26 @@ public class AutoCommitTransaction imple
                         }
                         if (txn == null)
                         {
-                            txn = _transactionLog.newTransaction();
+                            txn = _messageStore.newTransaction();
                         }
                         
-                        txn.enqueueMessage(queue, id);
+                        txn.enqueueMessage(queue, message);
+
+
                     }
                 }
                 
-                if (txn != null)
-                {
-                    txn.commitTran();
-                    txn = null;
-
-                }
             }
+            if (txn != null)
+            {
+                txn.commitTran();
+                txn = null;
+            }
+
             postTransactionAction.postCommit();
             postTransactionAction = null;
+
+
         }
         catch (AMQException e)
         {
@@ -225,6 +229,11 @@ public class AutoCommitTransaction imple
     }
 
 
+    public void commit(final Runnable immediatePostTransactionAction)
+    {
+        immediatePostTransactionAction.run();
+    }    
+    
     public void commit()
     {
     }
@@ -233,7 +242,7 @@ public class AutoCommitTransaction imple
     {
     }
 
-    private void rollbackIfNecessary(Action postTransactionAction, TransactionLog.Transaction txn)
+    private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
     {
         if (txn != null)
         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Wed Dec 28 13:02:41 2011
@@ -29,11 +29,7 @@ import org.apache.qpid.server.message.En
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,11 +46,11 @@ public class LocalTransaction implements
 
     private final List<Action> _postTransactionActions = new ArrayList<Action>();
 
-    private volatile TransactionLog.Transaction _transaction;
-    private TransactionLog _transactionLog;
+    private volatile MessageStore.Transaction _transaction;
+    private MessageStore _transactionLog;
     private long _txnStartTime = 0L;
 
-    public LocalTransaction(TransactionLog transactionLog)
+    public LocalTransaction(MessageStore transactionLog)
     {
         _transactionLog = transactionLog;
     }
@@ -63,7 +59,7 @@ public class LocalTransaction implements
     {
         return _transaction != null;
     }
-    
+
     public long getTransactionStartTime()
     {
         return _txnStartTime;
@@ -88,7 +84,7 @@ public class LocalTransaction implements
                 }
 
                 beginTranIfNecessary();
-                _transaction.dequeueMessage(queue, message.getMessageNumber());
+                _transaction.dequeueMessage(queue, message);
 
             }
             catch(AMQException e)
@@ -118,7 +114,7 @@ public class LocalTransaction implements
                     }
 
                     beginTranIfNecessary();
-                    _transaction.dequeueMessage(queue, message.getMessageNumber());
+                    _transaction.dequeueMessage(queue, message);
                 }
 
             }
@@ -191,7 +187,7 @@ public class LocalTransaction implements
                 }
                 
                 beginTranIfNecessary();
-                _transaction.enqueueMessage(queue, message.getMessageNumber());
+                _transaction.enqueueMessage(queue, message);
             }
             catch (Exception e)
             {
@@ -202,13 +198,13 @@ public class LocalTransaction implements
         }
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
     {
         _postTransactionActions.add(postTransactionAction);
 
         if (_txnStartTime == 0L)
         {
-            _txnStartTime = System.currentTimeMillis();
+            _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime;
         }
 
         if(message.isPersistent())
@@ -226,7 +222,7 @@ public class LocalTransaction implements
                         
                         
                         beginTranIfNecessary();
-                        _transaction.enqueueMessage(queue, message.getMessageNumber());
+                        _transaction.enqueueMessage(queue, message);
                     }
                 }
 
@@ -242,6 +238,11 @@ public class LocalTransaction implements
 
     public void commit()
     {
+        commit(null);
+    }
+
+    public void commit(Runnable immediateAction)
+    {
         try
         {
             if(_transaction != null)
@@ -249,9 +250,14 @@ public class LocalTransaction implements
                 _transaction.commitTran();
             }
 
-            for(Action action : _postTransactionActions)
+            if(immediateAction != null)
+            {
+                immediateAction.run();
+            }
+
+            for(int i = 0; i < _postTransactionActions.size(); i++)
             {
-                action.postCommit();
+                _postTransactionActions.get(i).postCommit();
             }
         }
         catch (Exception e)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Wed Dec 28 13:02:41 2011
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.txn;
 
 import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 
@@ -42,7 +43,7 @@ import java.util.List;
  */
 public interface ServerTransaction
 {
-    /** 
+    /**
      * Represents an action to be performed on transaction commit or rollback
      */
     public static interface Action
@@ -91,7 +92,7 @@ public interface ServerTransaction
      * 
      * Store operations will result only for a persistent messages on durable queues.
      */
-    void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction);
+    void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime);
 
     /** 
      * Commit the transaction represented by this object.
@@ -101,6 +102,8 @@ public interface ServerTransaction
      */
     void commit();
 
+    void commit(Runnable immediatePostTransactionAction);
+
     /** Rollback the transaction represented by this object.
      * 
      * If the caller has registered one or more Actions, the onRollback() method on each will

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Wed Dec 28 13:02:41 2011
@@ -39,7 +39,6 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
 
 public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
 {
@@ -57,8 +56,6 @@ public interface VirtualHost extends Dur
 
     MessageStore getMessageStore();
 
-    TransactionLog getTransactionLog();
-
     DurableConfigurationStore getDurableConfigurationStore();
 
     AuthenticationManager getAuthenticationManager();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Wed Dec 28 13:02:41 2011
@@ -20,12 +20,12 @@
 */
 package org.apache.qpid.server.virtualhost;
 
+import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLog;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -73,7 +73,6 @@ public class VirtualHostConfigRecoveryHa
     private List<ProcessAction> _actions;
 
     private MessageStore _store;
-    private TransactionLog _transactionLog;
 
     private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
     private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
@@ -86,7 +85,7 @@ public class VirtualHostConfigRecoveryHa
         _virtualHost = virtualHost;
     }
 
-    public QueueRecoveryHandler begin(MessageStore store)
+    public VirtualHostConfigRecoveryHandler begin(MessageStore store)
     {
         _logSubject = new MessageStoreLogSubject(_virtualHost,store);
         _store = store;
@@ -99,14 +98,12 @@ public class VirtualHostConfigRecoveryHa
     {
         try
         {
-            AMQShortString queueNameShortString = new AMQShortString(queueName);
-    
-            AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+            AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName);
     
             if (q == null)
             {
-                q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, exclusive, _virtualHost,
-                                                       arguments);
+                q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost,
+                                                       FieldTable.convertToMap(arguments));
                 _virtualHost.getQueueRegistry().registerQueue(q);
             }
     
@@ -186,12 +183,6 @@ public class VirtualHostConfigRecoveryHa
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public TransactionLogRecoveryHandler.QueueEntryRecoveryHandler begin(TransactionLog log)
-    {
-        _transactionLog = log;
-        return this;
-    }
-
     private static final class ProcessAction
     {
         private final AMQQueue _queue;
@@ -316,15 +307,15 @@ public class VirtualHostConfigRecoveryHa
                 else
                 {
                     _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded");
-                    TransactionLog.Transaction txn = _transactionLog.newTransaction();
-                    txn.dequeueMessage(queue, messageId);
+                    MessageStore.Transaction txn = _store.newTransaction();
+                    txn.dequeueMessage(queue, new DummyMessage(messageId));
                     txn.commitTranAsync();
                 }
             }
             else
             {
                 _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
-                TransactionLog.Transaction txn = _transactionLog.newTransaction();
+                MessageStore.Transaction txn = _store.newTransaction();
                 TransactionLogResource mockQueue =
                         new TransactionLogResource()
                         {
@@ -334,7 +325,7 @@ public class VirtualHostConfigRecoveryHa
                                 return queueName;
                             }
                         };
-                txn.dequeueMessage(mockQueue, messageId);
+                txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
                 txn.commitTranAsync();
             }
 
@@ -367,4 +358,32 @@ public class VirtualHostConfigRecoveryHa
         CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
     }
 
+    private static class DummyMessage implements EnqueableMessage
+    {
+
+
+        private final long _messageId;
+
+        public DummyMessage(long messageId)
+        {
+            _messageId = messageId;
+        }
+
+        public long getMessageNumber()
+        {
+            return _messageId;
+        }
+
+
+        public boolean isPersistent()
+        {
+            return true;
+        }
+
+
+        public StoredMessage getStoredMessage()
+        {
+            return null;
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Wed Dec 28 13:02:41 2011
@@ -75,7 +75,6 @@ import org.apache.qpid.server.stats.Stat
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
 import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
 import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
 
@@ -228,7 +227,10 @@ public class VirtualHostImpl implements 
         if (store != null)
         {
             _messageStore = store;
-            _durableConfigurationStore = store;
+            if(store instanceof DurableConfigurationStore)
+            {
+                _durableConfigurationStore = (DurableConfigurationStore) store;
+            }
         }
         else
         {
@@ -380,6 +382,8 @@ public class VirtualHostImpl implements 
         Class clazz = Class.forName(messageStoreClass);
         Object o = clazz.newInstance();
 
+
+
         if (!(o instanceof MessageStore))
         {
             throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
@@ -390,10 +394,18 @@ public class VirtualHostImpl implements 
 
         MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore);
 
-        messageStore.configureConfigStore(this.getName(),
-                                          recoveryHandler,
-                                          hostConfig.getStoreConfiguration(),
-                                          storeLogSubject);
+
+        if(messageStore instanceof DurableConfigurationStore)
+        {
+            DurableConfigurationStore durableConfigurationStore = (DurableConfigurationStore) messageStore;
+
+            durableConfigurationStore.configureConfigStore(this.getName(),
+                                              recoveryHandler,
+                                              hostConfig.getStoreConfiguration(),
+                                              storeLogSubject);
+
+            _durableConfigurationStore = durableConfigurationStore;
+        }
 
         messageStore.configureMessageStore(this.getName(),
                                            recoveryHandler,
@@ -405,7 +417,8 @@ public class VirtualHostImpl implements 
                                            storeLogSubject);
 
         _messageStore = messageStore;
-        _durableConfigurationStore = messageStore;
+
+
     }
 
     private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
@@ -553,11 +566,6 @@ public class VirtualHostImpl implements 
         return _messageStore;
     }
 
-    public TransactionLog getTransactionLog()
-    {
-        return _messageStore;
-    }
-
     public DurableConfigurationStore getDurableConfigurationStore()
     {
         return _durableConfigurationStore;

Propchange: qpid/trunk/qpid/java/broker/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org