You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/04/25 16:09:52 UTC
qpid-broker-j git commit: QPID-7639: [AMQP 1.0] Implement large
transaction guard honouring "connection.maxUncommittedInMemorySize" and
flowing messages to disk on breaching threshold
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 4e6a665df -> 20a23dc68
QPID-7639: [AMQP 1.0] Implement large transaction guard honouring "connection.maxUncommittedInMemorySize" and flowing messages to disk on breaching threshold
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/20a23dc6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/20a23dc6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/20a23dc6
Branch: refs/heads/master
Commit: 20a23dc68714c3635028378e10e12d598c5cc1b5
Parents: 4e6a665
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Apr 25 16:46:56 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Apr 25 17:09:10 2017 +0100
----------------------------------------------------------------------
.../server/session/AbstractAMQPSession.java | 7 +-
.../qpid/server/txn/LocalTransaction.java | 101 ++++++++++++++++++-
.../txn/FlowToDiskMessageObserverTest.java | 87 ++++++++++++++++
.../qpid/server/txn/LocalTransactionTest.java | 20 ++++
.../server/protocol/v0_10/ServerSession.java | 54 +---------
.../server/protocol/v0_10/Session_0_10.java | 5 -
.../qpid/server/protocol/v0_8/AMQChannel.java | 59 ++---------
.../protocol/v1_0/AMQPConnection_1_0.java | 2 +-
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 9 +-
.../TxnCoordinatorReceivingLinkEndpoint.java | 9 +-
10 files changed, 234 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 2410ef9..5f61e7d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -81,7 +81,7 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
protected final SecurityToken _token;
protected final PublishAuthorisationCache _publishAuthCache;
- protected final long _maxUncommittedInMemorySize;
+ private final long _maxUncommittedInMemorySize;
protected final LogSubject _logSubject;
@@ -411,6 +411,11 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
}
}
+ public long getMaxUncommittedInMemorySize()
+ {
+ return _maxUncommittedInMemorySize;
+ }
+
protected abstract void updateBlockedStateIfNecessary();
public abstract boolean isClosing();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 9c96083..1d4e7b5 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -29,11 +29,16 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -48,7 +53,8 @@ public class LocalTransaction implements ServerTransaction
{
protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class);
- private final List<Action> _postTransactionActions = new ArrayList<Action>();
+ private final List<Action> _postTransactionActions = new ArrayList<>();
+ private final MessageObserver _messageObserver;
private volatile Transaction _transaction;
private final ActivityTimeAccessor _activityTime;
@@ -60,6 +66,11 @@ public class LocalTransaction implements ServerTransaction
public LocalTransaction(MessageStore transactionLog)
{
+ this(transactionLog, MessageObserver.NOOP_MESSAGE_OBSERVER);
+ }
+
+ public LocalTransaction(MessageStore transactionLog, MessageObserver messageObserver)
+ {
this(transactionLog, new ActivityTimeAccessor()
{
@Override
@@ -67,13 +78,16 @@ public class LocalTransaction implements ServerTransaction
{
return System.currentTimeMillis();
}
- });
+ }, messageObserver);
}
- public LocalTransaction(MessageStore transactionLog, ActivityTimeAccessor activityTime)
+ public LocalTransaction(MessageStore transactionLog,
+ ActivityTimeAccessor activityTime,
+ MessageObserver messageObserver)
{
_transactionLog = transactionLog;
_activityTime = activityTime;
+ _messageObserver = messageObserver;
}
@Override
@@ -262,6 +276,7 @@ public class LocalTransaction implements ServerTransaction
});
}
}
+ _messageObserver.onMessageEnqueue(message);
}
public void enqueue(Collection<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction)
@@ -332,6 +347,7 @@ public class LocalTransaction implements ServerTransaction
}
tidyUpOnError(e);
}
+ _messageObserver.onMessageEnqueue(message);
}
public void commit()
@@ -513,6 +529,7 @@ public class LocalTransaction implements ServerTransaction
private void resetDetails()
{
+ _messageObserver.reset();
_asyncTran = null;
_transaction = null;
_postTransactionActions.clear();
@@ -540,4 +557,82 @@ public class LocalTransaction implements ServerTransaction
{
return _isRollbackOnly;
}
+
+
+ public interface MessageObserver
+ {
+ MessageObserver NOOP_MESSAGE_OBSERVER = new NoopMessageObserver();
+
+ void onMessageEnqueue(EnqueueableMessage<? extends StorableMessageMetaData> message);
+
+ void reset();
+ }
+
+ private static class NoopMessageObserver implements MessageObserver
+ {
+ @Override
+ public void onMessageEnqueue(final EnqueueableMessage<? extends StorableMessageMetaData> message)
+ {
+ // noop
+ }
+
+ @Override
+ public void reset()
+ {
+ // noop
+ }
+ }
+
+ public static class FlowToDiskMessageObserver implements MessageObserver
+ {
+ private volatile long _uncommittedMessageSize;
+ private final List<StoredMessage<? extends StorableMessageMetaData>> _uncommittedMessages = new ArrayList<>();
+ private final LogSubject _logSubject;
+ private final EventLogger _eventLogger;
+ private final long _maxUncommittedInMemorySize;
+
+ public FlowToDiskMessageObserver(final long maxUncommittedInMemorySize,
+ final LogSubject logSubject,
+ final EventLogger eventLogger)
+ {
+ _logSubject = logSubject;
+ _eventLogger = eventLogger;
+ _maxUncommittedInMemorySize = maxUncommittedInMemorySize;
+ }
+
+ @Override
+ public void onMessageEnqueue(final EnqueueableMessage<? extends StorableMessageMetaData> message)
+ {
+ StoredMessage<? extends StorableMessageMetaData> handle = message.getStoredMessage();
+ _uncommittedMessageSize += handle.getContentSize();
+ if (_uncommittedMessageSize > _maxUncommittedInMemorySize)
+ {
+ handle.flowToDisk();
+ if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize())
+ {
+ _eventLogger.message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+ }
+
+ if(!_uncommittedMessages.isEmpty())
+ {
+ for (StoredMessage<? extends StorableMessageMetaData> uncommittedHandle : _uncommittedMessages)
+ {
+ uncommittedHandle.flowToDisk();
+ }
+ _uncommittedMessages.clear();
+ }
+ }
+ else
+ {
+ _uncommittedMessages.add(handle);
+ }
+ }
+
+ @Override
+ public void reset()
+ {
+ _uncommittedMessageSize = 0L;
+ _uncommittedMessages.clear();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskMessageObserverTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskMessageObserverTest.java b/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskMessageObserverTest.java
new file mode 100644
index 0000000..b402e9c
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskMessageObserverTest.java
@@ -0,0 +1,87 @@
+package org.apache.qpid.server.txn;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class FlowToDiskMessageObserverTest extends QpidTestCase
+{
+ private static final int MAX_UNCOMMITTED_IN_MEMORY_SIZE = 100;
+ private LocalTransaction.FlowToDiskMessageObserver _flowToDiskMessageObserver;
+ private EventLogger _eventLogger ;
+ private LogSubject _logSubject;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _eventLogger = mock(EventLogger.class);
+ _logSubject = mock(LogSubject.class);
+ _flowToDiskMessageObserver = new LocalTransaction.FlowToDiskMessageObserver(MAX_UNCOMMITTED_IN_MEMORY_SIZE,
+ _logSubject,
+ _eventLogger);
+ }
+
+ public void testOnMessageEnqueue() throws Exception
+ {
+ EnqueueableMessage<?> message1 = createMessage(MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+ EnqueueableMessage<?> message2 = createMessage(1);
+ EnqueueableMessage<?> message3 = createMessage(1);
+
+ _flowToDiskMessageObserver.onMessageEnqueue(message1); // exactly at the limit: not over it yet
+
+ StoredMessage handle1 = message1.getStoredMessage();
+ verify(handle1, never()).flowToDisk();
+ verify(_eventLogger, never()).message(same(_logSubject), any(LogMessage.class));
+
+ _flowToDiskMessageObserver.onMessageEnqueue(message2); // limit breached: held and new message both flow
+
+ StoredMessage handle2 = message2.getStoredMessage();
+ verify(handle1).flowToDisk();
+ verify(handle2).flowToDisk();
+ verify(_eventLogger).message(same(_logSubject), any(LogMessage.class));
+
+ _flowToDiskMessageObserver.onMessageEnqueue(message3); // still over the limit: only the new message flows
+
+ StoredMessage handle3 = message3.getStoredMessage();
+ verify(handle1).flowToDisk();
+ verify(handle2).flowToDisk();
+ verify(handle3).flowToDisk();
+ verify(_eventLogger).message(same(_logSubject), any(LogMessage.class)); // warning still logged exactly once
+ }
+
+ public void testReset() throws Exception
+ {
+ EnqueueableMessage<?> message1 = createMessage(MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+ EnqueueableMessage<?> message2 = createMessage(1);
+
+ _flowToDiskMessageObserver.onMessageEnqueue(message1);
+ _flowToDiskMessageObserver.reset();
+ _flowToDiskMessageObserver.onMessageEnqueue(message2);
+
+ StoredMessage handle1 = message1.getStoredMessage();
+ StoredMessage handle2 = message2.getStoredMessage();
+ verify(handle1, never()).flowToDisk();
+ verify(handle2, never()).flowToDisk();
+ verify(_eventLogger, never()).message(same(_logSubject), any(LogMessage.class));
+ }
+
+ private EnqueueableMessage<?> createMessage(int size)
+ {
+ EnqueueableMessage message = mock(EnqueueableMessage.class);
+ StoredMessage handle = mock(StoredMessage.class);
+ when(message.getStoredMessage()).thenReturn(handle);
+ when(handle.getContentSize()).thenReturn(size);
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
index 4cc20a6..b4db5c1 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.txn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
@@ -602,6 +604,24 @@ public class LocalTransactionTest extends QpidTestCase
assertEquals("Transaction update time should be reset after rollback", 0, _transaction.getTransactionUpdateTime());
}
+ public void testEnqueueInvokesMessageObserver() throws Exception
+ {
+
+ final LocalTransaction.MessageObserver messageObserver = mock(LocalTransaction.MessageObserver.class);
+ _transaction = new LocalTransaction(_transactionLog, messageObserver);
+
+ _message = createTestMessage(true);
+ _queues = createTestBaseQueues(new boolean[] {false, true, false, true});
+
+ _transaction.enqueue(_queues, _message, _action1);
+
+ verify(messageObserver).onMessageEnqueue(_message);
+
+ _transaction.enqueue(createQueue(true), _message, _action1);
+
+ verify(messageObserver, times(2)).onMessageEnqueue(_message);
+ }
+
private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
{
Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index afc69ec..23fcb13 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -86,7 +86,6 @@ import org.apache.qpid.server.protocol.v0_10.transport.*;
import org.apache.qpid.server.protocol.v0_10.transport.Xid;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.protocol.v0_10.transport.Frame;
import org.apache.qpid.server.txn.*;
@@ -156,9 +155,6 @@ public class ServerSession extends SessionInvoker
private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_10>> _consumers = new CopyOnWriteArrayList<>();
private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
- private volatile long _uncommittedMessageSize;
-
- private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
public ServerSession(ServerConnection connection, ServerSessionDelegate delegate, Binary name, long expiry)
{
@@ -969,47 +965,9 @@ public class ServerSession extends SessionInvoker
int enqueues = result.send(_transaction, null);
getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
incrementOutstandingTxnsIfNecessary();
- incrementUncommittedMessageSize(message.getStoredMessage());
return enqueues;
}
- private void resetUncommittedMessages()
- {
- _uncommittedMessageSize = 0l;
- _uncommittedMessages.clear();
- }
-
-
- private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData_0_10> handle)
- {
- if (isTransactional() && !(_transaction instanceof DistributedTransaction))
- {
- _uncommittedMessageSize += handle.getContentSize();
- if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
- {
- handle.flowToDisk();
- if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize())
- {
- getAMQPConnection().getEventLogger()
- .message(getLogSubject(), ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
- }
-
- if(!_uncommittedMessages.isEmpty())
- {
- for (StoredMessage<MessageMetaData_0_10> uncommittedHandle : _uncommittedMessages)
- {
- uncommittedHandle.flowToDisk();
- }
- _uncommittedMessages.clear();
- }
- }
- else
- {
- _uncommittedMessages.add(handle);
- }
- }
- }
-
public void sendMessage(MessageTransfer xfr,
Runnable postIdSettingAction)
{
@@ -1270,7 +1228,10 @@ public class ServerSession extends SessionInvoker
public void selectTx()
{
- _transaction = new LocalTransaction(this.getMessageStore());
+ _transaction = new LocalTransaction(this.getMessageStore(),
+ new LocalTransaction.FlowToDiskMessageObserver(_modelObject.getMaxUncommittedInMemorySize(),
+ getLogSubject(),
+ getAMQPConnection().getEventLogger()));
_txnStarts.incrementAndGet();
}
@@ -1377,7 +1338,6 @@ public class ServerSession extends SessionInvoker
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
- resetUncommittedMessages();
}
public void rollback()
@@ -1387,7 +1347,6 @@ public class ServerSession extends SessionInvoker
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
- resetUncommittedMessages();
}
@@ -1818,11 +1777,6 @@ public class ServerSession extends SessionInvoker
AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
}
- public final long getMaxUncommittedInMemorySize()
- {
- return _modelObject.getMaxUncommittedInMemorySize();
- }
-
private class ResultFuture<T> implements Future<T>
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
index ae69352..649d290 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -196,9 +196,4 @@ public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarg
{
return _serverSession;
}
-
- long getMaxUncommittedInMemorySize()
- {
- return _maxUncommittedInMemorySize;
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 84ae994..67b3406 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -183,8 +183,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
private long _blockingTimeout;
private boolean _confirmOnPublish;
private long _confirmedMessageCounter;
- private volatile long _uncommittedMessageSize;
- private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>();
private boolean _wireBlockingState;
@@ -287,14 +285,12 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
/** Sets this channel to be part of a local transaction */
private void setLocalTransactional()
{
- _transaction = new LocalTransaction(_messageStore, new ActivityTimeAccessor()
- {
- @Override
- public long getActivityTime()
- {
- return _connection.getLastReadTime();
- }
- });
+ _transaction = new LocalTransaction(_messageStore,
+ () -> _connection.getLastReadTime(),
+ new LocalTransaction.FlowToDiskMessageObserver(
+ getMaxUncommittedInMemorySize(),
+ _logSubject,
+ getEventLogger()));
_txnStarts.incrementAndGet();
}
@@ -473,7 +469,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
.createBasicAckBody(_confirmedMessageCounter, false);
_connection.writeFrame(responseBody.generateFrame(_channelId));
}
- incrementUncommittedMessageSize(storedMessage);
incrementOutstandingTxnsIfNecessary();
}
@@ -503,35 +498,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
- private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData> handle)
- {
- if (isTransactional())
- {
- _uncommittedMessageSize += handle.getContentSize();
- if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
- {
- handle.flowToDisk();
- if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize())
- {
- messageWithSubject(ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
- }
-
- if(!_uncommittedMessages.isEmpty())
- {
- for (StoredMessage<MessageMetaData> uncommittedHandle : _uncommittedMessages)
- {
- uncommittedHandle.flowToDisk();
- }
- _uncommittedMessages.clear();
- }
- }
- else
- {
- _uncommittedMessages.add(handle);
- }
- }
- }
-
/**
* Either throws a {@link AMQConnectionException} or returns the message
*
@@ -1193,13 +1159,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
}
- resetUncommittedMessages();
- }
-
- private void resetUncommittedMessages()
- {
- _uncommittedMessageSize = 0l;
- _uncommittedMessages.clear();
}
private void rollback(Runnable postRollbackTask)
@@ -1221,7 +1180,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
- resetUncommittedMessages();
}
postRollbackTask.run();
@@ -1324,11 +1282,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
return _currentMessage != null;
}
- private long getMaxUncommittedInMemorySize()
- {
- return _maxUncommittedInMemorySize;
- }
-
public boolean isChannelFlow()
{
return _channelFlow;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index c75bd2f..b2ab892 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -74,7 +74,7 @@ public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQ
void close(Error error);
Iterator<IdentifiedTransaction> getOpenTransactions();
- IdentifiedTransaction createLocalTransaction();
+ IdentifiedTransaction createLocalTransaction(final Session_1_0 transactionSession);
ServerTransaction getTransaction(int txnId);
void removeTransaction(int txnId);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 7be265d..7fa95d0 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -1837,7 +1837,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
@Override
- public IdentifiedTransaction createLocalTransaction()
+ public IdentifiedTransaction createLocalTransaction(final Session_1_0 transactionSession)
{
ServerTransaction[] openTransactions = _openTransactions;
final int maxOpenTransactions = openTransactions.length;
@@ -1862,7 +1862,12 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
final LocalTransaction serverTransaction =
- new LocalTransaction(getAddressSpace().getMessageStore());
+ new LocalTransaction(getAddressSpace().getMessageStore(),
+ new LocalTransaction.FlowToDiskMessageObserver(
+ transactionSession.getMaxUncommittedInMemorySize(),
+ transactionSession.getLogSubject(),
+ transactionSession.getEventLogger()));
+
_openTransactions[id] = serverTransaction;
return new IdentifiedTransaction(id, serverTransaction);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 6b9ee81..a2f4d2d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -119,16 +119,17 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
amqpValueSectionFound = true;
Object command = section.getValue();
+ Session_1_0 session = getSession();
if(command instanceof Declare)
{
- final IdentifiedTransaction txn = getSession().getConnection().createLocalTransaction();
+ final IdentifiedTransaction txn = session.getConnection().createLocalTransaction(session);
_createdTransactions.put(txn.getId(), txn.getServerTransaction());
Declared state = new Declared();
- getSession().incrementStartedTransactions();
+ session.incrementStartedTransactions();
- state.setTxnId(getSession().integerToBinary(txn.getId()));
+ state.setTxnId(session.integerToBinary(txn.getId()));
updateDisposition(deliveryTag, state, true);
}
@@ -136,7 +137,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
{
Discharge discharge = (Discharge) command;
- final Error error = discharge(getSession().binaryToInteger(discharge.getTxnId()),
+ final Error error = discharge(session.binaryToInteger(discharge.getTxnId()),
Boolean.TRUE.equals(discharge.getFail()));
updateDisposition(deliveryTag, error == null ? new Accepted() : null, true);
return error;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org