You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/10/16 15:51:08 UTC
svn commit: r1709000 -
/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
Author: rgodfrey
Date: Fri Oct 16 13:51:07 2015
New Revision: 1709000
URL: http://svn.apache.org/viewvc?rev=1709000&view=rev
Log:
QPID-6797: merge queue entry state change listeners and use a single instance per consumer
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1709000&r1=1708999&r2=1709000&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Oct 16 13:51:07 2015
@@ -37,7 +37,6 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
@@ -51,22 +50,6 @@ import org.apache.qpid.server.util.State
public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
{
- private final StateChangeListener<MessageInstance, MessageInstance.State> _entryReleaseListener =
- new StateChangeListener<MessageInstance, MessageInstance.State>()
- {
- @Override
- public void stateChanged(final MessageInstance entry,
- final MessageInstance.State oldSate,
- final MessageInstance.State newState)
- {
- if (oldSate == QueueEntry.State.ACQUIRED && newState != QueueEntry.State.ACQUIRED)
- {
- restoreCredit(entry.getMessage());
- }
- entry.removeStateChangeListener(this);
- }
- };
-
private final ClientDeliveryMethod _deliveryMethod;
private final RecordDeliveryMethod _recordMethod;
@@ -289,7 +272,6 @@ public abstract class ConsumerTarget_0_8
addUnacknowledgedMessage(entry);
recordMessageDelivery(consumer, entry, deliveryTag);
- entry.addStateChangeListener(getReleasedStateChangeListener());
long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
entry.incrementDeliveryCount();
}
@@ -459,11 +441,6 @@ public abstract class ConsumerTarget_0_8
_creditManager.restoreCredit(1, message.getSize());
}
- protected final StateChangeListener<MessageInstance, MessageInstance.State> getReleasedStateChangeListener()
- {
- return _entryReleaseListener;
- }
-
public void creditStateChanged(boolean hasCredit)
{
@@ -551,18 +528,7 @@ public abstract class ConsumerTarget_0_8
final long size = entry.getMessage().getSize();
_unacknowledgedBytes.addAndGet(size);
_unacknowledgedCount.incrementAndGet();
- entry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>()
- {
- public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
- {
- if(oldState.equals(MessageInstance.State.ACQUIRED) && !newState.equals(MessageInstance.State.ACQUIRED))
- {
- _unacknowledgedBytes.addAndGet(-size);
- _unacknowledgedCount.decrementAndGet();
- entry.removeStateChangeListener(this);
- }
- }
- });
+ entry.addStateChangeListener(_unacknowledgedMessageListener);
}
@Override
@@ -579,4 +545,23 @@ public abstract class ConsumerTarget_0_8
{
return _unacknowledgedCount.longValue();
}
+
+ private final StateChangeListener<MessageInstance, MessageInstance.State> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.State>()
+ {
+
+ public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
+ {
+ if(oldState == MessageInstance.State.ACQUIRED && newState != MessageInstance.State.ACQUIRED)
+ {
+ final long _size = entry.getMessage().getSize();
+ _unacknowledgedBytes.addAndGet(-_size);
+ _unacknowledgedCount.decrementAndGet();
+
+ _creditManager.restoreCredit(1, _size);
+
+ entry.removeStateChangeListener(this);
+ }
+
+ }
+ };
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org