You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/08 15:15:36 UTC
svn commit: r1616742 - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/main/java/org/...
Author: rgodfrey
Date: Fri Aug 8 13:15:35 2014
New Revision: 1616742
URL: http://svn.apache.org/r1616742
Log:
QPID-3978 : [Java Broker] Allow for acquired messages to be removed from a queue due to TTL or management actions
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Fri Aug 8 13:15:35 2014
@@ -29,6 +29,8 @@ public interface ConsumerTarget
{
+ void acquisitionRemoved(MessageInstance node);
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Fri Aug 8 13:15:35 2014
@@ -51,6 +51,8 @@ public interface MessageInstance
boolean isAcquiredBy(ConsumerImpl consumer);
+ boolean removeAcquisitionFromConsumer(ConsumerImpl consumer);
+
void setRedelivered();
boolean isRedelivered();
@@ -67,6 +69,10 @@ public interface MessageInstance
boolean acquire(ConsumerImpl sub);
+ boolean lockAcquisition();
+
+ boolean unlockAcquisition();
+
int getMaximumDeliveryCount();
int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn);
@@ -99,6 +105,7 @@ public interface MessageInstance
State currentState = getState();
return currentState == State.DEQUEUED || currentState == State.DELETED;
}
+
}
@@ -162,10 +169,12 @@ public interface MessageInstance
public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
{
private final C _consumer;
+ private final LockedAcquiredState<C> _lockedState;
public ConsumerAcquiredState(C consumer)
{
_consumer = consumer;
+ _lockedState = new LockedAcquiredState<>(this);
}
@@ -183,6 +192,43 @@ public interface MessageInstance
{
return "{" + getState().name() + " : " + _consumer +"}";
}
+
+ public LockedAcquiredState<C> getLockedState()
+ {
+ return _lockedState;
+ }
+
+ }
+
+ public final class LockedAcquiredState<C extends ConsumerImpl> extends EntryState
+ {
+ private final ConsumerAcquiredState<C> _acquiredState;
+
+ public LockedAcquiredState(final ConsumerAcquiredState<C> acquiredState)
+ {
+ _acquiredState = acquiredState;
+ }
+
+ @Override
+ public State getState()
+ {
+ return State.ACQUIRED;
+ }
+
+ public C getConsumer()
+ {
+ return _acquiredState.getConsumer();
+ }
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}";
+ }
+
+ public ConsumerAcquiredState<C> getUnlockedState()
+ {
+ return _acquiredState;
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Aug 8 13:15:35 2014
@@ -1164,6 +1164,10 @@ public abstract class AbstractQueue<X ex
else
{
deliverMessage(sub, entry, false);
+ if(sub.acquires())
+ {
+ entry.unlockAcquisition();
+ }
}
}
}
@@ -2001,6 +2005,10 @@ public abstract class AbstractQueue<X ex
else
{
deliverMessage(sub, node, batch);
+ if(sub.acquires())
+ {
+ node.unlockAcquisition();
+ }
}
}
@@ -2253,14 +2261,28 @@ public abstract class AbstractQueue<X ex
if (!node.isDeleted())
{
// If the node has expired then acquire it
- if (node.expired() && node.acquire())
+ if (node.expired())
{
- if (_logger.isDebugEnabled())
+ boolean acquiredForDequeueing = node.acquire();
+ if(!acquiredForDequeueing && node.getDeliveredToConsumer())
+ {
+ QueueConsumer consumer = (QueueConsumer) node.getDeliveredConsumer();
+ acquiredForDequeueing = node.removeAcquisitionFromConsumer(consumer);
+ if(acquiredForDequeueing)
+ {
+ consumer.acquisitionRemoved(node);
+ }
+ }
+
+ if(acquiredForDequeueing)
{
- _logger.debug("Dequeuing expired node " + node);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Dequeuing expired node " + node);
+ }
+ // Then dequeue it.
+ dequeueEntry(node);
}
- // Then dequeue it.
- dequeueEntry(node);
}
else
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Fri Aug 8 13:15:35 2014
@@ -39,6 +39,8 @@ public interface QueueConsumer<X extends
void send(QueueEntry entry, boolean batch);
+ void acquisitionRemoved(QueueEntry node);
+
void queueDeleted();
SubFlushRunner getRunner();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Aug 8 13:15:35 2014
@@ -477,6 +477,13 @@ class QueueConsumerImpl
}
@Override
+ public void acquisitionRemoved(final QueueEntry node)
+ {
+ _target.acquisitionRemoved(node);
+ _queue.decrementUnackedMsgCount(node);
+ }
+
+ @Override
public String getDistributionMode()
{
return _distributionMode;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Aug 8 13:15:35 2014
@@ -210,7 +210,7 @@ public abstract class QueueEntryImpl imp
public boolean acquire(ConsumerImpl sub)
{
- final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState());
+ final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState().getLockedState());
if(acquired)
{
_deliveryCountUpdater.compareAndSet(this,-1,0);
@@ -218,17 +218,57 @@ public abstract class QueueEntryImpl imp
return acquired;
}
+ @Override
+ public boolean lockAcquisition()
+ {
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState)
+ {
+ return _stateUpdater.compareAndSet(this, state, ((ConsumerAcquiredState)state).getLockedState());
+ }
+ return state instanceof LockedAcquiredState;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ EntryState state = _state;
+ if(state instanceof LockedAcquiredState)
+ {
+ return _stateUpdater.compareAndSet(this, state, ((LockedAcquiredState)state).getUnlockedState());
+ }
+ return false;
+ }
+
public boolean acquiredByConsumer()
{
- return (_state instanceof ConsumerAcquiredState);
+ return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState);
}
+ @Override
public boolean isAcquiredBy(ConsumerImpl consumer)
{
EntryState state = _state;
- return state instanceof ConsumerAcquiredState
- && ((ConsumerAcquiredState)state).getConsumer() == consumer;
+ return (state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+ || (state instanceof LockedAcquiredState
+ && ((LockedAcquiredState)state).getConsumer() == consumer);
+ }
+
+ @Override
+ public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
+ {
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+ {
+ return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE);
+ }
+ else
+ {
+ return false;
+ }
}
public void release()
@@ -238,7 +278,7 @@ public abstract class QueueEntryImpl imp
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
- if(state instanceof ConsumerAcquiredState)
+ if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
@@ -268,6 +308,10 @@ public abstract class QueueEntryImpl imp
{
return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
}
+ else if (state instanceof LockedAcquiredState)
+ {
+ return (QueueConsumer) ((LockedAcquiredState) state).getConsumer();
+ }
else
{
return null;
@@ -312,7 +356,7 @@ public abstract class QueueEntryImpl imp
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- if (state instanceof ConsumerAcquiredState)
+ if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Fri Aug 8 13:15:35 2014
@@ -68,6 +68,8 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueConsumer;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -953,15 +955,26 @@ public abstract class AbstractVirtualHos
op.withinTransaction(new Transaction()
{
- public void dequeue(final MessageInstance entry)
+ public void dequeue(final MessageInstance messageInstance)
{
- if(entry.acquire())
+ boolean acquired = messageInstance.acquire();
+ if(!acquired && messageInstance instanceof QueueEntry)
+ {
+ QueueEntry entry = (QueueEntry) messageInstance;
+ QueueConsumer consumer = (QueueConsumer) entry.getDeliveredConsumer();
+ acquired = messageInstance.removeAcquisitionFromConsumer(consumer);
+ if(acquired)
+ {
+ consumer.acquisitionRemoved((QueueEntry)messageInstance);
+ }
+ }
+ if(acquired)
{
- txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action()
+ txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action()
{
public void postCommit()
{
- entry.delete();
+ messageInstance.delete();
}
public void onRollback()
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Fri Aug 8 13:15:35 2014
@@ -181,6 +181,12 @@ public class MockConsumer implements Con
}
+ @Override
+ public void acquisitionRemoved(final MessageInstance node)
+ {
+
+ }
+
public State getState()
{
return _state;
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Fri Aug 8 13:15:35 2014
@@ -65,6 +65,12 @@ public class MockMessageInstance impleme
return false;
}
+ @Override
+ public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ {
+ return false;
+ }
+
public void delete()
{
@@ -81,6 +87,18 @@ public class MockMessageInstance impleme
return false;
}
+ @Override
+ public boolean lockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return false;
+ }
+
public boolean isAvailable()
{
return false;
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Fri Aug 8 13:15:35 2014
@@ -137,6 +137,40 @@ public abstract class QueueEntryImplTest
return consumer;
}
+
+ public void testLocking()
+ {
+ QueueConsumer consumer = newConsumer();
+ QueueConsumer consumer2 = newConsumer();
+
+ _queueEntry.acquire(consumer);
+ assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
+ _queueEntry.isAcquired());
+
+ assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
+ assertFalse("Acquisition should not be able to be removed from the wrong consumer",_queueEntry.removeAcquisitionFromConsumer(consumer2));
+ assertTrue("Acquisition should be able to be removed once unlocked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Queue Entry should still be acquired", _queueEntry.isAcquired());
+ assertFalse("Queue Entry should not be marked as acquired by a consumer", _queueEntry.acquiredByConsumer());
+
+ _queueEntry.release();
+
+ assertFalse("Hijacked queue entry should be able to be released", _queueEntry.isAcquired());
+
+ _queueEntry.acquire(consumer);
+ assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
+ _queueEntry.isAcquired());
+
+ assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
+ assertTrue("Should be able to lock unlocked queue entry",_queueEntry.lockAcquisition());
+ assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+
+ _queueEntry.delete();
+ assertTrue("Locked queue entry should be able to be deleted", _queueEntry.isDeleted());
+ }
+
/**
* A helper method to get entry state
*
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Fri Aug 8 13:15:35 2014
@@ -342,5 +342,17 @@ public class StandardQueueTest extends A
return super.acquire(sub);
}
}
+
+ @Override
+ public boolean lockAcquisition()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return true;
+ }
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Fri Aug 8 13:15:35 2014
@@ -534,15 +534,25 @@ public class ConsumerTarget_0_10 extends
return _stopped.get();
}
- public void acknowledge(MessageInstance entry)
+ public boolean deleteAcquired(MessageInstance entry)
{
- // TODO Fix Store Context / cleanup
if(entry.isAcquiredBy(getConsumer()))
{
- _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
- _unacknowledgedCount.decrementAndGet();
+ acquisitionRemoved(entry);
entry.delete();
+ return true;
}
+ else
+ {
+ return false;
+ }
+ }
+
+ @Override
+ public void acquisitionRemoved(final MessageInstance entry)
+ {
+ _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
+ _unacknowledgedCount.decrementAndGet();
}
public void flush()
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Fri Aug 8 13:15:35 2014
@@ -41,13 +41,13 @@ class ExplicitAcceptDispositionChangeLis
public void onAccept()
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition())
{
_target.getSessionModel().acknowledge(_target, _entry);
}
else
{
- _logger.warn("MessageAccept received for message which has not been acquired (likely client error)");
+ _logger.info("MessageAccept received for message which has not been acquired - message may have expired or been removed");
}
}
@@ -60,7 +60,7 @@ class ExplicitAcceptDispositionChangeLis
}
else
{
- _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
+ _logger.warn("MessageRelease received for message which has not been acquired - message may have expired or been removed");
}
}
@@ -72,7 +72,7 @@ class ExplicitAcceptDispositionChangeLis
}
else
{
- _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
+ _logger.warn("MessageReject received for message which has not been acquired - message may have expired or been removed");
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Fri Aug 8 13:15:35 2014
@@ -29,6 +29,7 @@ public class MessageAcceptCompletionList
private final ConsumerTarget_0_10 _sub;
private final MessageInstance _entry;
private final ServerSession _session;
+ private long _messageSize;
private boolean _restoreCredit;
public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
@@ -38,15 +39,19 @@ public class MessageAcceptCompletionList
_entry = entry;
_session = session;
_restoreCredit = restoreCredit;
+ if(restoreCredit)
+ {
+ _messageSize = entry.getMessage().getSize();
+ }
}
public void onComplete(Method method)
{
if(_restoreCredit)
{
- _sub.restoreCredit(_entry.getMessage());
+ _sub.getCreditManager().restoreCredit(1l, _messageSize);
}
- if(_entry.isAcquiredBy(_sub.getConsumer()))
+ if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition())
{
_session.acknowledge(_sub, _entry);
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Aug 8 13:15:35 2014
@@ -460,7 +460,7 @@ public class ServerSession extends Sessi
public void postCommit()
{
- sub.acknowledge(entry);
+ sub.deleteAcquired(entry);
}
public void onRollback()
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Aug 8 13:15:35 2014
@@ -1413,7 +1413,11 @@ public class AMQChannel<T extends AMQPro
// explicit rollbacks resend the message after the rollback-ok is sent
if(_rollingBack)
{
- _resendList.addAll(_ackedMessages);
+ for(MessageInstance entry : _ackedMessages)
+ {
+ entry.unlockAcquisition();
+ }
+ _resendList.addAll(_ackedMessages);
}
else
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Aug 8 13:15:35 2014
@@ -20,11 +20,16 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
@@ -34,14 +39,10 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
* that was given out by the broker and the channel id. <p/>
@@ -57,7 +58,7 @@ public abstract class ConsumerTarget_0_8
final MessageInstance.State oldSate,
final MessageInstance.State newState)
{
- if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED))
+ if (oldSate == QueueEntry.State.ACQUIRED && newState != QueueEntry.State.ACQUIRED)
{
restoreCredit(entry.getMessage());
}
@@ -74,8 +75,8 @@ public abstract class ConsumerTarget_0_8
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager) throws AMQException
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager) throws AMQException
{
return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
@@ -557,6 +558,11 @@ public abstract class ConsumerTarget_0_8
});
}
+ @Override
+ public void acquisitionRemoved(final MessageInstance node)
+ {
+ }
+
public long getUnacknowledgedBytes()
{
return _unacknowledgedBytes.longValue();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Fri Aug 8 13:15:35 2014
@@ -20,31 +20,28 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
-
+import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
+
public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
private final Object _lock = new Object();
- private long _unackedSize;
-
private Map<Long, MessageInstance> _map;
- private long _lastDeliveryTag;
-
private final int _prefetchLimit;
public UnacknowledgedMessageMapImpl(int prefetchLimit)
{
_prefetchLimit = prefetchLimit;
- _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit);
+ _map = new LinkedHashMap<>(prefetchLimit);
}
public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
@@ -81,12 +78,6 @@ public class UnacknowledgedMessageMapImp
{
MessageInstance message = _map.remove(deliveryTag);
- if(message != null)
- {
- _unackedSize -= message.getMessage().getSize();
-
- }
-
return message;
}
}
@@ -109,8 +100,6 @@ public class UnacknowledgedMessageMapImp
synchronized (_lock)
{
_map.put(deliveryTag, message);
- _unackedSize += message.getMessage().getSize();
- _lastDeliveryTag = deliveryTag;
}
}
@@ -119,8 +108,7 @@ public class UnacknowledgedMessageMapImp
synchronized (_lock)
{
Collection<MessageInstance> currentEntries = _map.values();
- _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit);
- _unackedSize = 0l;
+ _map = new LinkedHashMap<>(_prefetchLimit);
return currentEntries;
}
}
@@ -138,7 +126,6 @@ public class UnacknowledgedMessageMapImp
synchronized (_lock)
{
_map.clear();
- _unackedSize = 0l;
}
}
@@ -163,6 +150,14 @@ public class UnacknowledgedMessageMapImp
Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>();
collect(deliveryTag, multiple, ackedMessageMap);
remove(ackedMessageMap);
+ List<MessageInstance> acknowledged = new ArrayList<>();
+ for(MessageInstance instance : ackedMessageMap.values())
+ {
+ if(instance.lockAcquisition())
+ {
+ acknowledged.add(instance);
+ }
+ }
return ackedMessageMap.values();
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Aug 8 13:15:35 2014
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.nio.ByteBuffer;
+import java.util.List;
+
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
@@ -37,19 +40,16 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
private final boolean _acquires;
@@ -378,6 +378,7 @@ class ConsumerTarget_1_0 extends Abstrac
if(outcome instanceof Accepted)
{
+ _queueEntry.lockAcquisition();
txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(),
new ServerTransaction.Action()
{
@@ -412,6 +413,7 @@ class ConsumerTarget_1_0 extends Abstrac
modified.setDeliveryFailed(true);
_link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
_link.getEndpoint().sendFlowConditional();
+ _queueEntry.unlockAcquisition();
}
}
});
@@ -498,6 +500,11 @@ class ConsumerTarget_1_0 extends Abstrac
}
@Override
+ public void acquisitionRemoved(final MessageInstance node)
+ {
+ }
+
+ @Override
public void consumerAdded(final ConsumerImpl sub)
{
_consumer = sub;
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Fri Aug 8 13:15:35 2014
@@ -636,19 +636,21 @@ public class SendingLink_1_0 implements
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
if(_consumer.acquires())
{
- txn.dequeue(Collections.singleton(queueEntry),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- queueEntry.delete();
- }
-
- public void onRollback()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- });
+ if(queueEntry.acquire() || queueEntry.isAcquired())
+ {
+ txn.dequeue(Collections.singleton(queueEntry),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ queueEntry.delete();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
}
}
else if(outcome instanceof Released)
Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Fri Aug 8 13:15:35 2014
@@ -1071,6 +1071,12 @@ class ManagementNode implements MessageS
}
@Override
+ public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ {
+ return false;
+ }
+
+ @Override
public void setRedelivered()
{
@@ -1119,6 +1125,18 @@ class ManagementNode implements MessageS
}
@Override
+ public boolean lockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
public int getMaximumDeliveryCount()
{
return 0;
Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Fri Aug 8 13:15:35 2014
@@ -90,6 +90,12 @@ class ManagementResponse implements Mess
}
@Override
+ public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ {
+ return consumer == _consumer;
+ }
+
+ @Override
public void setRedelivered()
{
_isRedelivered = true;
@@ -138,6 +144,18 @@ class ManagementResponse implements Mess
}
@Override
+ public boolean lockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
public int getMaximumDeliveryCount()
{
return 0;
@@ -190,7 +208,7 @@ class ManagementResponse implements Mess
@Override
public void delete()
{
- // TODO
+ _isDeleted = true;
}
@Override
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java Fri Aug 8 13:15:35 2014
@@ -21,15 +21,8 @@
package org.apache.qpid.server.queue;
-import org.junit.Assert;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -39,8 +32,17 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import javax.naming.NamingException;
+
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class TimeToLiveTest extends QpidBrokerTestCase
{
@@ -53,18 +55,29 @@ public class TimeToLiveTest extends Qpid
private static final int MSG_COUNT = 50;
private static final long SERVER_TTL_TIMEOUT = 60000L;
+ public void testPassiveTTLWithPrefetch() throws Exception
+ {
+ doTestPassiveTTL(true);
+ }
+
public void testPassiveTTL() throws Exception
{
+ doTestPassiveTTL(false);
+
+ }
+
+ private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException
+ {
//Create Client 1
Connection clientConnection = getConnection();
-
+
Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = clientSession.createQueue(QUEUE);
-
+ Queue queue = clientSession.createQueue(QUEUE);
+
// Create then close the consumer so the queue is actually created
// Closing it then reopening it ensures that the consumer shouldn't get messages
// which should have expired and allows a shorter sleep period. See QPID-1418
-
+
MessageConsumer consumer = clientSession.createConsumer(queue);
consumer.close();
@@ -79,6 +92,12 @@ public class TimeToLiveTest extends Qpid
MessageProducer producer = producerSession.createProducer(queue);
+ consumer = clientSession.createConsumer(queue);
+ if(prefetchMessages)
+ {
+ clientConnection.start();
+ }
+
//Set TTL
int msg = 0;
producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
@@ -96,7 +115,6 @@ public class TimeToLiveTest extends Qpid
producerSession.commit();
- consumer = clientSession.createConsumer(queue);
// Ensure we sleep the required amount of time.
ReentrantLock waitLock = new ReentrantLock();
@@ -124,6 +142,16 @@ public class TimeToLiveTest extends Qpid
}
+ if(prefetchMessages)
+ {
+ clientConnection.close();
+ clientConnection = getConnection();
+
+ clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = clientSession.createQueue(QUEUE);
+ consumer = clientSession.createConsumer(queue);
+ }
+
clientConnection.start();
//Receive Message 0
@@ -131,14 +159,14 @@ public class TimeToLiveTest extends Qpid
Message receivedFirst = consumer.receive(5000);
Message receivedSecond = consumer.receive(5000);
Message receivedThird = consumer.receive(1000);
-
+
// Log the messages to help diagnosis incase of failure
_logger.info("First:"+receivedFirst);
_logger.info("Second:"+receivedSecond);
_logger.info("Third:"+receivedThird);
// Only first and last messages sent should survive expiry
- Assert.assertNull("More messages received", receivedThird);
+ Assert.assertNull("More messages received", receivedThird);
Assert.assertNotNull("First message not received", receivedFirst);
Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org