You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/09/08 17:37:20 UTC
svn commit: r1623422 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
systests/src/test/java/org/apache/qpid/client/prefetch/ test-profiles/
Author: rgodfrey
Date: Mon Sep 8 15:37:20 2014
New Revision: 1623422
URL: http://svn.apache.org/r1623422
Log:
QPID-6088 : [Java Client] AMQP 0-8/8/9-1 prefetch should auto expand when receive is called in a situation where the prefetch buffer is full
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
qpid/trunk/qpid/java/test-profiles/Java010Excludes
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1623422&r1=1623421&r2=1623422&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Sep 8 15:37:20 2014
@@ -67,13 +67,7 @@ import org.apache.qpid.client.protocol.A
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Connection;
import org.apache.qpid.jms.ConnectionListener;
@@ -696,36 +690,6 @@ public class AMQConnection extends Close
}
}
- private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException, FailoverException
- {
-
- ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
-
- // TODO: Be aware of possible changes to parameter order as versions change.
-
- _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
-
- BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false);
-
- // todo send low water mark when protocol allows.
- // todo Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class);
-
- if (transacted)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Issuing TxSelect for " + channelId);
- }
-
- TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody();
-
- // TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
- }
- }
-
public void setFailoverPolicy(FailoverPolicy policy)
{
_failoverPolicy = policy;
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1623422&r1=1623421&r2=1623422&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Mon Sep 8 15:37:20 2014
@@ -44,8 +44,6 @@ import org.apache.qpid.client.state.AMQS
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.FieldTable;
@@ -55,6 +53,7 @@ import org.apache.qpid.framing.TxSelectO
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.Session;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -182,10 +181,10 @@ public class AMQConnectionDelegate_8_0 i
throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
}
- return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
- new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
+ return new FailoverRetrySupport<Session, JMSException>(
+ new FailoverProtectedOperation<Session, JMSException>()
{
- public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
+ public Session execute() throws JMSException, FailoverException
{
int channelId = _conn.getNextChannelID();
@@ -197,7 +196,7 @@ public class AMQConnectionDelegate_8_0 i
// We must create the session and register it before actually sending the frame to the server to
// open it, so that there is no window where we could receive data on the channel and not be set
// up to handle it appropriately.
- AMQSession session =
+ AMQSession_0_8 session =
new AMQSession_0_8(_conn, channelId, transacted, acknowledgeMode, prefetchHigh,
prefetchLow);
_conn.registerSession(channelId, session);
@@ -205,7 +204,8 @@ public class AMQConnectionDelegate_8_0 i
boolean success = false;
try
{
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ createChannelOverWire(channelId, transacted);
+ session.setPrefetchLimits(prefetchHigh, 0);
success = true;
}
catch (AMQException e)
@@ -252,18 +252,12 @@ public class AMQConnectionDelegate_8_0 i
return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
}
- private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+ private void createChannelOverWire(int channelId, boolean transacted)
throws AMQException, FailoverException
{
ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
- // TODO: Be aware of possible changes to parameter order as versions change.
_conn.getProtocolHandler().syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
- // todo send low water mark when protocol allows.
- // todo Be aware of possible changes to parameter order as versions change.
- BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
- _conn.getProtocolHandler().syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
-
if (transacted)
{
if (_logger.isDebugEnabled())
@@ -292,7 +286,7 @@ public class AMQConnectionDelegate_8_0 i
_logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
for (Iterator it = sessions.iterator(); it.hasNext();)
{
- AMQSession s = (AMQSession) it.next();
+ AMQSession_0_8 s = (AMQSession_0_8) it.next();
// reset the flow control flag
// on opening channel, broker sends flow blocked if virtual host is blocked
@@ -300,7 +294,7 @@ public class AMQConnectionDelegate_8_0 i
// that's why we need to reset the flow control flag
s.setFlowControl(true);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted());
-
+ s.setPrefetchLimits(s.getDefaultPrefetchHigh(), 0);
s.resubscribe();
}
}
@@ -310,7 +304,7 @@ public class AMQConnectionDelegate_8_0 i
{
try
{
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ createChannelOverWire(channelId, transacted);
}
catch (AMQException e)
{
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1623422&r1=1623421&r2=1623422&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Mon Sep 8 15:37:20 2014
@@ -32,6 +32,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -93,6 +94,7 @@ public class AMQSession_0_8 extends AMQS
*/
private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+ private AtomicInteger _currentPrefetch = new AtomicInteger();
/** Flow control */
private FlowControlIndicator _flowControl = new FlowControlIndicator();
@@ -112,7 +114,8 @@ public class AMQSession_0_8 extends AMQS
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
- super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
+ super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
+ _currentPrefetch.set(0);
}
/**
@@ -140,6 +143,14 @@ public class AMQSession_0_8 extends AMQS
protected void acknowledgeImpl() throws JMSException
{
boolean syncRequired = false;
+ try
+ {
+ reduceCreditAfterAcknowledge();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
while (true)
{
Long tag = getUnacknowledgedMessageTags().poll();
@@ -151,7 +162,7 @@ public class AMQSession_0_8 extends AMQS
acknowledgeMessage(tag, false);
syncRequired = true;
}
-
+ _currentPrefetch.set(0);
try
{
if (syncRequired && _syncAfterClientAck)
@@ -262,8 +273,9 @@ public class AMQSession_0_8 extends AMQS
}
final AMQProtocolHandler handler = getProtocolHandler();
-
+ reduceCreditAfterAcknowledge();
handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(getChannelId()), TxCommitOkBody.class);
+ _currentPrefetch.set(0);
}
public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
@@ -817,25 +829,77 @@ public class AMQSession_0_8 extends AMQS
getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
}
- public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
+ public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch)
+ throws AMQException, FailoverException
{
- new FailoverRetrySupport<Object, AMQException>(
- new FailoverProtectedOperation<Object, AMQException>()
+ _currentPrefetch.set(0);
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+
+ // todo send low water mark when protocol allows.
+ // todo Be aware of possible changes to parameter order as versions change.
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+
+ }
+
+
+
+ protected boolean ensureCreditForReceive() throws AMQException
+ {
+ return new FailoverNoopSupport<>(
+ new FailoverProtectedOperation<Boolean, AMQException>()
{
- public Object execute() throws AMQException, FailoverException
+ public Boolean execute() throws AMQException, FailoverException
{
+ int currentPrefetch = _currentPrefetch.get();
+ if (currentPrefetch >= getPrefetch())
+ {
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry()
+ .createBasicQosBody(0, currentPrefetch + 1, false);
- BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()),
+ BasicQosOkBody.class);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }, getProtocolHandler().getConnection()).execute();
- // todo send low water mark when protocol allows.
- // todo Be aware of possible changes to parameter order as versions change.
- getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+ }
- return null;
- }
- }, getAMQConnection()).execute();
+ protected void reduceCreditAfterAcknowledge() throws AMQException
+ {
+ int acknowledgeMode = getAcknowledgeMode();
+ boolean manageCredit = acknowledgeMode == javax.jms.Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED;
+
+ if(manageCredit)
+ {
+ new FailoverNoopSupport<>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+ BasicQosBody basicQosBody =
+ getProtocolHandler().getMethodRegistry()
+ .createBasicQosBody(0, getPrefetch(), false);
+
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()),
+ BasicQosOkBody.class);
+ return null;
+ }
+ }, getProtocolHandler().getConnection()).execute();
+ }
}
+ protected void updateCurrentPrefetch(int delta)
+ {
+ _currentPrefetch.addAndGet(delta);
+ }
+
+
+
public DestinationCache<AMQQueue> getQueueDestinationCache()
{
return _queueDestinationCache;
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1623422&r1=1623421&r2=1623422&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Mon Sep 8 15:37:20 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,4 +167,65 @@ public class BasicMessageConsumer_0_8 ex
}
+ @Override
+ public Message receive(final long l) throws JMSException
+ {
+ int acknowledgeMode = getSession().getAcknowledgeMode();
+ boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED;
+ boolean creditModified = false;
+ try
+ {
+ if (manageCredit)
+ {
+ creditModified = getSession().ensureCreditForReceive();
+ }
+ Message message = super.receive(l);
+ if (creditModified && message == null)
+ {
+ getSession().reduceCreditAfterAcknowledge();
+ }
+ if (manageCredit && message != null)
+ {
+ getSession().updateCurrentPrefetch(1);
+ }
+ return message;
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
+ @Override
+ public Message receiveNoWait() throws JMSException
+ {
+ int acknowledgeMode = getSession().getAcknowledgeMode();
+ boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED;
+ boolean creditModified = false;
+ try
+ {
+ if (manageCredit)
+ {
+ creditModified = getSession().ensureCreditForReceive();
+ if (creditModified)
+ {
+ getSession().sync();
+ }
+ }
+ Message message = super.receiveNoWait();
+ if (creditModified && message == null)
+ {
+ getSession().reduceCreditAfterAcknowledge();
+ }
+ if (manageCredit && message != null)
+ {
+ getSession().updateCurrentPrefetch(1);
+ }
+ return message;
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
}
Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java?rev=1623422&r1=1623421&r2=1623422&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java Mon Sep 8 15:37:20 2014
@@ -20,11 +20,9 @@
*/
package org.apache.qpid.client.prefetch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -35,9 +33,12 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class PrefetchBehaviourTest extends QpidBrokerTestCase
@@ -231,5 +232,62 @@ public class PrefetchBehaviourTest exten
assertNotNull("The second consumer should get 9 messages, but received only " + i,m);
}
}
+
+ public void testPrefetchWindowExpandsOnReceiveTransaction() throws Exception
+ {
+
+ _normalConnection.start();
+
+ //create a second connection with prefetch set to 1
+
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
+ Connection prefetch1Connection = getConnection();
+ Session consumerSession = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getTestQueueName()));
+
+
+ Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = producerSession.createQueue(getTestQueueName());
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ for (int i = 0; i < 5; i++)
+ {
+ producer.send(producerSession.createTextMessage("test"));
+ }
+ producerSession.commit();
+
+
+ prefetch1Connection.start();
+
+
+
+ Message message = consumer.receive(1000l);
+ assertNotNull(message);
+ message = consumer.receive(1000l);
+ assertNotNull(message);
+ message = consumer.receive(1000l);
+ assertNotNull(message);
+
+
+ Connection secondConsumerConnection = getConnection();
+ Session secondConsumerSession = secondConsumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer secondConsumer = secondConsumerSession.createConsumer(consumerSession.createQueue(getTestQueueName()));
+ secondConsumerConnection.start();
+
+ message = secondConsumer.receive(1000l);
+ assertNotNull(message);
+
+ message = secondConsumer.receive(1000l);
+ assertNotNull(message);
+
+ consumerSession.commit();
+ secondConsumerSession.commit();
+
+ message = consumer.receive(1000l);
+ assertNull(message);
+
+ }
+
+
}
Modified: qpid/trunk/qpid/java/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Java010Excludes?rev=1623422&r1=1623421&r2=1623422&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Java010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Java010Excludes Mon Sep 8 15:37:20 2014
@@ -74,3 +74,5 @@ org.apache.qpid.server.queue.QueueBindTe
org.apache.qpid.systest.management.jmx.QueueManagementTest#testExclusiveQueueHasJmsClientIdAsOwner
org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091
+
+org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org