You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/09/13 10:25:26 UTC
svn commit: r1760524 - in
/qpid/java/trunk/client/src/main/java/org/apache/qpid/client:
AMQSession_0_8.java BasicMessageConsumer_0_8.java
Author: lquack
Date: Tue Sep 13 10:25:26 2016
New Revision: 1760524
URL: http://svn.apache.org/viewvc?rev=1760524&view=rev
Log:
QPID-7426: [Java Client] Improve naming of methods and fields in AMQSession_0_8
No functional changes.
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1760524&r1=1760523&r2=1760524&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Sep 13 10:25:26 2016
@@ -88,7 +88,7 @@ public class AMQSession_0_8 extends AMQS
*/
private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
- private AtomicInteger _currentPrefetch = new AtomicInteger();
+ private AtomicInteger _unacknowledgedMessages = new AtomicInteger();
/** Flow control */
private FlowControlIndicator _flowControl = new FlowControlIndicator();
@@ -112,7 +112,7 @@ public class AMQSession_0_8 extends AMQS
{
super(con,channelId,transacted,acknowledgeMode, defaultPrefetchHighMark,defaultPrefetchLowMark);
- _currentPrefetch.set(0);
+ _unacknowledgedMessages.set(0);
}
@@ -126,11 +126,11 @@ public class AMQSession_0_8 extends AMQS
boolean syncRequired = false;
try
{
- reduceCreditAfterAcknowledge();
+ reduceCreditToOriginalSize();
}
catch (QpidException e)
{
- throw JMSExceptionHelper.chainJMSException(new JMSException("Session.reduceCreditAfterAcknowledge failed"),
+ throw JMSExceptionHelper.chainJMSException(new JMSException("Session.reduceCreditToOriginalSize failed"),
e);
}
while (true)
@@ -144,7 +144,7 @@ public class AMQSession_0_8 extends AMQS
acknowledgeMessage(tag, false);
syncRequired = true;
}
- _currentPrefetch.set(0);
+ _unacknowledgedMessages.set(0);
try
{
if (syncRequired && getAMQConnection().getSyncClientAck())
@@ -267,9 +267,9 @@ public class AMQSession_0_8 extends AMQS
}
final AMQProtocolHandler handler = getProtocolHandler();
- reduceCreditAfterAcknowledge();
+ reduceCreditToOriginalSize();
handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(getChannelId()), TxCommitOkBody.class);
- _currentPrefetch.set(0);
+ _unacknowledgedMessages.set(0);
}
public void sendCreateQueue(String name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws
@@ -308,7 +308,7 @@ public class AMQSession_0_8 extends AMQS
getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
}
}
- _currentPrefetch.set(0);
+ _unacknowledgedMessages.set(0);
}
private void enforceRejectBehaviourDuringRecover()
@@ -810,13 +810,13 @@ public class AMQSession_0_8 extends AMQS
TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
AMQFrame frame = body.generateFrame(getChannelId());
getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
- _currentPrefetch.set(0);
+ _unacknowledgedMessages.set(0);
}
public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch)
throws QpidException, FailoverException
{
- _currentPrefetch.set(0);
+ _unacknowledgedMessages.set(0);
if(messagePrefetch > 0 || sizePrefetch > 0)
{
BasicQosBody basicQosBody =
@@ -835,7 +835,7 @@ public class AMQSession_0_8 extends AMQS
{
public Boolean execute() throws QpidException, FailoverException
{
- int currentPrefetch = _currentPrefetch.get();
+ int currentPrefetch = _unacknowledgedMessages.get();
if (currentPrefetch >= getPrefetch() && getPrefetch() >= 0)
{
BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry()
@@ -859,7 +859,7 @@ public class AMQSession_0_8 extends AMQS
}
- protected void reduceCreditAfterAcknowledge() throws QpidException
+ protected void reduceCreditToOriginalSize() throws QpidException
{
boolean manageCredit = isManagingCredit();
@@ -890,27 +890,28 @@ public class AMQSession_0_8 extends AMQS
}
}
- protected void reduceCreditInPostDeliver()
+ protected void stopFlowIfNeccessary()
{
int acknowledgeMode = getAcknowledgeMode();
- boolean manageCredit = (acknowledgeMode == AUTO_ACKNOWLEDGE || acknowledgeMode == DUPS_OK_ACKNOWLEDGE) && getPrefetch() == 0;
+ boolean autoAckLike = (acknowledgeMode == AUTO_ACKNOWLEDGE || acknowledgeMode == DUPS_OK_ACKNOWLEDGE);
- if(manageCredit && _creditChanged.compareAndSet(true,false))
+ if (autoAckLike && getPrefetch() == 0)
{
- ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(false);
- AMQFrame channelFlowFrame = body.generateFrame(getChannelId());
- getProtocolHandler().writeFrame(channelFlowFrame, true);
+ if (_creditChanged.compareAndSet(true,false))
+ {
+ ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(false);
+ AMQFrame channelFlowFrame = body.generateFrame(getChannelId());
+ getProtocolHandler().writeFrame(channelFlowFrame, true);
+ }
}
}
-
- protected void updateCurrentPrefetch(int delta)
+ protected void incUnacknowledgedMessages()
{
- _currentPrefetch.addAndGet(delta);
+ _unacknowledgedMessages.incrementAndGet();
}
-
public DestinationCache<AMQQueue> getQueueDestinationCache()
{
return _queueDestinationCache;
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1760524&r1=1760523&r2=1760524&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Tue Sep 13 10:25:26 2016
@@ -191,12 +191,12 @@ public class BasicMessageConsumer_0_8 ex
Message message = super.receive(l);
if (creditModified && message == null)
{
- getSession().reduceCreditAfterAcknowledge();
+ getSession().reduceCreditToOriginalSize();
}
if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE
|| getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null)
{
- getSession().updateCurrentPrefetch(1);
+ getSession().incUnacknowledgedMessages();
}
return message;
}
@@ -224,12 +224,12 @@ public class BasicMessageConsumer_0_8 ex
Message message = super.receiveNoWait();
if (creditModified && message == null)
{
- getSession().reduceCreditAfterAcknowledge();
+ getSession().reduceCreditToOriginalSize();
}
if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE
|| getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null)
{
- getSession().updateCurrentPrefetch(1);
+ getSession().incUnacknowledgedMessages();
}
return message;
}
@@ -240,10 +240,10 @@ public class BasicMessageConsumer_0_8 ex
}
}
-
+ @Override
void postDeliver(AbstractJMSMessage msg)
{
- getSession().reduceCreditInPostDeliver();
+ getSession().stopFlowIfNeccessary();
super.postDeliver(msg);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org