You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/09/13 10:25:26 UTC

svn commit: r1760524 - in /qpid/java/trunk/client/src/main/java/org/apache/qpid/client: AMQSession_0_8.java BasicMessageConsumer_0_8.java

Author: lquack
Date: Tue Sep 13 10:25:26 2016
New Revision: 1760524

URL: http://svn.apache.org/viewvc?rev=1760524&view=rev
Log:
QPID-7426: [Java Client] Improve naming of methods and fields in AMQSession_0_8

No functional changes.

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1760524&r1=1760523&r2=1760524&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Sep 13 10:25:26 2016
@@ -88,7 +88,7 @@ public class AMQSession_0_8 extends AMQS
      */
     private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
                                                                   DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
-    private AtomicInteger _currentPrefetch = new AtomicInteger();
+    private AtomicInteger _unacknowledgedMessages = new AtomicInteger();
 
     /** Flow control */
     private FlowControlIndicator _flowControl = new FlowControlIndicator();
@@ -112,7 +112,7 @@ public class AMQSession_0_8 extends AMQS
     {
 
         super(con,channelId,transacted,acknowledgeMode, defaultPrefetchHighMark,defaultPrefetchLowMark);
-        _currentPrefetch.set(0);
+        _unacknowledgedMessages.set(0);
     }
 
 
@@ -126,11 +126,11 @@ public class AMQSession_0_8 extends AMQS
         boolean syncRequired = false;
         try
         {
-            reduceCreditAfterAcknowledge();
+            reduceCreditToOriginalSize();
         }
         catch (QpidException e)
         {
-            throw JMSExceptionHelper.chainJMSException(new JMSException("Session.reduceCreditAfterAcknowledge failed"),
+            throw JMSExceptionHelper.chainJMSException(new JMSException("Session.reduceCreditToOriginalSize failed"),
                                                        e);
         }
         while (true)
@@ -144,7 +144,7 @@ public class AMQSession_0_8 extends AMQS
             acknowledgeMessage(tag, false);
             syncRequired = true;
         }
-        _currentPrefetch.set(0);
+        _unacknowledgedMessages.set(0);
         try
         {
             if (syncRequired && getAMQConnection().getSyncClientAck())
@@ -267,9 +267,9 @@ public class AMQSession_0_8 extends AMQS
         }
 
         final AMQProtocolHandler handler = getProtocolHandler();
-        reduceCreditAfterAcknowledge();
+        reduceCreditToOriginalSize();
         handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(getChannelId()), TxCommitOkBody.class);
-        _currentPrefetch.set(0);
+        _unacknowledgedMessages.set(0);
     }
 
     public void sendCreateQueue(String name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws
@@ -308,7 +308,7 @@ public class AMQSession_0_8 extends AMQS
                 getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
             }
         }
-        _currentPrefetch.set(0);
+        _unacknowledgedMessages.set(0);
     }
 
     private void enforceRejectBehaviourDuringRecover()
@@ -810,13 +810,13 @@ public class AMQSession_0_8 extends AMQS
         TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
         AMQFrame frame = body.generateFrame(getChannelId());
         getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
-        _currentPrefetch.set(0);
+        _unacknowledgedMessages.set(0);
     }
 
     public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch)
             throws QpidException, FailoverException
     {
-        _currentPrefetch.set(0);
+        _unacknowledgedMessages.set(0);
         if(messagePrefetch > 0 || sizePrefetch > 0)
         {
             BasicQosBody basicQosBody =
@@ -835,7 +835,7 @@ public class AMQSession_0_8 extends AMQS
                 {
                     public Boolean execute() throws QpidException, FailoverException
                     {
-                        int currentPrefetch = _currentPrefetch.get();
+                        int currentPrefetch = _unacknowledgedMessages.get();
                         if (currentPrefetch >= getPrefetch() && getPrefetch() >= 0)
                         {
                             BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry()
@@ -859,7 +859,7 @@ public class AMQSession_0_8 extends AMQS
 
     }
 
-    protected void reduceCreditAfterAcknowledge() throws QpidException
+    protected void reduceCreditToOriginalSize() throws QpidException
     {
         boolean manageCredit = isManagingCredit();
 
@@ -890,27 +890,28 @@ public class AMQSession_0_8 extends AMQS
         }
     }
 
-    protected void reduceCreditInPostDeliver()
+    protected void stopFlowIfNeccessary()
     {
         int acknowledgeMode = getAcknowledgeMode();
-        boolean manageCredit = (acknowledgeMode == AUTO_ACKNOWLEDGE || acknowledgeMode == DUPS_OK_ACKNOWLEDGE) && getPrefetch() == 0;
+        boolean autoAckLike = (acknowledgeMode == AUTO_ACKNOWLEDGE || acknowledgeMode == DUPS_OK_ACKNOWLEDGE);
 
-        if(manageCredit && _creditChanged.compareAndSet(true,false))
+        if (autoAckLike && getPrefetch() == 0)
         {
-            ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(false);
-            AMQFrame channelFlowFrame = body.generateFrame(getChannelId());
-            getProtocolHandler().writeFrame(channelFlowFrame, true);
+            if (_creditChanged.compareAndSet(true,false))
+            {
+                ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(false);
+                AMQFrame channelFlowFrame = body.generateFrame(getChannelId());
+                getProtocolHandler().writeFrame(channelFlowFrame, true);
+            }
         }
     }
 
-
-    protected void updateCurrentPrefetch(int delta)
+    protected void incUnacknowledgedMessages()
     {
-        _currentPrefetch.addAndGet(delta);
+        _unacknowledgedMessages.incrementAndGet();
     }
 
 
-
     public DestinationCache<AMQQueue> getQueueDestinationCache()
     {
         return _queueDestinationCache;

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1760524&r1=1760523&r2=1760524&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Tue Sep 13 10:25:26 2016
@@ -191,12 +191,12 @@ public class BasicMessageConsumer_0_8 ex
             Message message = super.receive(l);
             if (creditModified && message == null)
             {
-                getSession().reduceCreditAfterAcknowledge();
+                getSession().reduceCreditToOriginalSize();
             }
             if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE
                                   || getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null)
             {
-                getSession().updateCurrentPrefetch(1);
+                getSession().incUnacknowledgedMessages();
             }
             return message;
         }
@@ -224,12 +224,12 @@ public class BasicMessageConsumer_0_8 ex
             Message message = super.receiveNoWait();
             if (creditModified && message == null)
             {
-                getSession().reduceCreditAfterAcknowledge();
+                getSession().reduceCreditToOriginalSize();
             }
             if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE
                                   || getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null)
             {
-                getSession().updateCurrentPrefetch(1);
+                getSession().incUnacknowledgedMessages();
             }
             return message;
         }
@@ -240,10 +240,10 @@ public class BasicMessageConsumer_0_8 ex
         }
     }
 
-
+    @Override
     void postDeliver(AbstractJMSMessage msg)
     {
-        getSession().reduceCreditInPostDeliver();
+        getSession().stopFlowIfNeccessary();
         super.postDeliver(msg);
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org