You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/05/03 11:43:56 UTC
qpid-broker-j git commit: QPID-7639: Correct large transaction guard accounting
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 085dca339 -> d9afda2db
QPID-7639: Correct large transaction guard accounting
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d9afda2d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d9afda2d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d9afda2d
Branch: refs/heads/master
Commit: d9afda2dbcf92dbd628b4021d887c2428e22a69b
Parents: 085dca3
Author: Keith Wall <kw...@apache.org>
Authored: Wed May 3 12:05:57 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Wed May 3 12:05:57 2017 +0100
----------------------------------------------------------------------
.../logging/messages/ChannelMessages.java | 60 --------------------
.../messages/Channel_logmessages.properties | 2 -
.../logging/messages/ConnectionMessages.java | 60 ++++++++++++++++++++
.../messages/Connection_logmessages.properties | 2 +
.../apache/qpid/server/model/Connection.java | 5 ++
.../qpid/server/transport/AMQPConnection.java | 5 +-
.../transport/AbstractAMQPConnection.java | 41 ++++++-------
.../txn/FlowToDiskTransactionObserver.java | 37 ++++++------
.../txn/FlowToDiskTransactionObserverTest.java | 30 ++++++++++
.../server/protocol/v0_10/ServerSession.java | 2 +-
.../qpid/server/protocol/v0_8/AMQChannel.java | 4 +-
.../protocol/v1_0/AMQPConnection_1_0.java | 2 +-
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 4 +-
.../TxnCoordinatorReceivingLinkEndpoint.java | 2 +-
...er-Appendix-Operational-Logging-Messages.xml | 26 +++++----
15 files changed, 159 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
index 1c8aa86..3c41e95 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
@@ -76,7 +76,6 @@ public class ChannelMessages
public static final String IDLE_TXN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.idle_txn";
public static final String FLOW_ENFORCED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.flow_enforced";
public static final String FLOW_REMOVED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.flow_removed";
- public static final String LARGE_TRANSACTION_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.large_transaction_warn";
public static final String DISCARDMSG_NOALTEXCH_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.discardmsg_noaltexch";
static
@@ -95,7 +94,6 @@ public class ChannelMessages
LoggerFactory.getLogger(IDLE_TXN_LOG_HIERARCHY);
LoggerFactory.getLogger(FLOW_ENFORCED_LOG_HIERARCHY);
LoggerFactory.getLogger(FLOW_REMOVED_LOG_HIERARCHY);
- LoggerFactory.getLogger(LARGE_TRANSACTION_WARN_LOG_HIERARCHY);
LoggerFactory.getLogger(DISCARDMSG_NOALTEXCH_LOG_HIERARCHY);
_messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Channel_logmessages", _currentLocale);
@@ -837,64 +835,6 @@ public class ChannelMessages
/**
* Log a Channel message of the Format:
- * <pre>CHN-1013 : Uncommitted transaction contains {0,number} bytes of incoming message data.</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage LARGE_TRANSACTION_WARN(Number param1)
- {
- String rawMessage = _messages.getString("LARGE_TRANSACTION_WARN");
-
- final Object[] messageArguments = {param1};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return LARGE_TRANSACTION_WARN_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Channel message of the Format:
* <pre>CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
index 97f80e8..c873e23 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
@@ -41,7 +41,5 @@ DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}
FLOW_CONTROL_IGNORED = CHN-1012 : Flow Control Ignored. Channel will be closed.
-LARGE_TRANSACTION_WARN = CHN-1013 : Uncommitted transaction contains {0,number} bytes of incoming message data.
-
# 0 - operation name
OPERATION = CHN-1014 : Operation : {0}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ConnectionMessages.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ConnectionMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ConnectionMessages.java
index 5f8a3a1..8f0b144 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ConnectionMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ConnectionMessages.java
@@ -70,6 +70,7 @@ public class ConnectionMessages
public static final String MODEL_DELETE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "connection.model_delete";
public static final String IDLE_CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "connection.idle_close";
public static final String CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "connection.close";
+ public static final String LARGE_TRANSACTION_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "connection.large_transaction_warn";
public static final String OPEN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "connection.open";
static
@@ -82,6 +83,7 @@ public class ConnectionMessages
LoggerFactory.getLogger(MODEL_DELETE_LOG_HIERARCHY);
LoggerFactory.getLogger(IDLE_CLOSE_LOG_HIERARCHY);
LoggerFactory.getLogger(CLOSE_LOG_HIERARCHY);
+ LoggerFactory.getLogger(LARGE_TRANSACTION_WARN_LOG_HIERARCHY);
LoggerFactory.getLogger(OPEN_LOG_HIERARCHY);
_messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Connection_logmessages", _currentLocale);
@@ -503,6 +505,64 @@ public class ConnectionMessages
/**
* Log a Connection message of the Format:
+ * <pre>CON-1009 : Uncommitted transaction(s) contains {0,number} bytes of incoming message data exceeding {1,number} bytes limit. Messages will be flown to disk.</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage LARGE_TRANSACTION_WARN(Number param1, Number param2)
+ {
+ String rawMessage = _messages.getString("LARGE_TRANSACTION_WARN");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return LARGE_TRANSACTION_WARN_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Connection message of the Format:
* <pre>CON-1001 : Open : Destination : {0}({1}) : Protocol Version : {2}[ : SSL][ : Client ID : {3}][ : Client Version : {4}][ : Client Product : {5}]</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties
index 2bd3dd6..4e6564e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties
@@ -27,3 +27,5 @@ CLIENT_VERSION_REJECT = CON-1006 : Client version "{0}" rejected by validation
MODEL_DELETE = CON-1007 : Connection close initiated by operator
# 0 - operation name
OPERATION = CON-1008 : Operation : {0}
+LARGE_TRANSACTION_WARN = CON-1009 : Uncommitted transaction(s) contains {0,number} bytes of incoming message data exceeding {1,number} bytes limit. Messages will be flown to disk.
+
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index a0b96b8..1da929a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -108,6 +108,11 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
@DerivedAttribute
Port<?> getPort();
+ @DerivedAttribute(description = "The maximum size in bytes that uncommitted transactions associated with this connection"
+ + " may grow before the messages contained within the transactions will be flown to disk. "
+ + " Disabled if negative.")
+ long getMaxUncommittedInMemorySize();
+
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Inbound")
long getBytesIn();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 5b74264..48e7cc5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -33,11 +33,12 @@ import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.session.AMQPSession;
+import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.TransactionObserver;
import org.apache.qpid.server.util.Deletable;
public interface AMQPConnection<C extends AMQPConnection<C>>
- extends Connection<C>, Deletable<C>, EventLoggerProvider, TransactionObserver
+ extends Connection<C>, Deletable<C>, EventLoggerProvider
{
Broker<?> getBroker();
@@ -81,6 +82,8 @@ public interface AMQPConnection<C extends AMQPConnection<C>>
AggregateTicker getAggregateTicker();
+ LocalTransaction createLocalTransaction();
+
enum CloseReason
{
MANAGEMENT,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 3252980..36fb0d5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -73,6 +73,7 @@ import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.FlowToDiskTransactionObserver;
+import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.TransactionObserver;
import org.apache.qpid.server.util.Action;
@@ -124,6 +125,7 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
private final AtomicLong _maxMessageSize = new AtomicLong(Long.MAX_VALUE);
private volatile int _messageCompressionThreshold;
private volatile TransactionObserver _transactionObserver;
+ private long _maxUncommittedInMemorySize;
public AbstractAMQPConnection(Broker<?> broker,
ServerNetworkConnection network,
@@ -212,11 +214,8 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
SlowConnectionOpenTicker slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
_aggregateTicker.addTicker(slowConnectionOpenTicker);
_lastReadTime = _lastWriteTime = getCreatedTime().getTime();
- _transactionObserver = new FlowToDiskTransactionObserver(getContextValue(Long.class,
- Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE),
- _logSubject,
- _eventLoggerProvider.getEventLogger());
-
+ _maxUncommittedInMemorySize = getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+ _transactionObserver = _maxUncommittedInMemorySize < 0 ? FlowToDiskTransactionObserver.NOOP_TRANSACTION_OBSERVER : new FlowToDiskTransactionObserver(_maxUncommittedInMemorySize, _logSubject, _eventLoggerProvider.getEventLogger());
logConnectionOpen();
}
@@ -847,6 +846,12 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
}
@Override
+ public long getMaxUncommittedInMemorySize()
+ {
+ return _maxUncommittedInMemorySize;
+ }
+
+ @Override
public String toString()
{
return getNetwork().getRemoteAddress() + "(" + ((getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName()) + ")");
@@ -872,6 +877,13 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
}
+ @Override
+ public LocalTransaction createLocalTransaction()
+ {
+ return new LocalTransaction(getAddressSpace().getMessageStore(),
+ () -> getLastReadTime(),
+ _transactionObserver);
+ }
private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener
{
@@ -948,23 +960,4 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,
return getNetwork().getPeerPrincipal();
}
- @Override
- public void onMessageEnqueue(final ServerTransaction transaction,
- final EnqueueableMessage<? extends StorableMessageMetaData> message)
- {
- _transactionObserver.onMessageEnqueue(transaction, message);
- }
-
- @Override
- public void onDischarge(final ServerTransaction transaction)
- {
- _transactionObserver.onDischarge(transaction);
- }
-
- @Override
- public void reset()
- {
- _transactionObserver.reset();
- }
-
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
index 644c73d..d794238 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
@@ -40,6 +40,7 @@ public class FlowToDiskTransactionObserver implements TransactionObserver
private final LogSubject _logSubject;
private final EventLogger _eventLogger;
private final long _maxUncommittedInMemorySize;
+ private volatile boolean _reported;
public FlowToDiskTransactionObserver(final long maxUncommittedInMemorySize,
final LogSubject logSubject,
@@ -57,34 +58,30 @@ public class FlowToDiskTransactionObserver implements TransactionObserver
final EnqueueableMessage<? extends StorableMessageMetaData> message)
{
StoredMessage<? extends StorableMessageMetaData> handle = message.getStoredMessage();
- long size = handle.getContentSize() + handle.getMetaData().getStorableSize();
- long uncommittedMessages = _uncommittedMessageSize.accumulateAndGet(size,
- (left, right) -> left + right);
- if (uncommittedMessages > _maxUncommittedInMemorySize)
+ long messageSize = handle.getContentSize() + handle.getMetaData().getStorableSize();
+
+ long newUncommittedSize = _uncommittedMessageSize.get() + messageSize;
+ if (newUncommittedSize > _maxUncommittedInMemorySize)
{
handle.flowToDisk();
- if(!_uncommittedMessages.isEmpty() || uncommittedMessages == size)
+ if (!_reported)
{
- _eventLogger.message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(uncommittedMessages));
+ _eventLogger.message(_logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, _maxUncommittedInMemorySize));
+ _reported = true;
}
- if(!_uncommittedMessages.isEmpty())
+ if (!_uncommittedMessages.isEmpty())
{
for (TransactionDetails transactionDetails : _uncommittedMessages.values())
{
transactionDetails.flowToDisk();
}
- _uncommittedMessages.clear();
}
}
else
{
- TransactionDetails newDetails = new TransactionDetails();
- TransactionDetails details = _uncommittedMessages.putIfAbsent(transaction, newDetails);
- if (details == null)
- {
- details = newDetails;
- }
+ _uncommittedMessageSize.addAndGet(messageSize);
+ TransactionDetails details = _uncommittedMessages.computeIfAbsent(transaction, key -> new TransactionDetails());
details.messageEnqueued(handle);
}
}
@@ -95,10 +92,13 @@ public class FlowToDiskTransactionObserver implements TransactionObserver
TransactionDetails transactionDetails = _uncommittedMessages.remove(transaction);
if (transactionDetails != null)
{
- _uncommittedMessageSize.accumulateAndGet(transactionDetails.getUncommittedMessageSize(),
- (left, right) -> left - right);
+ _uncommittedMessageSize.addAndGet(-transactionDetails.getUncommittedMessageSize());
}
+ if (_maxUncommittedInMemorySize > _uncommittedMessageSize.get())
+ {
+ _reported = false;
+ }
}
@Override
@@ -122,7 +122,7 @@ public class FlowToDiskTransactionObserver implements TransactionObserver
private void messageEnqueued(StoredMessage<? extends StorableMessageMetaData> handle)
{
long size = handle.getContentSize() + handle.getMetaData().getStorableSize();
- _uncommittedMessageSize.accumulateAndGet(size, (left, right) -> left + right);
+ _uncommittedMessageSize.addAndGet(size);
_uncommittedMessages.add(handle);
}
@@ -132,6 +132,7 @@ public class FlowToDiskTransactionObserver implements TransactionObserver
{
uncommittedHandle.flowToDisk();
}
+ _uncommittedMessages.clear();
}
private long getUncommittedMessageSize()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserverTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserverTest.java b/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserverTest.java
index 8e8eed5..04c86c0 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserverTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserverTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -105,6 +106,35 @@ public class FlowToDiskTransactionObserverTest extends QpidTestCase
verify(_eventLogger, never()).message(same(_logSubject), any(LogMessage.class));
}
+ public void testBreachLimitTwice() throws Exception
+ {
+ EnqueueableMessage<?> message1 = createMessage(MAX_UNCOMMITTED_IN_MEMORY_SIZE + 1);
+
+ _flowToDiskMessageObserver.onMessageEnqueue(_transaction, message1);
+
+ StoredMessage handle1 = message1.getStoredMessage();
+ verify(handle1).flowToDisk();
+ verify(_eventLogger, times(1)).message(same(_logSubject), any(LogMessage.class));
+
+ _flowToDiskMessageObserver.onDischarge(_transaction);
+
+ EnqueueableMessage<?> message2 = createMessage(MAX_UNCOMMITTED_IN_MEMORY_SIZE / 2);
+ EnqueueableMessage<?> message3 = createMessage((MAX_UNCOMMITTED_IN_MEMORY_SIZE / 2) + 1);
+
+ _flowToDiskMessageObserver.onMessageEnqueue(_transaction, message2);
+
+ StoredMessage handle2 = message2.getStoredMessage();
+ verify(handle2, never()).flowToDisk();
+
+ _flowToDiskMessageObserver.onMessageEnqueue(_transaction, message3);
+
+ StoredMessage handle3 = message3.getStoredMessage();
+ verify(handle2).flowToDisk();
+ verify(handle3).flowToDisk();
+
+ verify(_eventLogger, times(2)).message(same(_logSubject), any(LogMessage.class));
+ }
+
private EnqueueableMessage<?> createMessage(int size)
{
EnqueueableMessage message = mock(EnqueueableMessage.class);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 48e37c5..805b54b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1228,7 +1228,7 @@ public class ServerSession extends SessionInvoker
public void selectTx()
{
- _transaction = new LocalTransaction(this.getMessageStore(), getAMQPConnection());
+ _transaction = getConnection().getAmqpConnection().createLocalTransaction();
_txnStarts.incrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 2aaa69b..61c8c26 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -284,9 +284,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
/** Sets this channel to be part of a local transaction */
private void setLocalTransactional()
{
- _transaction = new LocalTransaction(_messageStore,
- () -> _connection.getLastReadTime(),
- _connection);
+ _transaction = _connection.createLocalTransaction();
_txnStarts.incrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index c75bd2f..9ea7a11 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -74,7 +74,7 @@ public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQ
void close(Error error);
Iterator<IdentifiedTransaction> getOpenTransactions();
- IdentifiedTransaction createLocalTransaction();
+ IdentifiedTransaction createIdentifiedTransaction();
ServerTransaction getTransaction(int txnId);
void removeTransaction(int txnId);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 7aac360..07777a4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -1837,7 +1837,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
@Override
- public IdentifiedTransaction createLocalTransaction()
+ public IdentifiedTransaction createIdentifiedTransaction()
{
ServerTransaction[] openTransactions = _openTransactions;
final int maxOpenTransactions = openTransactions.length;
@@ -1861,7 +1861,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
- final LocalTransaction serverTransaction = new LocalTransaction(getAddressSpace().getMessageStore(), this);
+ final LocalTransaction serverTransaction = createLocalTransaction();
_openTransactions[id] = serverTransaction;
return new IdentifiedTransaction(id, serverTransaction);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 4ad9bce..9bb3406 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -122,7 +122,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
Session_1_0 session = getSession();
if(command instanceof Declare)
{
- final IdentifiedTransaction txn = session.getConnection().createLocalTransaction();
+ final IdentifiedTransaction txn = session.getConnection().createIdentifiedTransaction();
_createdTransactions.put(txn.getId(), txn.getServerTransaction());
Declared state = new Declared();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9afda2d/doc/java-broker/src/docbkx/Java-Broker-Appendix-Operational-Logging-Messages.xml
----------------------------------------------------------------------
diff --git a/doc/java-broker/src/docbkx/Java-Broker-Appendix-Operational-Logging-Messages.xml b/doc/java-broker/src/docbkx/Java-Broker-Appendix-Operational-Logging-Messages.xml
index 7f9baa1..16e6fa9 100644
--- a/doc/java-broker/src/docbkx/Java-Broker-Appendix-Operational-Logging-Messages.xml
+++ b/doc/java-broker/src/docbkx/Java-Broker-Appendix-Operational-Logging-Messages.xml
@@ -839,6 +839,22 @@
Operator using manangement.</para>
</entry>
</row>
+ <row xml:id="Java-Broker-Appendix-Operation-Logging-Message-CON-1009">
+ <entry morerows="1">CON-1009</entry>
+ <entry>Uncommitted transaction(s) contains <replaceable>size</replaceable> bytes of incoming message data
+ exceeding <replaceable>size</replaceable> bytes limit. Messages will be flown to disk.</entry>
+ </row>
+ <row>
+ <entry>
+ <para>Warns that the transactions associated with this connection contain so much uncommitted data that
+ a threshold has been breached. The connection responds by flowing the messages already associated with the
+ transactions and any new messages to disk. The connection reverts to normal behaviour once the
+ quantity of uncommitted data falls beneath the threshold. Normally this happens when the transactions
+ commit or rollback.
+ </para>
+ </entry>
+ </row>
+
</tbody>
</tgroup>
</table>
@@ -982,16 +998,6 @@
</para>
</entry>
</row>
- <row xml:id="Java-Broker-Appendix-Operation-Logging-Message-CHN-1013">
- <entry morerows="1">CHN-1013</entry>
- <entry>Uncommitted transaction contains <replaceable>size</replaceable> bytes of incoming message data.</entry>
- </row>
- <row>
- <entry>
- <para>Warns about uncommitted transaction with large message(s)
- </para>
- </entry>
- </row>
</tbody>
</tgroup>
</table>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org