You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/04/25 16:09:52 UTC

qpid-broker-j git commit: QPID-7639: [AMQP 1.0] Implement large transaction guard honouring "connection.maxUncommittedInMemorySize" and flowing messages to disk on breaching threashold

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 4e6a665df -> 20a23dc68


QPID-7639: [AMQP 1.0] Implement large transaction guard honouring "connection.maxUncommittedInMemorySize" and flowing messages to disk on breaching threashold


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/20a23dc6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/20a23dc6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/20a23dc6

Branch: refs/heads/master
Commit: 20a23dc68714c3635028378e10e12d598c5cc1b5
Parents: 4e6a665
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Apr 25 16:46:56 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Apr 25 17:09:10 2017 +0100

----------------------------------------------------------------------
 .../server/session/AbstractAMQPSession.java     |   7 +-
 .../qpid/server/txn/LocalTransaction.java       | 101 ++++++++++++++++++-
 .../txn/FlowToDiskMessageObserverTest.java      |  87 ++++++++++++++++
 .../qpid/server/txn/LocalTransactionTest.java   |  20 ++++
 .../server/protocol/v0_10/ServerSession.java    |  54 +---------
 .../server/protocol/v0_10/Session_0_10.java     |   5 -
 .../qpid/server/protocol/v0_8/AMQChannel.java   |  59 ++---------
 .../protocol/v1_0/AMQPConnection_1_0.java       |   2 +-
 .../protocol/v1_0/AMQPConnection_1_0Impl.java   |   9 +-
 .../TxnCoordinatorReceivingLinkEndpoint.java    |   9 +-
 10 files changed, 234 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 2410ef9..5f61e7d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -81,7 +81,7 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
     protected final SecurityToken _token;
     protected final PublishAuthorisationCache _publishAuthCache;
 
-    protected final long _maxUncommittedInMemorySize;
+    private final long _maxUncommittedInMemorySize;
 
     protected final LogSubject _logSubject;
 
@@ -411,6 +411,11 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
         }
     }
 
+    public long getMaxUncommittedInMemorySize()
+    {
+        return _maxUncommittedInMemorySize;
+    }
+
     protected abstract void updateBlockedStateIfNecessary();
 
     public abstract boolean isClosing();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 9c96083..1d4e7b5 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -29,11 +29,16 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -48,7 +53,8 @@ public class LocalTransaction implements ServerTransaction
 {
     protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class);
 
-    private final List<Action> _postTransactionActions = new ArrayList<Action>();
+    private final List<Action> _postTransactionActions = new ArrayList<>();
+    private final MessageObserver _messageObserver;
 
     private volatile Transaction _transaction;
     private final ActivityTimeAccessor _activityTime;
@@ -60,6 +66,11 @@ public class LocalTransaction implements ServerTransaction
 
     public LocalTransaction(MessageStore transactionLog)
     {
+        this(transactionLog, MessageObserver.NOOP_MESSAGE_OBSERVER);
+    }
+
+    public LocalTransaction(MessageStore transactionLog, MessageObserver messageObserver)
+    {
         this(transactionLog, new ActivityTimeAccessor()
         {
             @Override
@@ -67,13 +78,16 @@ public class LocalTransaction implements ServerTransaction
             {
                 return System.currentTimeMillis();
             }
-        });
+        }, messageObserver);
     }
 
-    public LocalTransaction(MessageStore transactionLog, ActivityTimeAccessor activityTime)
+    public LocalTransaction(MessageStore transactionLog,
+                            ActivityTimeAccessor activityTime,
+                            MessageObserver messageObserver)
     {
         _transactionLog = transactionLog;
         _activityTime = activityTime;
+        _messageObserver = messageObserver;
     }
 
     @Override
@@ -262,6 +276,7 @@ public class LocalTransaction implements ServerTransaction
                 });
             }
         }
+        _messageObserver.onMessageEnqueue(message);
     }
 
     public void enqueue(Collection<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction)
@@ -332,6 +347,7 @@ public class LocalTransaction implements ServerTransaction
             }
             tidyUpOnError(e);
         }
+        _messageObserver.onMessageEnqueue(message);
     }
 
     public void commit()
@@ -513,6 +529,7 @@ public class LocalTransaction implements ServerTransaction
 
     private void resetDetails()
     {
+        _messageObserver.reset();
         _asyncTran = null;
         _transaction = null;
         _postTransactionActions.clear();
@@ -540,4 +557,82 @@ public class LocalTransaction implements ServerTransaction
     {
         return _isRollbackOnly;
     }
+
+
+    public interface MessageObserver
+    {
+        MessageObserver NOOP_MESSAGE_OBSERVER = new NoopMessageObserver();
+
+        void onMessageEnqueue(EnqueueableMessage<? extends StorableMessageMetaData> message);
+
+        void reset();
+    }
+
+    private static class NoopMessageObserver implements MessageObserver
+    {
+        @Override
+        public void onMessageEnqueue(final EnqueueableMessage<? extends StorableMessageMetaData> message)
+        {
+            // noop
+        }
+
+        @Override
+        public void reset()
+        {
+            // noop
+        }
+    }
+
+    public static class FlowToDiskMessageObserver implements MessageObserver
+    {
+        private volatile long _uncommittedMessageSize;
+        private final List<StoredMessage<? extends StorableMessageMetaData>> _uncommittedMessages = new ArrayList<>();
+        private final LogSubject _logSubject;
+        private final EventLogger _eventLogger;
+        private final long _maxUncommittedInMemorySize;
+
+        public FlowToDiskMessageObserver(final long maxUncommittedInMemorySize,
+                                         final LogSubject logSubject,
+                                         final EventLogger eventLogger)
+        {
+            _logSubject = logSubject;
+            _eventLogger = eventLogger;
+            _maxUncommittedInMemorySize = maxUncommittedInMemorySize;
+        }
+
+        @Override
+        public void onMessageEnqueue(final EnqueueableMessage<? extends StorableMessageMetaData> message)
+        {
+            StoredMessage<? extends StorableMessageMetaData> handle = message.getStoredMessage();
+            _uncommittedMessageSize += handle.getContentSize();
+            if (_uncommittedMessageSize > _maxUncommittedInMemorySize)
+            {
+                handle.flowToDisk();
+                if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize())
+                {
+                    _eventLogger.message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+                }
+
+                if(!_uncommittedMessages.isEmpty())
+                {
+                    for (StoredMessage<? extends StorableMessageMetaData> uncommittedHandle : _uncommittedMessages)
+                    {
+                        uncommittedHandle.flowToDisk();
+                    }
+                    _uncommittedMessages.clear();
+                }
+            }
+            else
+            {
+                _uncommittedMessages.add(handle);
+            }
+        }
+
+        @Override
+        public void reset()
+        {
+            _uncommittedMessageSize = 0L;
+            _uncommittedMessages.clear();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskMessageObserverTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskMessageObserverTest.java b/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskMessageObserverTest.java
new file mode 100644
index 0000000..b402e9c
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskMessageObserverTest.java
@@ -0,0 +1,87 @@
+package org.apache.qpid.server.txn;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class FlowToDiskMessageObserverTest extends QpidTestCase
+{
+    private static final int MAX_UNCOMMITTED_IN_MEMORY_SIZE = 100;
+    private LocalTransaction.FlowToDiskMessageObserver _flowToDiskMessageObserver;
+    private EventLogger _eventLogger    ;
+    private LogSubject _logSubject;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _eventLogger = mock(EventLogger.class);
+        _logSubject = mock(LogSubject.class);
+        _flowToDiskMessageObserver = new LocalTransaction.FlowToDiskMessageObserver(MAX_UNCOMMITTED_IN_MEMORY_SIZE,
+                                                                                    _logSubject,
+                                                                                    _eventLogger);
+    }
+
+    public void testOnMessageEnqueue() throws Exception
+    {
+        EnqueueableMessage<?> message1 = createMessage(MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+        EnqueueableMessage<?> message2 = createMessage(1);
+        EnqueueableMessage<?> message3 = createMessage(1);
+
+        _flowToDiskMessageObserver.onMessageEnqueue(message1);
+
+        StoredMessage handle1 = message1.getStoredMessage();
+        verify(handle1, never()).flowToDisk();
+        verify(_eventLogger, never()).message(same(_logSubject), any(LogMessage.class));
+
+        _flowToDiskMessageObserver.onMessageEnqueue(message2);
+
+        StoredMessage handle2 = message2.getStoredMessage();
+        verify(handle1).flowToDisk();
+        verify(handle2).flowToDisk();
+        verify(_eventLogger).message(same(_logSubject), any(LogMessage.class));
+
+        _flowToDiskMessageObserver.onMessageEnqueue(message3);
+
+        StoredMessage handle3 = message2.getStoredMessage();
+        verify(handle1).flowToDisk();
+        verify(handle2).flowToDisk();
+        verify(handle3).flowToDisk();
+        verify(_eventLogger).message(same(_logSubject), any(LogMessage.class));
+    }
+
+    public void testReset() throws Exception
+    {
+        EnqueueableMessage<?> message1 = createMessage(MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+        EnqueueableMessage<?> message2 = createMessage(1);
+
+        _flowToDiskMessageObserver.onMessageEnqueue(message1);
+        _flowToDiskMessageObserver.reset();
+        _flowToDiskMessageObserver.onMessageEnqueue(message2);
+
+        StoredMessage handle1 = message1.getStoredMessage();
+        StoredMessage handle2 = message2.getStoredMessage();
+        verify(handle1, never()).flowToDisk();
+        verify(handle2, never()).flowToDisk();
+        verify(_eventLogger, never()).message(same(_logSubject), any(LogMessage.class));
+    }
+
+    private EnqueueableMessage<?> createMessage(int size)
+    {
+        EnqueueableMessage message = mock(EnqueueableMessage.class);
+        StoredMessage handle = mock(StoredMessage.class);
+        when(message.getStoredMessage()).thenReturn(handle);
+        when(handle.getContentSize()).thenReturn(size);
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
index 4cc20a6..b4db5c1 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.txn;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -602,6 +604,24 @@ public class LocalTransactionTest extends QpidTestCase
         assertEquals("Transaction update time should be reset after rollback", 0, _transaction.getTransactionUpdateTime());
     }
 
+    public void testEnqueueInvokesMessageObserver() throws Exception
+    {
+
+        final LocalTransaction.MessageObserver messageObserver = mock(LocalTransaction.MessageObserver.class);
+        _transaction = new LocalTransaction(_transactionLog, messageObserver);
+
+        _message = createTestMessage(true);
+        _queues = createTestBaseQueues(new boolean[] {false, true, false, true});
+
+        _transaction.enqueue(_queues, _message, _action1);
+
+        verify(messageObserver).onMessageEnqueue(_message);
+
+        _transaction.enqueue(createQueue(true), _message, _action1);
+
+        verify(messageObserver, times(2)).onMessageEnqueue(_message);
+    }
+
     private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
     {
         Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index afc69ec..23fcb13 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -86,7 +86,6 @@ import org.apache.qpid.server.protocol.v0_10.transport.*;
 import org.apache.qpid.server.protocol.v0_10.transport.Xid;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.protocol.v0_10.transport.Frame;
 import org.apache.qpid.server.txn.*;
@@ -156,9 +155,6 @@ public class ServerSession extends SessionInvoker
     private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_10>> _consumers = new CopyOnWriteArrayList<>();
 
     private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
-    private volatile long _uncommittedMessageSize;
-
-    private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
 
     public ServerSession(ServerConnection connection, ServerSessionDelegate delegate, Binary name, long expiry)
     {
@@ -969,47 +965,9 @@ public class ServerSession extends SessionInvoker
         int enqueues = result.send(_transaction, null);
         getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
         incrementOutstandingTxnsIfNecessary();
-        incrementUncommittedMessageSize(message.getStoredMessage());
         return enqueues;
     }
 
-    private void resetUncommittedMessages()
-    {
-        _uncommittedMessageSize = 0l;
-        _uncommittedMessages.clear();
-    }
-
-
-    private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData_0_10> handle)
-    {
-        if (isTransactional() && !(_transaction instanceof DistributedTransaction))
-        {
-            _uncommittedMessageSize += handle.getContentSize();
-            if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
-            {
-                handle.flowToDisk();
-                if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize())
-                {
-                    getAMQPConnection().getEventLogger()
-                                       .message(getLogSubject(), ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
-                }
-
-                if(!_uncommittedMessages.isEmpty())
-                {
-                    for (StoredMessage<MessageMetaData_0_10> uncommittedHandle : _uncommittedMessages)
-                    {
-                        uncommittedHandle.flowToDisk();
-                    }
-                    _uncommittedMessages.clear();
-                }
-            }
-            else
-            {
-                _uncommittedMessages.add(handle);
-            }
-        }
-    }
-
     public void sendMessage(MessageTransfer xfr,
                             Runnable postIdSettingAction)
     {
@@ -1270,7 +1228,10 @@ public class ServerSession extends SessionInvoker
 
     public void selectTx()
     {
-        _transaction = new LocalTransaction(this.getMessageStore());
+        _transaction = new LocalTransaction(this.getMessageStore(),
+                                            new LocalTransaction.FlowToDiskMessageObserver(_modelObject.getMaxUncommittedInMemorySize(),
+                                                                                           getLogSubject(),
+                                                                                           getAMQPConnection().getEventLogger()));
         _txnStarts.incrementAndGet();
     }
 
@@ -1377,7 +1338,6 @@ public class ServerSession extends SessionInvoker
         _txnCommits.incrementAndGet();
         _txnStarts.incrementAndGet();
         decrementOutstandingTxnsIfNecessary();
-        resetUncommittedMessages();
     }
 
     public void rollback()
@@ -1387,7 +1347,6 @@ public class ServerSession extends SessionInvoker
         _txnRejects.incrementAndGet();
         _txnStarts.incrementAndGet();
         decrementOutstandingTxnsIfNecessary();
-        resetUncommittedMessages();
     }
 
 
@@ -1818,11 +1777,6 @@ public class ServerSession extends SessionInvoker
                                               AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
     }
 
-    public final long getMaxUncommittedInMemorySize()
-    {
-        return _modelObject.getMaxUncommittedInMemorySize();
-    }
-
     private class ResultFuture<T> implements Future<T>
     {
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
index ae69352..649d290 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -196,9 +196,4 @@ public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarg
     {
         return _serverSession;
     }
-
-    long getMaxUncommittedInMemorySize()
-    {
-        return _maxUncommittedInMemorySize;
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 84ae994..67b3406 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -183,8 +183,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
     private long _blockingTimeout;
     private boolean _confirmOnPublish;
     private long _confirmedMessageCounter;
-    private volatile long _uncommittedMessageSize;
-    private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>();
 
     private boolean _wireBlockingState;
 
@@ -287,14 +285,12 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
     /** Sets this channel to be part of a local transaction */
     private void setLocalTransactional()
     {
-        _transaction = new LocalTransaction(_messageStore, new ActivityTimeAccessor()
-        {
-            @Override
-            public long getActivityTime()
-            {
-                return _connection.getLastReadTime();
-            }
-        });
+        _transaction = new LocalTransaction(_messageStore,
+                                            () -> _connection.getLastReadTime(),
+                                            new LocalTransaction.FlowToDiskMessageObserver(
+                                                    getMaxUncommittedInMemorySize(),
+                                                    _logSubject,
+                                                    getEventLogger()));
         _txnStarts.incrementAndGet();
     }
 
@@ -473,7 +469,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                                                                        .createBasicAckBody(_confirmedMessageCounter, false);
                                 _connection.writeFrame(responseBody.generateFrame(_channelId));
                             }
-                            incrementUncommittedMessageSize(storedMessage);
                             incrementOutstandingTxnsIfNecessary();
                         }
 
@@ -503,35 +498,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
     }
 
-    private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData> handle)
-    {
-        if (isTransactional())
-        {
-            _uncommittedMessageSize += handle.getContentSize();
-            if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
-            {
-                handle.flowToDisk();
-                if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize())
-                {
-                    messageWithSubject(ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
-                }
-
-                if(!_uncommittedMessages.isEmpty())
-                {
-                    for (StoredMessage<MessageMetaData> uncommittedHandle : _uncommittedMessages)
-                    {
-                        uncommittedHandle.flowToDisk();
-                    }
-                    _uncommittedMessages.clear();
-                }
-            }
-            else
-            {
-                _uncommittedMessages.add(handle);
-            }
-        }
-    }
-
     /**
      * Either throws a {@link AMQConnectionException} or returns the message
      *
@@ -1193,13 +1159,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
             _txnStarts.incrementAndGet();
             decrementOutstandingTxnsIfNecessary();
         }
-        resetUncommittedMessages();
-    }
-
-    private void resetUncommittedMessages()
-    {
-        _uncommittedMessageSize = 0l;
-        _uncommittedMessages.clear();
     }
 
     private void rollback(Runnable postRollbackTask)
@@ -1221,7 +1180,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
             _txnRejects.incrementAndGet();
             _txnStarts.incrementAndGet();
             decrementOutstandingTxnsIfNecessary();
-            resetUncommittedMessages();
         }
 
         postRollbackTask.run();
@@ -1324,11 +1282,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
         return _currentMessage != null;
     }
 
-    private long getMaxUncommittedInMemorySize()
-    {
-        return _maxUncommittedInMemorySize;
-    }
-
     public boolean isChannelFlow()
     {
         return _channelFlow;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index c75bd2f..b2ab892 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -74,7 +74,7 @@ public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQ
     void close(Error error);
 
     Iterator<IdentifiedTransaction> getOpenTransactions();
-    IdentifiedTransaction createLocalTransaction();
+    IdentifiedTransaction createLocalTransaction(final Session_1_0 transactionSession);
     ServerTransaction getTransaction(int txnId);
     void removeTransaction(int txnId);
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 7be265d..7fa95d0 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -1837,7 +1837,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     }
 
     @Override
-    public IdentifiedTransaction createLocalTransaction()
+    public IdentifiedTransaction createLocalTransaction(final Session_1_0 transactionSession)
     {
         ServerTransaction[] openTransactions = _openTransactions;
         final int maxOpenTransactions = openTransactions.length;
@@ -1862,7 +1862,12 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         }
 
         final LocalTransaction serverTransaction =
-                new LocalTransaction(getAddressSpace().getMessageStore());
+                new LocalTransaction(getAddressSpace().getMessageStore(),
+                                     new LocalTransaction.FlowToDiskMessageObserver(
+                                             transactionSession.getMaxUncommittedInMemorySize(),
+                                             transactionSession.getLogSubject(),
+                                             transactionSession.getEventLogger()));
+
         _openTransactions[id] = serverTransaction;
         return new IdentifiedTransaction(id, serverTransaction);
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 6b9ee81..a2f4d2d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -119,16 +119,17 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
                     amqpValueSectionFound = true;
                     Object command = section.getValue();
 
+                    Session_1_0 session = getSession();
                     if(command instanceof Declare)
                     {
-                        final IdentifiedTransaction txn = getSession().getConnection().createLocalTransaction();
+                        final IdentifiedTransaction txn = session.getConnection().createLocalTransaction(session);
                         _createdTransactions.put(txn.getId(), txn.getServerTransaction());
 
                         Declared state = new Declared();
 
-                        getSession().incrementStartedTransactions();
+                        session.incrementStartedTransactions();
 
-                        state.setTxnId(getSession().integerToBinary(txn.getId()));
+                        state.setTxnId(session.integerToBinary(txn.getId()));
                         updateDisposition(deliveryTag, state, true);
 
                     }
@@ -136,7 +137,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
                     {
                         Discharge discharge = (Discharge) command;
 
-                        final Error error = discharge(getSession().binaryToInteger(discharge.getTxnId()),
+                        final Error error = discharge(session.binaryToInteger(discharge.getTxnId()),
                                                       Boolean.TRUE.equals(discharge.getFail()));
                         updateDisposition(deliveryTag, error == null ? new Accepted() : null, true);
                         return error;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org