You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/11 18:01:10 UTC
svn commit: r655330 -
/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
Author: rgodfrey
Date: Sun May 11 09:01:10 2008
New Revision: 655330
URL: http://svn.apache.org/viewvc?rev=655330&view=rev
Log:
Copy over QPID-926
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=655330&r1=655329&r2=655330&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Sun May 11 09:01:10 2008
@@ -110,6 +110,9 @@
boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap)
throws AMQException
{
+
+ final boolean debug = _log.isDebugEnabled();
+ ;
if (multiple)
{
if (deliveryTag == 0)
@@ -123,11 +126,14 @@
{
public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
{
- if (_log.isDebugEnabled())
+ if (debug)
{
_log.debug("Discarding message: " + message.getMessage().getMessageId());
}
-
+ if(message.getMessage().isPersistent())
+ {
+ beginTranIfNecessary();
+ }
message.restoreCredit();
//Message has been ack so discard it. This will dequeue and decrement the reference.
message.discard(_storeContext);
@@ -152,11 +158,14 @@
unacknowledgedMessageMap.drainTo(acked, deliveryTag);
for (QueueEntry msg : acked)
{
- if (_log.isDebugEnabled())
+ if (debug)
{
_log.debug("Discarding message: " + msg.getMessage().getMessageId());
}
-
+ if(msg.getMessage().isPersistent())
+ {
+ beginTranIfNecessary();
+ }
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(_storeContext);
@@ -176,20 +185,29 @@
_channel.getChannelId());
}
- if (_log.isDebugEnabled())
+ if (debug)
{
_log.debug("Discarding message: " + msg.getMessage().getMessageId());
}
+ if(msg.getMessage().isPersistent())
+ {
+ beginTranIfNecessary();
+ }
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(_storeContext);
- if (_log.isDebugEnabled())
+ if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
msg.getMessage().getMessageId());
}
}
+ if(_inTran)
+ {
+ _messageStore.commitTran(_storeContext);
+ _inTran = false;
+ }
}
public void messageFullyReceived(boolean persistent) throws AMQException