You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/07/25 16:24:37 UTC

svn commit: r1613440 [1/2] - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/...

Author: rgodfrey
Date: Fri Jul 25 14:24:36 2014
New Revision: 1613440

URL: http://svn.apache.org/r1613440
Log:
QPID-4304 : [Java Broker] Add an attribute to queues - "messageDurability" - which controls whether message data is persisted or not. By default, persistence depends on the persistence setting of each message, but an individual queue may declare that all (or no) messages should be persisted on the queue

Added:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    qpid/trunk/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
    qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
    qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
    qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
    qpid/trunk/qpid/java/test-profiles/CPPExcludes
    qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Fri Jul 25 14:24:36 2014
@@ -55,7 +55,6 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.store.Xid;
@@ -123,14 +122,7 @@ public abstract class AbstractBDBMessage
 
         long newMessageId = getNextMessageId();
 
-        if (metaData.isPersistent())
-        {
-            return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData);
-        }
-        else
-        {
-            return new StoredMemoryMessage<T>(newMessageId, metaData);
-        }
+        return new StoredBDBMessage<T>(newMessageId, metaData);
     }
 
     public long getNextMessageId()
@@ -1049,7 +1041,7 @@ public abstract class AbstractBDBMessage
 
     protected abstract Logger getLogger();
 
-    private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+    class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
     {
 
         private final long _messageId;
@@ -1177,8 +1169,7 @@ public abstract class AbstractBDBMessage
             }
         }
 
-        @Override
-        public synchronized StoreFuture flushToStore()
+        synchronized StoreFuture flushToStore()
         {
             if(!stored())
             {
@@ -1229,6 +1220,7 @@ public abstract class AbstractBDBMessage
     {
         private Transaction _txn;
         private int _storeSizeIncrease;
+        private final List<Runnable> _onCommitActions = new ArrayList<>();
 
         private BDBTransaction() throws StoreException
         {
@@ -1250,8 +1242,16 @@ public abstract class AbstractBDBMessage
             if(message.getStoredMessage() instanceof StoredBDBMessage)
             {
                 final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
-                storedMessage.store(_txn);
-                _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+                _onCommitActions.add(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        storedMessage.store(_txn);
+                        _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+                    }
+                });
+
             }
 
             AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
@@ -1269,16 +1269,25 @@ public abstract class AbstractBDBMessage
         public void commitTran() throws StoreException
         {
             checkMessageStoreOpen();
-
+            doPreCommitActions();
             AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
             AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
         }
 
+        private void doPreCommitActions()
+        {
+            for(Runnable action : _onCommitActions)
+            {
+                action.run();
+            }
+            _onCommitActions.clear();
+        }
+
         @Override
         public StoreFuture commitTranAsync() throws StoreException
         {
             checkMessageStoreOpen();
-
+            doPreCommitActions();
             AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
             return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
         }
@@ -1287,7 +1296,7 @@ public abstract class AbstractBDBMessage
         public void abortTran() throws StoreException
         {
             checkMessageStoreOpen();
-
+            _onCommitActions.clear();
             AbstractBDBMessageStore.this.abortTran(_txn);
         }
 

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java Fri Jul 25 14:24:36 2014
@@ -27,6 +27,7 @@ import com.sleepycat.bind.tuple.TupleBin
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -131,9 +132,9 @@ public class PreparedTransactionBinding 
         }
 
         @Override
-        public boolean isDurable()
+        public MessageDurability getMessageDurability()
         {
-            return true;
+            return MessageDurability.DEFAULT;
         }
     }
 }

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Fri Jul 25 14:24:36 2014
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -51,9 +54,6 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageTransfer;
 import org.apache.qpid.util.FileUtils;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 /**
  * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
  * the BDB Store as well as additional tests specific to the BDB store-implementation.
@@ -113,7 +113,7 @@ public class BDBMessageStoreTest extends
 
         storedMessage_0_8.addContent(0, firstContentBytes_0_8);
         storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
-        storedMessage_0_8.flushToStore();
+        ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
 
         /*
          * Create and insert a 0-10 message (metadata and content)
@@ -132,7 +132,7 @@ public class BDBMessageStoreTest extends
         long messageid_0_10 = storedMessage_0_10.getMessageNumber();
 
         storedMessage_0_10.addContent(0, completeContentBody_0_10);
-        storedMessage_0_10.flushToStore();
+        ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore();
 
         /*
          * reload the store only (read-only)
@@ -387,7 +387,7 @@ public class BDBMessageStoreTest extends
         StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
 
         storedMessage_0_8.addContent(0, chunk1);
-        storedMessage_0_8.flushToStore();
+        ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
 
         return storedMessage_0_8;
     }

Modified: qpid/trunk/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java (original)
+++ qpid/trunk/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/plugin/PluggableService.java Fri Jul 25 14:24:36 2014
@@ -24,7 +24,7 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-@Retention(RetentionPolicy.SOURCE)
+@Retention(RetentionPolicy.CLASS)
 @Target(ElementType.TYPE)
 public @interface PluggableService
 {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java Fri Jul 25 14:24:36 2014
@@ -235,12 +235,6 @@ public class InternalMessage extends Abs
                 }
 
                 @Override
-                public StoreFuture flushToStore()
-                {
-                    throw new UnsupportedOperationException();
-                }
-
-                @Override
                 public void remove()
                 {
                     throw new UnsupportedOperationException();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Fri Jul 25 14:24:36 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.model;
 import java.util.Collection;
 
 import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.store.MessageDurability;
 
 @ManagedObject( defaultType = "standard" )
 public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@@ -35,6 +36,7 @@ public interface Queue<X extends Queue<X
     String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages";
     String ALTERNATE_EXCHANGE = "alternateExchange";
     String EXCLUSIVE = "exclusive";
+    String MESSAGE_DURABILITY = "messageDurability";
     String MESSAGE_GROUP_KEY = "messageGroupKey";
     String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups";
     String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup";
@@ -130,6 +132,10 @@ public interface Queue<X extends Queue<X
     @ManagedAttribute( defaultValue = "${queue.alertRepeatGap}")
     long getAlertRepeatGap();
 
+    @ManagedAttribute( defaultValue = "DEFAULT" )
+    MessageDurability getMessageDurability();
+
+
 
     //children
     Collection<? extends Binding> getBindings();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Jul 25 14:24:36 2014
@@ -77,6 +77,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
@@ -175,6 +176,9 @@ public abstract class AbstractQueue<X ex
     @ManagedAttributeField
     private ExclusivityPolicy _exclusive;
 
+    @ManagedAttributeField
+    private MessageDurability _messageDurability;
+
     private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name
 
     private final Set<NotificationCheck> _notificationChecks =
@@ -245,12 +249,38 @@ public abstract class AbstractQueue<X ex
     {
         super.onCreate();
 
+        if(isDurable() && (getLifetimePolicy()  == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+                            || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END))
+        {
+            Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
+                         new PrivilegedAction<Object>()
+                         {
+                             @Override
+                             public Object run()
+                             {
+                                 setAttribute(AbstractConfiguredObject.DURABLE, true, false);
+                                 return null;
+                             }
+                         });
+        }
 
-        if (isDurable() && !(getLifetimePolicy()  == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
-                             || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END))
+        if (isDurable())
         {
             _virtualHost.getDurableConfigurationStore().create(asObjectRecord());
         }
+        else if(getMessageDurability() != MessageDurability.NEVER)
+        {
+            Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
+                         new PrivilegedAction<Object>()
+                         {
+                             @Override
+                             public Object run()
+                             {
+                                 setAttribute(Queue.MESSAGE_DURABILITY, getMessageDurability(), MessageDurability.NEVER);
+                                 return null;
+                             }
+                         });
+        }
 
         _recovering.set(false);
     }
@@ -510,6 +540,11 @@ public abstract class AbstractQueue<X ex
         }
     }
 
+    @Override
+    public final MessageDurability getMessageDurability()
+    {
+        return _messageDurability;
+    }
 
     @Override
     public Collection<String> getAvailableAttributes()

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Fri Jul 25 14:24:36 2014
@@ -52,6 +52,9 @@ public class QueueArgumentsConverter
     public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
     public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
     public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
+
+    public static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
+
     public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude";
     public static final String QPID_TRACE_ID = "qpid.trace.id";
 
@@ -91,6 +94,7 @@ public class QueueArgumentsConverter
         ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP);
 
         ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
+        ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY);
 
     }
 
@@ -138,7 +142,12 @@ public class QueueArgumentsConverter
         {
             if(modelArguments.containsKey(entry.getValue()))
             {
-                wireArguments.put(entry.getKey(), modelArguments.get(entry.getValue()));
+                Object value = modelArguments.get(entry.getValue());
+                if(value instanceof Enum)
+                {
+                    value = ((Enum) value).name();
+                }
+                wireArguments.put(entry.getKey(), value);
             }
         }
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Fri Jul 25 14:24:36 2014
@@ -447,14 +447,8 @@ public abstract class AbstractJDBCMessag
     {
         checkMessageStoreOpen();
 
-        if(metaData.isPersistent())
-        {
-            return new StoredJDBCMessage(getNextMessageId(), metaData);
-        }
-        else
-        {
-            return new StoredMemoryMessage(getNextMessageId(), metaData);
-        }
+        return new StoredJDBCMessage(getNextMessageId(), metaData);
+
     }
 
     @Override
@@ -970,9 +964,9 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public boolean isDurable()
+        public MessageDurability getMessageDurability()
         {
-            return true;
+            return MessageDurability.DEFAULT;
         }
     }
 
@@ -1122,7 +1116,7 @@ public abstract class AbstractJDBCMessag
     {
         private final ConnectionWrapper _connWrapper;
         private int _storeSizeIncrease;
-
+        private final List<Runnable> _onCommitActions = new ArrayList<>();
 
         protected JDBCTransaction()
         {
@@ -1144,16 +1138,23 @@ public abstract class AbstractJDBCMessag
             final StoredMessage storedMessage = message.getStoredMessage();
             if(storedMessage instanceof StoredJDBCMessage)
             {
-                try
-                {
-                    ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
-                }
-                catch (SQLException e)
+                _onCommitActions.add(new Runnable()
                 {
-                    throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
-                }
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
+                            _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+                        }
+                        catch (SQLException e)
+                        {
+                            throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
+                        }
+                    }
+                });
             }
-            _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
             AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
 
         }
@@ -1170,7 +1171,7 @@ public abstract class AbstractJDBCMessag
         public void commitTran()
         {
             checkMessageStoreOpen();
-
+            doPreCommitActions();
             AbstractJDBCMessageStore.this.commitTran(_connWrapper);
             storedSizeChange(_storeSizeIncrease);
         }
@@ -1179,17 +1180,26 @@ public abstract class AbstractJDBCMessag
         public StoreFuture commitTranAsync()
         {
             checkMessageStoreOpen();
-
+            doPreCommitActions();
             StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
             storedSizeChange(_storeSizeIncrease);
             return storeFuture;
         }
 
+        private void doPreCommitActions()
+        {
+            for(Runnable action : _onCommitActions)
+            {
+                action.run();
+            }
+            _onCommitActions.clear();
+        }
+
         @Override
         public void abortTran()
         {
             checkMessageStoreOpen();
-
+            _onCommitActions.clear();
             AbstractJDBCMessageStore.this.abortTran(_connWrapper);
         }
 
@@ -1215,7 +1225,6 @@ public abstract class AbstractJDBCMessag
 
         private final long _messageId;
         private final boolean _isRecovered;
-
         private StorableMessageMetaData _metaData;
         private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
         private byte[] _data;
@@ -1320,39 +1329,6 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public synchronized StoreFuture flushToStore()
-        {
-            checkMessageStoreOpen();
-
-            Connection conn = null;
-            try
-            {
-                if(!stored())
-                {
-                    conn = newConnection();
-
-                    store(conn);
-
-                    conn.commit();
-                    storedSizeChange(getMetaData().getContentSize());
-                }
-            }
-            catch (SQLException e)
-            {
-                if(getLogger().isDebugEnabled())
-                {
-                    getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e);
-                }
-                throw new StoreException(e);
-            }
-            finally
-            {
-                JdbcUtils.closeConnection(conn, AbstractJDBCMessageStore.this.getLogger());
-            }
-            return StoreFuture.IMMEDIATE_FUTURE;
-        }
-
-        @Override
         public void remove()
         {
             checkMessageStoreOpen();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Jul 25 14:24:36 2014
@@ -64,6 +64,12 @@ public class MemoryMessageStore implemen
         @Override
         public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
         {
+
+            if(message.getStoredMessage() instanceof StoredMemoryMessage)
+            {
+                _messages.putIfAbsent(message.getMessageNumber(), (StoredMemoryMessage) message.getStoredMessage());
+            }
+
             Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
             if (messageIds == null)
             {
@@ -196,31 +202,20 @@ public class MemoryMessageStore implemen
     {
         long id = getNextMessageId();
 
-        if(metaData.isPersistent())
+        StoredMemoryMessage<T> storedMemoryMessage = new StoredMemoryMessage<T>(id, metaData)
         {
-            return new StoredMemoryMessage<T>(id, metaData)
+
+            @Override
+            public void remove()
             {
+                _messages.remove(getMessageNumber());
+                super.remove();
+            }
 
-                @Override
-                public StoreFuture flushToStore()
-                {
-                    _messages.putIfAbsent(getMessageNumber(), this) ;
-                    return super.flushToStore();
-                }
+        };
 
-                @Override
-                public void remove()
-                {
-                    _messages.remove(getMessageNumber());
-                    super.remove();
-                }
+        return storedMemoryMessage;
 
-            };
-        }
-        else
-        {
-            return new StoredMemoryMessage<T>(id, metaData);
-        }
     }
 
     @Override

Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java?rev=1613440&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageDurability.java Fri Jul 25 14:24:36 2014
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public enum MessageDurability // policy deciding whether a message is persisted, given the message's own persistent flag
+{
+    DEFAULT(false,true), // persist(false) == false, persist(true) == true: follows the message's own flag
+    ALWAYS(true,true), // persist(...) is true whether or not the message is flagged persistent
+    NEVER(false,false); // persist(...) is false whether or not the message is flagged persistent
+
+    private final boolean _nonPersistent; // value returned by persist(false)
+    private final boolean _persistent; // value returned by persist(true)
+
+    MessageDurability(final boolean nonPersistent, final boolean persistent) // argument order: answer for non-persistent messages first, then for persistent ones
+    {
+        _nonPersistent = nonPersistent;
+        _persistent = persistent;
+    }
+
+    public boolean persist(final boolean persistent) // persistent: the message's own flag; returns whether it should be persisted under this policy
+    {
+        return persistent ? _persistent : _nonPersistent;
+    }
+}

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Fri Jul 25 14:24:36 2014
@@ -122,12 +122,6 @@ public class StoredMemoryMessage<T exten
         return buf;
     }
 
-    public StoreFuture flushToStore()
-    {
-        return StoreFuture.IMMEDIATE_FUTURE;
-    }
-
-
     public T getMetaData()
     {
         return _metaData;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java Fri Jul 25 14:24:36 2014
@@ -34,7 +34,5 @@ public interface StoredMessage<M extends
 
     ByteBuffer getContent(int offsetInMessage, int size);
 
-    StoreFuture flushToStore();
-
     void remove();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java Fri Jul 25 14:24:36 2014
@@ -24,7 +24,9 @@ import java.util.UUID;
 
 public interface TransactionLogResource
 {
+
     String getName();
     public UUID getId();
-    boolean isDurable();
+    // isDurable() is no longer declared on this interface; getMessageDurability() below is declared in its place
+    MessageDurability getMessageDurability(); // policy whose persist(boolean) is consulted with the message's persistent flag
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Fri Jul 25 14:24:36 2014
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.server.txn;
 
+import java.util.Collection;
+import java.util.List;
+
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
@@ -31,9 +34,6 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
 
-import java.util.Collection;
-import java.util.List;
-
 /**
  * An implementation of ServerTransaction where each enqueue/dequeue
  * operation takes place within it own transaction.
@@ -93,7 +93,7 @@ public class AsyncAutoCommitTransaction 
         try
         {
             StoreFuture future;
-            if(message.isPersistent() && queue.isDurable())
+            if(queue.getMessageDurability().persist(message.isPersistent()))
             {
                 if (_logger.isDebugEnabled())
                 {
@@ -162,7 +162,7 @@ public class AsyncAutoCommitTransaction 
                 ServerMessage message = entry.getMessage();
                 TransactionLogResource queue = entry.getOwningResource();
 
-                if(message.isPersistent() && queue.isDurable())
+                if(queue.getMessageDurability().persist(message.isPersistent()))
                 {
                     if (_logger.isDebugEnabled())
                     {
@@ -205,7 +205,7 @@ public class AsyncAutoCommitTransaction 
         try
         {
             StoreFuture future;
-            if(message.isPersistent() && queue.isDurable())
+            if(queue.getMessageDurability().persist(message.isPersistent()))
             {
                 if (_logger.isDebugEnabled())
                 {
@@ -237,28 +237,24 @@ public class AsyncAutoCommitTransaction 
         try
         {
 
-            if(message.isPersistent())
+            for(BaseQueue queue : queues)
             {
-                for(BaseQueue queue : queues)
+                if (queue.getMessageDurability().persist(message.isPersistent()))
                 {
-                    if (queue.isDurable())
+                    if (_logger.isDebugEnabled())
                     {
-                        if (_logger.isDebugEnabled())
-                        {
-                            _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
-                        }
-                        if (txn == null)
-                        {
-                            txn = _messageStore.newTransaction();
-                        }
-
-                        txn.enqueueMessage(queue, message);
+                        _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
+                    }
+                    if (txn == null)
+                    {
+                        txn = _messageStore.newTransaction();
+                    }
+                    txn.enqueueMessage(queue, message);
 
 
-                    }
                 }
-
             }
+
             StoreFuture future;
             if (txn != null)
             {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Fri Jul 25 14:24:36 2014
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.server.txn;
 
+import java.util.Collection;
+import java.util.List;
+
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
@@ -30,9 +33,6 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
 
-import java.util.Collection;
-import java.util.List;
-
 /**
  * An implementation of ServerTransaction where each enqueue/dequeue
  * operation takes place within it own transaction.
@@ -77,7 +77,7 @@ public class AutoCommitTransaction imple
         Transaction txn = null;
         try
         {
-            if(message.isPersistent() && queue.isDurable())
+            if(queue.getMessageDurability().persist(message.isPersistent()))
             {
                 if (_logger.isDebugEnabled())
                 {
@@ -109,7 +109,7 @@ public class AutoCommitTransaction imple
                 ServerMessage message = entry.getMessage();
                 TransactionLogResource queue = entry.getOwningResource();
 
-                if(message.isPersistent() && queue.isDurable())
+                if(queue.getMessageDurability().persist(message.isPersistent()))
                 {
                     if (_logger.isDebugEnabled())
                     {
@@ -146,7 +146,7 @@ public class AutoCommitTransaction imple
         Transaction txn = null;
         try
         {
-            if(message.isPersistent() && queue.isDurable())
+            if(queue.getMessageDurability().persist(message.isPersistent()))
             {
                 if (_logger.isDebugEnabled())
                 {
@@ -175,25 +175,21 @@ public class AutoCommitTransaction imple
         try
         {
 
-            if(message.isPersistent())
+            for(BaseQueue queue : queues)
             {
-                for(BaseQueue queue : queues)
+                if (queue.getMessageDurability().persist(message.isPersistent()))
                 {
-                    if (queue.isDurable())
+                    if (_logger.isDebugEnabled())
                     {
-                        if (_logger.isDebugEnabled())
-                        {
-                            _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
-                        }
-                        if (txn == null)
-                        {
-                            txn = _messageStore.newTransaction();
-                        }
-
-                        txn.enqueueMessage(queue, message);
+                        _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName());
+                    }
+                    if (txn == null)
+                    {
+                        txn = _messageStore.newTransaction();
+                    }
+                    txn.enqueueMessage(queue, message);
 
 
-                    }
                 }
 
             }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java Fri Jul 25 14:24:36 2014
@@ -381,7 +381,7 @@ public class DtxBranch
 
         public boolean isDurable()
         {
-            return _message.isPersistent() && _resource.isDurable();
+            return _resource.getMessageDurability().persist(_message.isPersistent());
         }
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Fri Jul 25 14:24:36 2014
@@ -20,21 +20,21 @@
  */
 package org.apache.qpid.server.txn;
 
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.TransactionLogResource;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.Transaction;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import org.apache.qpid.server.store.TransactionLogResource;
 
 /**
  * A concrete implementation of ServerTransaction where enqueue/dequeue
@@ -97,7 +97,7 @@ public class LocalTransaction implements
         _postTransactionActions.add(postTransactionAction);
         initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
-        if(message.isPersistent() && queue.isDurable())
+        if(queue.getMessageDurability().persist(message.isPersistent()))
         {
             try
             {
@@ -129,7 +129,7 @@ public class LocalTransaction implements
                 ServerMessage message = entry.getMessage();
                 TransactionLogResource queue = entry.getOwningResource();
 
-                if(message.isPersistent() && queue.isDurable())
+                if(queue.getMessageDurability().persist(message.isPersistent()))
                 {
                     if (_logger.isDebugEnabled())
                     {
@@ -186,7 +186,7 @@ public class LocalTransaction implements
         _postTransactionActions.add(postTransactionAction);
         initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
-        if(message.isPersistent() && queue.isDurable())
+        if(queue.getMessageDurability().persist(message.isPersistent()))
         {
             try
             {
@@ -211,29 +211,26 @@ public class LocalTransaction implements
         _postTransactionActions.add(postTransactionAction);
         initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
-        if(message.isPersistent())
+        try
         {
-            try
+            for(BaseQueue queue : queues)
             {
-                for(BaseQueue queue : queues)
+                if(queue.getMessageDurability().persist(message.isPersistent()))
                 {
-                    if(queue.isDurable())
+                    if (_logger.isDebugEnabled())
                     {
-                        if (_logger.isDebugEnabled())
-                        {
-                            _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() );
-                        }
+                        _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getName() );
+                    }
 
-                        beginTranIfNecessary();
-                        _transaction.enqueueMessage(queue, message);
+                    beginTranIfNecessary();
+                    _transaction.enqueueMessage(queue, message);
 
-                    }
                 }
             }
-            catch(RuntimeException e)
-            {
-                tidyUpOnError(e);
-            }
+        }
+        catch(RuntimeException e)
+        {
+            tidyUpOnError(e);
         }
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java Fri Jul 25 14:24:36 2014
@@ -38,6 +38,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.plugin.MessageMetaDataType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
@@ -220,9 +221,9 @@ public class SynchronousMessageStoreReco
                             }
 
                             @Override
-                            public boolean isDurable()
+                            public MessageDurability getMessageDurability()
                             {
-                                return false;
+                                return MessageDurability.DEFAULT;
                             }
                         };
                 txn.dequeueMessage(mockQueue, new DummyMessage(messageId));

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java Fri Jul 25 14:24:36 2014
@@ -34,8 +34,6 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 
-import org.apache.qpid.server.model.ConfiguredObjectFactory;
-import org.apache.qpid.server.model.VirtualHostNode;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
@@ -49,6 +47,7 @@ import org.apache.qpid.server.logging.Ev
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
 import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -56,6 +55,7 @@ import org.apache.qpid.server.model.Life
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java Fri Jul 25 14:24:36 2014
@@ -20,16 +20,14 @@
  */
 package org.apache.qpid.server.store;
 
-import static org.mockito.Mockito.mock;
-
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.VirtualHost;
@@ -148,12 +146,6 @@ public abstract class MessageStoreQuotaE
         return _transactionResource;
     }
 
-    @Override
-    public boolean isDurable()
-    {
-        return true;
-    }
-
     private static class TestMessage implements EnqueueableMessage
     {
         private final StoredMessage<?> _handle;
@@ -180,4 +172,10 @@ public abstract class MessageStoreQuotaE
             return _handle;
         }
     }
+
+    @Override
+    public MessageDurability getMessageDurability() // this test base acts as its own TransactionLogResource
+    {
+        return MessageDurability.DEFAULT; // DEFAULT: the persist decision follows the message's own persistent flag
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java Fri Jul 25 14:24:36 2014
@@ -28,13 +28,14 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.hamcrest.Description;
+import org.mockito.ArgumentMatcher;
+
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.UUIDGenerator;
@@ -45,9 +46,6 @@ import org.apache.qpid.server.store.hand
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.apache.qpid.test.utils.QpidTestCase;
 
-import org.hamcrest.Description;
-import org.mockito.ArgumentMatcher;
-
 public abstract class MessageStoreTestCase extends QpidTestCase
 {
     private MessageStore _store;
@@ -117,8 +115,7 @@ public abstract class MessageStoreTestCa
         long messageId = 1;
         int contentSize = 0;
         final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
-        StoreFuture flushFuture = message.flushToStore();
-        flushFuture.waitForCompletion();
+        enqueueMessage(message, "dummyQ");
 
         MessageHandler handler = mock(MessageHandler.class);
         _store.visitMessages(handler);
@@ -127,14 +124,60 @@ public abstract class MessageStoreTestCa
 
     }
 
+    public void enqueueMessage(final StoredMessage<TestMessageMetaData> message, final String queueName) // test helper: enqueues message on a stand-in resource named queueName in its own store transaction, then commits
+    {
+        Transaction txn = _store.newTransaction(); // fresh store transaction used only for this one enqueue
+        txn.enqueueMessage(new TransactionLogResource() // first argument: anonymous stand-in for the queue
+        {
+            private final UUID _id = UUID.nameUUIDFromBytes(queueName.getBytes()); // name-based UUID derived from queueName; NOTE(review): getBytes() uses the platform default charset
+
+            @Override
+            public String getName()
+            {
+                return queueName;
+            }
+
+            @Override
+            public UUID getId()
+            {
+                return _id;
+            }
+
+            @Override
+            public MessageDurability getMessageDurability()
+            {
+                return MessageDurability.DEFAULT; // DEFAULT.persist(true) is true, and the message wrapper below reports itself as persistent
+            }
+        }, new EnqueueableMessage() // second argument: adapter exposing the StoredMessage as an EnqueueableMessage
+        {
+            @Override
+            public long getMessageNumber()
+            {
+                return message.getMessageNumber();
+            }
+
+            @Override
+            public boolean isPersistent()
+            {
+                return true; // hard-coded: does not consult the message's metadata
+            }
+
+            @Override
+            public StoredMessage getStoredMessage()
+            {
+                return message;
+            }
+        });
+        txn.commitTran(); // commit before returning to the calling test
+    }
+
     public void testVisitMessagesAborted() throws Exception
     {
         int contentSize = 0;
         for (int i = 0; i < 3; i++)
         {
             final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
-            StoreFuture flushFuture = message.flushToStore();
-            flushFuture.waitForCompletion();
+            enqueueMessage(message, "dummyQ");
         }
 
         MessageHandler handler = mock(MessageHandler.class);
@@ -151,16 +194,16 @@ public abstract class MessageStoreTestCa
         for (int i = 0; i < 3; i++)
         {
             final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
-            StoreFuture flushFuture = message.flushToStore();
-            flushFuture.waitForCompletion();
+            enqueueMessage(message, "dummyQ");
+
         }
 
         reopenStore();
 
         final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize));
 
-        StoreFuture flushFuture = message.flushToStore();
-        flushFuture.waitForCompletion();
+        enqueueMessage(message, "dummyQ");
+
 
         assertTrue("Unexpected message id " + message.getMessageNumber(), message.getMessageNumber() >= 4);
     }
@@ -170,8 +213,6 @@ public abstract class MessageStoreTestCa
         long messageId = 1;
         int contentSize = 0;
         final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
-        StoreFuture flushFuture = message.flushToStore();
-        flushFuture.waitForCompletion();
 
         EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message);
 
@@ -305,8 +346,6 @@ public abstract class MessageStoreTestCa
         long messageId = 1;
         int contentSize = 0;
         final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false));
-        StoreFuture flushFuture = message.flushToStore();
-        flushFuture.waitForCompletion();
 
         MessageHandler handler = mock(MessageHandler.class);
         _store.visitMessages(handler);
@@ -319,8 +358,7 @@ public abstract class MessageStoreTestCa
         long messageId = 1;
         int contentSize = 0;
         final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
-        StoreFuture flushFuture = message.flushToStore();
-        flushFuture.waitForCompletion();
+        enqueueMessage(message, "dummyQ");
 
         final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>();
         _store.visitMessages(new MessageHandler()
@@ -360,7 +398,7 @@ public abstract class MessageStoreTestCa
         TransactionLogResource queue = mock(TransactionLogResource.class);
         when(queue.getId()).thenReturn(queueId);
         when(queue.getName()).thenReturn("testQueue");
-        when(queue.isDurable()).thenReturn(true);
+        when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
         return queue;
     }
 
@@ -391,8 +429,6 @@ public abstract class MessageStoreTestCa
     private EnqueueableMessage createEnqueueableMessage(long messageId1)
     {
         final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0));
-        StoreFuture flushFuture = message1.flushToStore();
-        flushFuture.waitForCompletion();
         EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1);
         return enqueueableMessage1;
     }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java Fri Jul 25 14:24:36 2014
@@ -25,6 +25,7 @@ import java.util.Collections;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.Transaction;
@@ -53,6 +54,7 @@ public class AsyncAutoCommitTransactionT
         when(_messageStore.newTransaction()).thenReturn(_storeTransaction);
         when(_storeTransaction.commitTranAsync()).thenReturn(_future);
         when(_queue.isDurable()).thenReturn(true);
+        when(_queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
     }
 
     public void testEnqueuePersistentMessagePostCommitNotCalledWhenFutureAlreadyComplete() throws Exception

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Fri Jul 25 14:24:36 2014
@@ -20,22 +20,23 @@
  */
 package org.apache.qpid.server.txn;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.MockMessageInstance;
+import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
 import org.apache.qpid.test.utils.QpidTestCase;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 /**
  * A unit test ensuring that AutoCommitTransaction creates a separate transaction for
  * each dequeue/enqueue operation that involves enlistable messages. Verifies
@@ -428,6 +429,7 @@ public class AutoCommitTransactionTest e
     {
         BaseQueue queue = mock(BaseQueue.class);
         when(queue.isDurable()).thenReturn(durable);
+        when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER);
         return queue;
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Fri Jul 25 14:24:36 2014
@@ -20,22 +20,23 @@
  */
 package org.apache.qpid.server.txn;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.MockMessageInstance;
+import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
 import org.apache.qpid.test.utils.QpidTestCase;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 /**
  * A unit test ensuring that LocalTransactionTest creates a long-lived store transaction
  * that spans many dequeue/enqueue operations of enlistable messages.  Verifies
@@ -652,6 +653,7 @@ public class LocalTransactionTest extend
     {
         BaseQueue queue = mock(BaseQueue.class);
         when(queue.isDurable()).thenReturn(durable);
+        when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER);
         return queue;
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java Fri Jul 25 14:24:36 2014
@@ -42,6 +42,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.NullMessageStore;
 import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -379,6 +380,7 @@ public class SynchronousMessageStoreReco
     {
         AMQQueue<?> queue = mock(AMQQueue.class);
         final UUID queueId = UUID.randomUUID();
+        when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
         when(queue.getId()).thenReturn(queueId);
         when(queue.getName()).thenReturn("test-queue");
         when(_virtualHost.getQueue(queueId)).thenReturn(queue);

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Fri Jul 25 14:24:36 2014
@@ -102,12 +102,6 @@ public class MessageConverter_Internal_t
                     }
 
                     @Override
-                    public StoreFuture flushToStore()
-                    {
-                        return StoreFuture.IMMEDIATE_FUTURE;
-                    }
-
-                    @Override
                     public void remove()
                     {
                         throw new UnsupportedOperationException();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Fri Jul 25 14:24:36 2014
@@ -102,12 +102,6 @@ public class MessageConverter_v0_10 impl
                     }
 
                     @Override
-                    public StoreFuture flushToStore()
-                    {
-                        return StoreFuture.IMMEDIATE_FUTURE;
-                    }
-
-                    @Override
                     public void remove()
                     {
                         throw new UnsupportedOperationException();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Jul 25 14:24:36 2014
@@ -334,11 +334,7 @@ public class ServerSessionDelegate exten
 
         int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
 
-        if(enqueues != 0)
-        {
-            storeMessage.flushToStore();
-        }
-        else
+        if(enqueues == 0)
         {
             if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
             {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Jul 25 14:24:36 2014
@@ -34,8 +34,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -93,7 +91,6 @@ import org.apache.qpid.server.protocol.C
 import org.apache.qpid.server.protocol.ConsumerListener;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
@@ -152,9 +149,6 @@ public class AMQChannel<T extends AMQPro
 
     private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
 
-    // Set of messages being acknowledged in the current transaction
-    private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>();
-
     private final AtomicBoolean _suspended = new AtomicBoolean(false);
 
     private ServerTransaction _transaction;
@@ -422,7 +416,6 @@ public class AMQChannel<T extends AMQPro
                         else
                         {
                             incrementOutstandingTxnsIfNecessary();
-                            handle.flushToStore();
                         }
                     }
                 }
@@ -1412,7 +1405,7 @@ public class AMQChannel<T extends AMQPro
             }
             finally
             {
-                _acknowledgedMessages.clear();
+                _ackedMessages.clear();
             }
 
         }
@@ -1435,7 +1428,7 @@ public class AMQChannel<T extends AMQPro
                 }
                 finally
                 {
-                    _acknowledgedMessages.clear();
+                    _ackedMessages.clear();
                 }
             }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Fri Jul 25 14:24:36 2014
@@ -114,12 +114,6 @@ public class MessageConverter_Internal_t
             }
 
             @Override
-            public StoreFuture flushToStore()
-            {
-                return StoreFuture.IMMEDIATE_FUTURE;
-            }
-
-            @Override
             public void remove()
             {
                 throw new UnsupportedOperationException();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java Fri Jul 25 14:24:36 2014
@@ -104,11 +104,6 @@ public class MockStoredMessage implement
         return  buf;
     }
 
-    public StoreFuture flushToStore()
-    {
-        return StoreFuture.IMMEDIATE_FUTURE;
-    }
-
     public void remove()
     {
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java Fri Jul 25 14:24:36 2014
@@ -20,15 +20,21 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import java.util.UUID;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.store.MessageCounter;
+import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 /**
@@ -85,8 +91,9 @@ public class ReferenceCountingTest exten
         final MessageMetaData mmd = new MessageMetaData(info, chb);
 
         StoredMessage storedMessage = _store.addMessage(mmd);
-        storedMessage.flushToStore();
-
+        Transaction txn = _store.newTransaction();
+        txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
+        txn.commitTran();
         AMQMessage message = new AMQMessage(storedMessage);
 
         MessageReference ref = message.newReference();
@@ -151,14 +158,13 @@ public class ReferenceCountingTest exten
         final MessageMetaData mmd = new MessageMetaData(info, chb);
 
         StoredMessage storedMessage = _store.addMessage(mmd);
-        storedMessage.flushToStore();
-
+        Transaction txn = _store.newTransaction();
+        txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
+        txn.commitTran();
         AMQMessage message = new AMQMessage(storedMessage);
 
 
         MessageReference ref = message.newReference();
-        // we call routing complete to set up the handle
-     //   message.routingComplete(_store, _storeContext, new MessageHandleFactory());
 
         assertEquals(1, getStoreMessageCount());
         MessageReference ref2 = message.newReference();
@@ -166,6 +172,54 @@ public class ReferenceCountingTest exten
         assertEquals(1, getStoreMessageCount());
     }
 
+    private TransactionLogResource createTransactionLogResource(final String queueName)
+    {
+        return new TransactionLogResource()
+        {
+            @Override
+            public String getName()
+            {
+                return queueName;
+            }
+
+            @Override
+            public UUID getId()
+            {
+                return UUID.nameUUIDFromBytes(queueName.getBytes());
+            }
+
+            @Override
+            public MessageDurability getMessageDurability()
+            {
+                return MessageDurability.DEFAULT;
+            }
+        };
+    }
+
+    private EnqueueableMessage createEnqueueableMessage(final StoredMessage storedMessage)
+    {
+        return new EnqueueableMessage()
+        {
+            @Override
+            public long getMessageNumber()
+            {
+                return storedMessage.getMessageNumber();
+            }
+
+            @Override
+            public boolean isPersistent()
+            {
+                return true;
+            }
+
+            @Override
+            public StoredMessage getStoredMessage()
+            {
+                return storedMessage;
+            }
+        };
+    }
+
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(ReferenceCountingTest.class);

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Fri Jul 25 14:24:36 2014
@@ -261,12 +261,6 @@ public abstract class MessageConverter_t
                         }
 
                         @Override
-                        public StoreFuture flushToStore()
-                        {
-                            throw new UnsupportedOperationException();
-                        }
-
-                        @Override
                         public void remove()
                         {
                             throw new UnsupportedOperationException();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Fri Jul 25 14:24:36 2014
@@ -159,8 +159,6 @@ public class ReceivingLink_1_0 implement
                 offset += bareMessageBuf.remaining();
             }
 
-            storedMessage.flushToStore();
-
             Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
             MessageReference<Message_1_0> reference = message.newReference();
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Fri Jul 25 14:24:36 2014
@@ -111,12 +111,6 @@ public class MessageConverter_1_0_to_v0_
             }
 
             @Override
-            public StoreFuture flushToStore()
-            {
-                return StoreFuture.IMMEDIATE_FUTURE;
-            }
-
-            @Override
             public void remove()
             {
                 throw new UnsupportedOperationException();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Fri Jul 25 14:24:36 2014
@@ -210,12 +210,6 @@ public class MessageConverter_0_10_to_0_
             }
 
             @Override
-            public StoreFuture flushToStore()
-            {
-                return StoreFuture.IMMEDIATE_FUTURE;
-            }
-
-            @Override
             public void remove()
             {
                 throw new UnsupportedOperationException();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Fri Jul 25 14:24:36 2014
@@ -99,12 +99,6 @@ public class MessageConverter_0_8_to_0_1
             }
 
             @Override
-            public StoreFuture flushToStore()
-            {
-                return StoreFuture.IMMEDIATE_FUTURE;
-            }
-
-            @Override
             public void remove()
             {
                 throw new UnsupportedOperationException();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1613440&r1=1613439&r2=1613440&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java Fri Jul 25 14:24:36 2014
@@ -115,12 +115,6 @@ public class MessageConverter_1_0_to_v0_
             }
 
             @Override
-            public StoreFuture flushToStore()
-            {
-                return StoreFuture.IMMEDIATE_FUTURE;
-            }
-
-            @Override
             public void remove()
             {
                 throw new UnsupportedOperationException();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org