You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/05/03 11:43:56 UTC

qpid-broker-j git commit: QPID-7639: Correct large transaction guard accounting

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 085dca339 -> d9afda2db


QPID-7639: Correct large transaction guard accounting


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d9afda2d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d9afda2d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d9afda2d

Branch: refs/heads/master
Commit: d9afda2dbcf92dbd628b4021d887c2428e22a69b
Parents: 085dca3
Author: Keith Wall <kw...@apache.org>
Authored: Wed May 3 12:05:57 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Wed May 3 12:05:57 2017 +0100

----------------------------------------------------------------------
 .../logging/messages/ChannelMessages.java       | 60 --------------------
 .../messages/Channel_logmessages.properties     |  2 -
 .../logging/messages/ConnectionMessages.java    | 60 ++++++++++++++++++++
 .../messages/Connection_logmessages.properties  |  2 +
 .../apache/qpid/server/model/Connection.java    |  5 ++
 .../qpid/server/transport/AMQPConnection.java   |  5 +-
 .../transport/AbstractAMQPConnection.java       | 41 ++++++-------
 .../txn/FlowToDiskTransactionObserver.java      | 37 ++++++------
 .../txn/FlowToDiskTransactionObserverTest.java  | 30 ++++++++++
 .../server/protocol/v0_10/ServerSession.java    |  2 +-
 .../qpid/server/protocol/v0_8/AMQChannel.java   |  4 +-
 .../protocol/v1_0/AMQPConnection_1_0.java       |  2 +-
 .../protocol/v1_0/AMQPConnection_1_0Impl.java   |  4 +-
 .../TxnCoordinatorReceivingLinkEndpoint.java    |  2 +-
 ...er-Appendix-Operational-Logging-Messages.xml | 26 +++++----
 15 files changed, 159 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
index 1c8aa86..3c41e95 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
@@ -76,7 +76,6 @@ public class ChannelMessages
     public static final String IDLE_TXN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.idle_txn";
     public static final String FLOW_ENFORCED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.flow_enforced";
     public static final String FLOW_REMOVED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.flow_removed";
-    public static final String LARGE_TRANSACTION_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.large_transaction_warn";
     public static final String DISCARDMSG_NOALTEXCH_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.discardmsg_noaltexch";
 
     static
@@ -95,7 +94,6 @@ public class ChannelMessages
         LoggerFactory.getLogger(IDLE_TXN_LOG_HIERARCHY);
         LoggerFactory.getLogger(FLOW_ENFORCED_LOG_HIERARCHY);
         LoggerFactory.getLogger(FLOW_REMOVED_LOG_HIERARCHY);
-        LoggerFactory.getLogger(LARGE_TRANSACTION_WARN_LOG_HIERARCHY);
         LoggerFactory.getLogger(DISCARDMSG_NOALTEXCH_LOG_HIERARCHY);
 
         _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Channel_logmessages", _currentLocale);
@@ -837,64 +835,6 @@ public class ChannelMessages
 
     /**
      * Log a Channel message of the Format:
-     * <pre>CHN-1013 : Uncommitted transaction contains {0,number} bytes of incoming message data.</pre>
-     * Optional values are contained in [square brackets] and are numbered
-     * sequentially in the method call.
-     *
-     */
-    public static LogMessage LARGE_TRANSACTION_WARN(Number param1)
-    {
-        String rawMessage = _messages.getString("LARGE_TRANSACTION_WARN");
-
-        final Object[] messageArguments = {param1};
-        // Create a new MessageFormat to ensure thread safety.
-        // Sharing a MessageFormat and using applyPattern is not thread safe
-        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
-        final String message = formatter.format(messageArguments);
-
-        return new LogMessage()
-        {
-            public String toString()
-            {
-                return message;
-            }
-
-            public String getLogHierarchy()
-            {
-                return LARGE_TRANSACTION_WARN_LOG_HIERARCHY;
-            }
-
-            @Override
-            public boolean equals(final Object o)
-            {
-                if (this == o)
-                {
-                    return true;
-                }
-                if (o == null || getClass() != o.getClass())
-                {
-                    return false;
-                }
-
-                final LogMessage that = (LogMessage) o;
-
-                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
-            }
-
-            @Override
-            public int hashCode()
-            {
-                int result = toString().hashCode();
-                result = 31 * result + getLogHierarchy().hashCode();
-                return result;
-            }
-        };
-    }
-
-    /**
-     * Log a Channel message of the Format:
      * <pre>CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2}</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
index 97f80e8..c873e23 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
@@ -41,7 +41,5 @@ DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}
 
 FLOW_CONTROL_IGNORED = CHN-1012 : Flow Control Ignored. Channel will be closed.
 
-LARGE_TRANSACTION_WARN = CHN-1013 : Uncommitted transaction contains {0,number} bytes of incoming message data.
-
 # 0 - operation name
 OPERATION = CHN-1014 : Operation : {0}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ConnectionMessages.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ConnectionMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ConnectionMessages.java
index 5f8a3a1..8f0b144 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ConnectionMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ConnectionMessages.java
@@ -70,6 +70,7 @@ public class ConnectionMessages
     public static final String MODEL_DELETE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "connection.model_delete";
     public static final String IDLE_CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "connection.idle_close";
     public static final String CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "connection.close";
+    public static final String LARGE_TRANSACTION_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "connection.large_transaction_warn";
     public static final String OPEN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "connection.open";
 
     static
@@ -82,6 +83,7 @@ public class ConnectionMessages
         LoggerFactory.getLogger(MODEL_DELETE_LOG_HIERARCHY);
         LoggerFactory.getLogger(IDLE_CLOSE_LOG_HIERARCHY);
         LoggerFactory.getLogger(CLOSE_LOG_HIERARCHY);
+        LoggerFactory.getLogger(LARGE_TRANSACTION_WARN_LOG_HIERARCHY);
         LoggerFactory.getLogger(OPEN_LOG_HIERARCHY);
 
         _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Connection_logmessages", _currentLocale);
@@ -503,6 +505,64 @@ public class ConnectionMessages
 
     /**
      * Log a Connection message of the Format:
+     * <pre>CON-1009 : Uncommitted transaction(s) contains {0,number} bytes of incoming message data exceeding {1,number} bytes limit. Messages will be flown to disk.</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage LARGE_TRANSACTION_WARN(Number param1, Number param2)
+    {
+        String rawMessage = _messages.getString("LARGE_TRANSACTION_WARN");
+
+        final Object[] messageArguments = {param1, param2};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+
+            public String getLogHierarchy()
+            {
+                return LARGE_TRANSACTION_WARN_LOG_HIERARCHY;
+            }
+
+            @Override
+            public boolean equals(final Object o)
+            {
+                if (this == o)
+                {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass())
+                {
+                    return false;
+                }
+
+                final LogMessage that = (LogMessage) o;
+
+                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+            }
+
+            @Override
+            public int hashCode()
+            {
+                int result = toString().hashCode();
+                result = 31 * result + getLogHierarchy().hashCode();
+                return result;
+            }
+        };
+    }
+
+    /**
+     * Log a Connection message of the Format:
      * <pre>CON-1001 : Open : Destination : {0}({1}) : Protocol Version : {2}[ : SSL][ : Client ID : {3}][ : Client Version : {4}][ : Client Product : {5}]</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties
index 2bd3dd6..4e6564e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties
@@ -27,3 +27,5 @@ CLIENT_VERSION_REJECT = CON-1006 : Client version "{0}" rejected by validation
 MODEL_DELETE = CON-1007 : Connection close initiated by operator
 # 0 - operation name
 OPERATION = CON-1008 : Operation : {0}
+LARGE_TRANSACTION_WARN = CON-1009 : Uncommitted transaction(s) contains {0,number} bytes of incoming message data exceeding {1,number} bytes limit. Messages will be flown to disk.
+

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index a0b96b8..1da929a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -108,6 +108,11 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
     @DerivedAttribute
     Port<?> getPort();
 
+    @DerivedAttribute(description = "The maximum size in bytes that uncommitted transactions associated with this connection"
+                                    + " may grow before the messages contained within the transactions will be flown to disk. "
+                                    + " Disabled if negative.")
+    long getMaxUncommittedInMemorySize();
+
     @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Inbound")
     long getBytesIn();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 5b74264..48e7cc5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -33,11 +33,12 @@ import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.session.AMQPSession;
+import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.TransactionObserver;
 import org.apache.qpid.server.util.Deletable;
 
 public interface AMQPConnection<C extends AMQPConnection<C>>
-        extends Connection<C>, Deletable<C>, EventLoggerProvider, TransactionObserver
+        extends Connection<C>, Deletable<C>, EventLoggerProvider
 {
     Broker<?> getBroker();
 
@@ -81,6 +82,8 @@ public interface AMQPConnection<C extends AMQPConnection<C>>
 
     AggregateTicker getAggregateTicker();
 
+    LocalTransaction createLocalTransaction();
+
     enum CloseReason
     {
         MANAGEMENT,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 3252980..36fb0d5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -73,6 +73,7 @@ import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.FlowToDiskTransactionObserver;
+import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.TransactionObserver;
 import org.apache.qpid.server.util.Action;
@@ -124,6 +125,7 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
     private final AtomicLong _maxMessageSize = new AtomicLong(Long.MAX_VALUE);
     private volatile int _messageCompressionThreshold;
     private volatile TransactionObserver _transactionObserver;
+    private long _maxUncommittedInMemorySize;
 
     public AbstractAMQPConnection(Broker<?> broker,
                                   ServerNetworkConnection network,
@@ -212,11 +214,8 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
         SlowConnectionOpenTicker slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
         _aggregateTicker.addTicker(slowConnectionOpenTicker);
         _lastReadTime = _lastWriteTime = getCreatedTime().getTime();
-        _transactionObserver = new FlowToDiskTransactionObserver(getContextValue(Long.class,
-                                                                                 Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE),
-                                                                 _logSubject,
-                                                                 _eventLoggerProvider.getEventLogger());
-
+        _maxUncommittedInMemorySize = getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+        _transactionObserver = _maxUncommittedInMemorySize < 0 ? FlowToDiskTransactionObserver.NOOP_TRANSACTION_OBSERVER : new FlowToDiskTransactionObserver(_maxUncommittedInMemorySize, _logSubject, _eventLoggerProvider.getEventLogger());
         logConnectionOpen();
     }
 
@@ -847,6 +846,12 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
     }
 
     @Override
+    public long getMaxUncommittedInMemorySize()
+    {
+        return _maxUncommittedInMemorySize;
+    }
+
+    @Override
     public String toString()
     {
         return getNetwork().getRemoteAddress() + "(" + ((getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName()) + ")");
@@ -872,6 +877,13 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
 
     }
 
+    @Override
+    public LocalTransaction createLocalTransaction()
+    {
+        return new LocalTransaction(getAddressSpace().getMessageStore(),
+                                    () -> getLastReadTime(),
+                                    _transactionObserver);
+    }
 
     private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener
     {
@@ -948,23 +960,4 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
         return getNetwork().getPeerPrincipal();
     }
 
-    @Override
-    public void onMessageEnqueue(final ServerTransaction transaction,
-                                 final EnqueueableMessage<? extends StorableMessageMetaData> message)
-    {
-        _transactionObserver.onMessageEnqueue(transaction, message);
-    }
-
-    @Override
-    public void onDischarge(final ServerTransaction transaction)
-    {
-        _transactionObserver.onDischarge(transaction);
-    }
-
-    @Override
-    public void reset()
-    {
-        _transactionObserver.reset();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
index 644c73d..d794238 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
@@ -40,6 +40,7 @@ public class FlowToDiskTransactionObserver implements TransactionObserver
     private final LogSubject _logSubject;
     private final EventLogger _eventLogger;
     private final long _maxUncommittedInMemorySize;
+    private volatile boolean _reported;
 
     public FlowToDiskTransactionObserver(final long maxUncommittedInMemorySize,
                                          final LogSubject logSubject,
@@ -57,34 +58,30 @@ public class FlowToDiskTransactionObserver implements TransactionObserver
                                  final EnqueueableMessage<? extends StorableMessageMetaData> message)
     {
         StoredMessage<? extends StorableMessageMetaData> handle = message.getStoredMessage();
-        long size = handle.getContentSize() + handle.getMetaData().getStorableSize();
-        long uncommittedMessages = _uncommittedMessageSize.accumulateAndGet(size,
-                                                                            (left, right) -> left + right);
-        if (uncommittedMessages > _maxUncommittedInMemorySize)
+        long messageSize = handle.getContentSize() + handle.getMetaData().getStorableSize();
+
+        long newUncommittedSize = _uncommittedMessageSize.get() + messageSize;
+        if (newUncommittedSize > _maxUncommittedInMemorySize)
         {
             handle.flowToDisk();
-            if(!_uncommittedMessages.isEmpty() || uncommittedMessages == size)
+            if (!_reported)
             {
-                _eventLogger.message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(uncommittedMessages));
+                _eventLogger.message(_logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, _maxUncommittedInMemorySize));
+                _reported = true;
             }
 
-            if(!_uncommittedMessages.isEmpty())
+            if (!_uncommittedMessages.isEmpty())
             {
                 for (TransactionDetails transactionDetails : _uncommittedMessages.values())
                 {
                     transactionDetails.flowToDisk();
                 }
-                _uncommittedMessages.clear();
             }
         }
         else
         {
-            TransactionDetails newDetails = new TransactionDetails();
-            TransactionDetails details = _uncommittedMessages.putIfAbsent(transaction, newDetails);
-            if (details == null)
-            {
-                details = newDetails;
-            }
+            _uncommittedMessageSize.addAndGet(messageSize);
+            TransactionDetails details = _uncommittedMessages.computeIfAbsent(transaction, key -> new TransactionDetails());
             details.messageEnqueued(handle);
         }
     }
@@ -95,10 +92,13 @@ public class FlowToDiskTransactionObserver implements TransactionObserver
         TransactionDetails transactionDetails = _uncommittedMessages.remove(transaction);
         if (transactionDetails != null)
         {
-            _uncommittedMessageSize.accumulateAndGet(transactionDetails.getUncommittedMessageSize(),
-                                                     (left, right) -> left - right);
+            _uncommittedMessageSize.addAndGet(-transactionDetails.getUncommittedMessageSize());
 
         }
+        if (_maxUncommittedInMemorySize > _uncommittedMessageSize.get())
+        {
+            _reported = false;
+        }
     }
 
     @Override
@@ -122,7 +122,7 @@ public class FlowToDiskTransactionObserver implements TransactionObserver
         private void messageEnqueued(StoredMessage<? extends StorableMessageMetaData> handle)
         {
             long size = handle.getContentSize() + handle.getMetaData().getStorableSize();
-            _uncommittedMessageSize.accumulateAndGet(size, (left, right) -> left + right);
+            _uncommittedMessageSize.addAndGet(size);
             _uncommittedMessages.add(handle);
         }
 
@@ -132,6 +132,7 @@ public class FlowToDiskTransactionObserver implements TransactionObserver
             {
                 uncommittedHandle.flowToDisk();
             }
+            _uncommittedMessages.clear();
         }
 
         private long getUncommittedMessageSize()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserverTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserverTest.java b/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserverTest.java
index 8e8eed5..04c86c0 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserverTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserverTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -105,6 +106,35 @@ public class FlowToDiskTransactionObserverTest extends QpidTestCase
         verify(_eventLogger, never()).message(same(_logSubject), any(LogMessage.class));
     }
 
+    public void testBreachLimitTwice() throws Exception
+    {
+        EnqueueableMessage<?> message1 = createMessage(MAX_UNCOMMITTED_IN_MEMORY_SIZE + 1);
+
+        _flowToDiskMessageObserver.onMessageEnqueue(_transaction, message1);
+
+        StoredMessage handle1 = message1.getStoredMessage();
+        verify(handle1).flowToDisk();
+        verify(_eventLogger, times(1)).message(same(_logSubject), any(LogMessage.class));
+
+        _flowToDiskMessageObserver.onDischarge(_transaction);
+
+        EnqueueableMessage<?> message2 = createMessage(MAX_UNCOMMITTED_IN_MEMORY_SIZE / 2);
+        EnqueueableMessage<?> message3 = createMessage((MAX_UNCOMMITTED_IN_MEMORY_SIZE / 2) + 1);
+
+        _flowToDiskMessageObserver.onMessageEnqueue(_transaction, message2);
+
+        StoredMessage handle2 = message2.getStoredMessage();
+        verify(handle2, never()).flowToDisk();
+
+        _flowToDiskMessageObserver.onMessageEnqueue(_transaction, message3);
+
+        StoredMessage handle3 = message3.getStoredMessage();
+        verify(handle2).flowToDisk();
+        verify(handle3).flowToDisk();
+
+        verify(_eventLogger, times(2)).message(same(_logSubject), any(LogMessage.class));
+    }
+
     private EnqueueableMessage<?> createMessage(int size)
     {
         EnqueueableMessage message = mock(EnqueueableMessage.class);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 48e37c5..805b54b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1228,7 +1228,7 @@ public class ServerSession extends SessionInvoker
 
     public void selectTx()
     {
-        _transaction = new LocalTransaction(this.getMessageStore(), getAMQPConnection());
+        _transaction = getConnection().getAmqpConnection().createLocalTransaction();
         _txnStarts.incrementAndGet();
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 2aaa69b..61c8c26 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -284,9 +284,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
     /** Sets this channel to be part of a local transaction */
     private void setLocalTransactional()
     {
-        _transaction = new LocalTransaction(_messageStore,
-                                            () -> _connection.getLastReadTime(),
-                                            _connection);
+        _transaction = _connection.createLocalTransaction();
         _txnStarts.incrementAndGet();
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index c75bd2f..9ea7a11 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -74,7 +74,7 @@ public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQ
     void close(Error error);
 
     Iterator<IdentifiedTransaction> getOpenTransactions();
-    IdentifiedTransaction createLocalTransaction();
+    IdentifiedTransaction createIdentifiedTransaction();
     ServerTransaction getTransaction(int txnId);
     void removeTransaction(int txnId);
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 7aac360..07777a4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -1837,7 +1837,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     }
 
     @Override
-    public IdentifiedTransaction createLocalTransaction()
+    public IdentifiedTransaction createIdentifiedTransaction()
     {
         ServerTransaction[] openTransactions = _openTransactions;
         final int maxOpenTransactions = openTransactions.length;
@@ -1861,7 +1861,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
 
         }
 
-        final LocalTransaction serverTransaction = new LocalTransaction(getAddressSpace().getMessageStore(), this);
+        final LocalTransaction serverTransaction = createLocalTransaction();
 
         _openTransactions[id] = serverTransaction;
         return new IdentifiedTransaction(id, serverTransaction);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 4ad9bce..9bb3406 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -122,7 +122,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
                     Session_1_0 session = getSession();
                     if(command instanceof Declare)
                     {
-                        final IdentifiedTransaction txn = session.getConnection().createLocalTransaction();
+                        final IdentifiedTransaction txn = session.getConnection().createIdentifiedTransaction();
                         _createdTransactions.put(txn.getId(), txn.getServerTransaction());
 
                         Declared state = new Declared();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/doc/java-broker/src/docbkx/Java-Broker-Appendix-Operational-Logging-Messages.xml
----------------------------------------------------------------------
diff --git a/doc/java-broker/src/docbkx/Java-Broker-Appendix-Operational-Logging-Messages.xml b/doc/java-broker/src/docbkx/Java-Broker-Appendix-Operational-Logging-Messages.xml
index 7f9baa1..16e6fa9 100644
--- a/doc/java-broker/src/docbkx/Java-Broker-Appendix-Operational-Logging-Messages.xml
+++ b/doc/java-broker/src/docbkx/Java-Broker-Appendix-Operational-Logging-Messages.xml
@@ -839,6 +839,22 @@
             Operator using manangement.</para>
           </entry>
         </row>
+        <row xml:id="Java-Broker-Appendix-Operation-Logging-Message-CON-1009">
+          <entry morerows="1">CON-1009</entry>
+          <entry>Uncommitted transaction(s) contains <replaceable>size</replaceable> bytes of incoming message data
+            exceeding <replaceable>size</replaceable> bytes limit. Messages will be flown to disk.</entry>
+        </row>
+        <row>
+          <entry>
+            <para>Warns that the transactions associated with this connection contain so much uncommitted data that
+              a threshold has been breached.  The connection responds by flowing the messages already associated with the
+              transactions and any new messages to disk.  The connection reverts to normal behaviour once the
+              quantity of uncommitted data falls beneath the threshold.  Normally this happens when the transactions
+              commit or rollback.
+            </para>
+          </entry>
+        </row>
+
       </tbody>
     </tgroup>
   </table>
@@ -982,16 +998,6 @@
             </para>
           </entry>
         </row>
-        <row xml:id="Java-Broker-Appendix-Operation-Logging-Message-CHN-1013">
-          <entry morerows="1">CHN-1013</entry>
-          <entry>Uncommitted transaction contains <replaceable>size</replaceable> bytes of incoming message data.</entry>
-        </row>
-        <row>
-          <entry>
-              <para>Warns about uncommitted transaction with large message(s)
-              </para>
-          </entry>
-        </row>
       </tbody>
     </tgroup>
   </table>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org