You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/10/16 15:51:08 UTC

svn commit: r1709000 - /qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java

Author: rgodfrey
Date: Fri Oct 16 13:51:07 2015
New Revision: 1709000

URL: http://svn.apache.org/viewvc?rev=1709000&view=rev
Log:
QPID-6797 : merge queue entry state change listeners, and use a single instance per consumer

Modified:
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1709000&r1=1708999&r2=1709000&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Oct 16 13:51:07 2015
@@ -37,7 +37,6 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -51,22 +50,6 @@ import org.apache.qpid.server.util.State
 public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
 {
 
-    private final StateChangeListener<MessageInstance, MessageInstance.State> _entryReleaseListener =
-            new StateChangeListener<MessageInstance, MessageInstance.State>()
-            {
-                @Override
-                public void stateChanged(final MessageInstance entry,
-                                         final MessageInstance.State oldSate,
-                                         final MessageInstance.State newState)
-                {
-                    if (oldSate == QueueEntry.State.ACQUIRED && newState != QueueEntry.State.ACQUIRED)
-                    {
-                        restoreCredit(entry.getMessage());
-                    }
-                    entry.removeStateChangeListener(this);
-                }
-            };
-
     private final ClientDeliveryMethod _deliveryMethod;
     private final RecordDeliveryMethod _recordMethod;
 
@@ -289,7 +272,6 @@ public abstract class ConsumerTarget_0_8
 
                 addUnacknowledgedMessage(entry);
                 recordMessageDelivery(consumer, entry, deliveryTag);
-                entry.addStateChangeListener(getReleasedStateChangeListener());
                 long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
                 entry.incrementDeliveryCount();
             }
@@ -459,11 +441,6 @@ public abstract class ConsumerTarget_0_8
         _creditManager.restoreCredit(1, message.getSize());
     }
 
-    protected final StateChangeListener<MessageInstance, MessageInstance.State> getReleasedStateChangeListener()
-    {
-        return _entryReleaseListener;
-    }
-
     public void creditStateChanged(boolean hasCredit)
     {
 
@@ -551,18 +528,7 @@ public abstract class ConsumerTarget_0_8
         final long size = entry.getMessage().getSize();
         _unacknowledgedBytes.addAndGet(size);
         _unacknowledgedCount.incrementAndGet();
-        entry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>()
-        {
-            public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
-            {
-                if(oldState.equals(MessageInstance.State.ACQUIRED) && !newState.equals(MessageInstance.State.ACQUIRED))
-                {
-                    _unacknowledgedBytes.addAndGet(-size);
-                    _unacknowledgedCount.decrementAndGet();
-                    entry.removeStateChangeListener(this);
-                }
-            }
-        });
+        entry.addStateChangeListener(_unacknowledgedMessageListener);
     }
 
     @Override
@@ -579,4 +545,23 @@ public abstract class ConsumerTarget_0_8
     {
         return _unacknowledgedCount.longValue();
     }
+
+    private final StateChangeListener<MessageInstance, MessageInstance.State> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.State>()
+    {
+
+        public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
+        {
+            if(oldState == MessageInstance.State.ACQUIRED && newState != MessageInstance.State.ACQUIRED)
+            {
+                final long _size = entry.getMessage().getSize();
+                _unacknowledgedBytes.addAndGet(-_size);
+                _unacknowledgedCount.decrementAndGet();
+
+                _creditManager.restoreCredit(1, _size);
+
+                entry.removeStateChangeListener(this);
+            }
+
+        }
+    };
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org