You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/09/08 17:37:20 UTC

svn commit: r1623422 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ systests/src/test/java/org/apache/qpid/client/prefetch/ test-profiles/

Author: rgodfrey
Date: Mon Sep  8 15:37:20 2014
New Revision: 1623422

URL: http://svn.apache.org/r1623422
Log:
QPID-6088 : [Java Client] AMQP 0-8/0-9/0-9-1 prefetch should auto expand when receive is called in a situation where the prefetch buffer is full

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
    qpid/trunk/qpid/java/test-profiles/Java010Excludes

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1623422&r1=1623421&r2=1623422&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Sep  8 15:37:20 2014
@@ -67,13 +67,7 @@ import org.apache.qpid.client.protocol.A
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxSelectOkBody;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.Connection;
 import org.apache.qpid.jms.ConnectionListener;
@@ -696,36 +690,6 @@ public class AMQConnection extends Close
         }
     }
 
-    private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
-            throws AMQException, FailoverException
-    {
-
-        ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
-
-        // TODO: Be aware of possible changes to parameter order as versions change.
-
-        _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
-
-        BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false);
-
-        // todo send low water mark when protocol allows.
-        // todo Be aware of possible changes to parameter order as versions change.
-        _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class);
-
-        if (transacted)
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Issuing TxSelect for " + channelId);
-            }
-
-            TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody();
-
-            // TODO: Be aware of possible changes to parameter order as versions change.
-            _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
-        }
-    }
-
     public void setFailoverPolicy(FailoverPolicy policy)
     {
         _failoverPolicy = policy;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1623422&r1=1623421&r2=1623422&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Mon Sep  8 15:37:20 2014
@@ -44,8 +44,6 @@ import org.apache.qpid.client.state.AMQS
 import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
 import org.apache.qpid.framing.ChannelOpenBody;
 import org.apache.qpid.framing.ChannelOpenOkBody;
 import org.apache.qpid.framing.FieldTable;
@@ -55,6 +53,7 @@ import org.apache.qpid.framing.TxSelectO
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ChannelLimitReachedException;
 import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.Session;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -182,10 +181,10 @@ public class AMQConnectionDelegate_8_0 i
             throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
         }
 
-        return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
-                new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
+        return new FailoverRetrySupport<Session, JMSException>(
+                new FailoverProtectedOperation<Session, JMSException>()
                 {
-                    public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
+                    public Session execute() throws JMSException, FailoverException
                     {
                         int channelId = _conn.getNextChannelID();
 
@@ -197,7 +196,7 @@ public class AMQConnectionDelegate_8_0 i
                         // We must create the session and register it before actually sending the frame to the server to
                         // open it, so that there is no window where we could receive data on the channel and not be set
                         // up to handle it appropriately.
-                        AMQSession session =
+                        AMQSession_0_8 session =
                                 new AMQSession_0_8(_conn, channelId, transacted, acknowledgeMode, prefetchHigh,
                                                prefetchLow);
                         _conn.registerSession(channelId, session);
@@ -205,7 +204,8 @@ public class AMQConnectionDelegate_8_0 i
                         boolean success = false;
                         try
                         {
-                            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+                            createChannelOverWire(channelId, transacted);
+                            session.setPrefetchLimits(prefetchHigh, 0);
                             success = true;
                         }
                         catch (AMQException e)
@@ -252,18 +252,12 @@ public class AMQConnectionDelegate_8_0 i
         return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
     }
 
-    private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+    private void createChannelOverWire(int channelId, boolean transacted)
             throws AMQException, FailoverException
     {
         ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
-        // TODO: Be aware of possible changes to parameter order as versions change.
         _conn.getProtocolHandler().syncWrite(channelOpenBody.generateFrame(channelId),  ChannelOpenOkBody.class);
 
-        // todo send low water mark when protocol allows.
-        // todo Be aware of possible changes to parameter order as versions change.
-        BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
-        _conn.getProtocolHandler().syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
-
         if (transacted)
         {
             if (_logger.isDebugEnabled())
@@ -292,7 +286,7 @@ public class AMQConnectionDelegate_8_0 i
         _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
         for (Iterator it = sessions.iterator(); it.hasNext();)
         {
-            AMQSession s = (AMQSession) it.next();
+            AMQSession_0_8 s = (AMQSession_0_8) it.next();
 
             // reset the flow control flag
             // on opening channel, broker sends flow blocked if virtual host is blocked
@@ -300,7 +294,7 @@ public class AMQConnectionDelegate_8_0 i
             // that's why we need to reset the flow control flag
             s.setFlowControl(true);
             reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted());
-
+            s.setPrefetchLimits(s.getDefaultPrefetchHigh(), 0);
             s.resubscribe();
         }
     }
@@ -310,7 +304,7 @@ public class AMQConnectionDelegate_8_0 i
     {
         try
         {
-            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+            createChannelOverWire(channelId, transacted);
         }
         catch (AMQException e)
         {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1623422&r1=1623421&r2=1623422&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Mon Sep  8 15:37:20 2014
@@ -32,6 +32,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -93,6 +94,7 @@ public class AMQSession_0_8 extends AMQS
      */
     private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
                                                                   DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+    private AtomicInteger _currentPrefetch = new AtomicInteger();
 
     /** Flow control */
     private FlowControlIndicator _flowControl = new FlowControlIndicator();
@@ -112,7 +114,8 @@ public class AMQSession_0_8 extends AMQS
                              MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
 
-         super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
+        super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
+        _currentPrefetch.set(0);
     }
 
     /**
@@ -140,6 +143,14 @@ public class AMQSession_0_8 extends AMQS
     protected void acknowledgeImpl() throws JMSException
     {
         boolean syncRequired = false;
+        try
+        {
+            reduceCreditAfterAcknowledge();
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException(e);
+        }
         while (true)
         {
             Long tag = getUnacknowledgedMessageTags().poll();
@@ -151,7 +162,7 @@ public class AMQSession_0_8 extends AMQS
             acknowledgeMessage(tag, false);
             syncRequired = true;
         }
-
+        _currentPrefetch.set(0);
         try
         {
             if (syncRequired && _syncAfterClientAck)
@@ -262,8 +273,9 @@ public class AMQSession_0_8 extends AMQS
         }
 
         final AMQProtocolHandler handler = getProtocolHandler();
-
+        reduceCreditAfterAcknowledge();
         handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(getChannelId()), TxCommitOkBody.class);
+        _currentPrefetch.set(0);
     }
 
     public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
@@ -817,25 +829,77 @@ public class AMQSession_0_8 extends AMQS
         getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
     }
 
-    public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
+    public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch)
+            throws AMQException, FailoverException
     {
-        new FailoverRetrySupport<Object, AMQException>(
-                new FailoverProtectedOperation<Object, AMQException>()
+        _currentPrefetch.set(0);
+        BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+
+        // todo send low water mark when protocol allows.
+        // todo Be aware of possible changes to parameter order as versions change.
+        getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+
+    }
+
+
+
+    protected boolean ensureCreditForReceive() throws AMQException
+    {
+        return new FailoverNoopSupport<>(
+                new FailoverProtectedOperation<Boolean, AMQException>()
                 {
-                    public Object execute() throws AMQException, FailoverException
+                    public Boolean execute() throws AMQException, FailoverException
                     {
+                        int currentPrefetch = _currentPrefetch.get();
+                        if (currentPrefetch >= getPrefetch())
+                        {
+                            BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry()
+                                    .createBasicQosBody(0, currentPrefetch + 1, false);
 
-                        BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+                            getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()),
+                                                           BasicQosOkBody.class);
+                            return true;
+                        }
+                        else
+                        {
+                            return false;
+                        }
+                    }
+                }, getProtocolHandler().getConnection()).execute();
 
-                        // todo send low water mark when protocol allows.
-                        // todo Be aware of possible changes to parameter order as versions change.
-                        getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+    }
 
-                        return null;
-                    }
-                 }, getAMQConnection()).execute();
+    protected void reduceCreditAfterAcknowledge() throws AMQException
+    {
+        int acknowledgeMode = getAcknowledgeMode();
+        boolean manageCredit = acknowledgeMode == javax.jms.Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED;
+
+        if(manageCredit)
+        {
+            new FailoverNoopSupport<>(
+                    new FailoverProtectedOperation<Void, AMQException>()
+                    {
+                        public Void execute() throws AMQException, FailoverException
+                        {
+                            BasicQosBody basicQosBody =
+                                    getProtocolHandler().getMethodRegistry()
+                                            .createBasicQosBody(0, getPrefetch(), false);
+
+                            getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()),
+                                                           BasicQosOkBody.class);
+                            return null;
+                        }
+                    }, getProtocolHandler().getConnection()).execute();
+        }
     }
 
+    protected void updateCurrentPrefetch(int delta)
+    {
+        _currentPrefetch.addAndGet(delta);
+    }
+
+
+
     public DestinationCache<AMQQueue> getQueueDestinationCache()
     {
         return _queueDestinationCache;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1623422&r1=1623421&r2=1623422&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Mon Sep  8 15:37:20 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.Session;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -166,4 +167,65 @@ public class BasicMessageConsumer_0_8 ex
     }
 
 
+    @Override
+    public Message receive(final long l) throws JMSException
+    {
+        int acknowledgeMode = getSession().getAcknowledgeMode();
+        boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED;
+        boolean creditModified = false;
+        try
+        {
+            if (manageCredit)
+            {
+                creditModified = getSession().ensureCreditForReceive();
+            }
+            Message message = super.receive(l);
+            if (creditModified && message == null)
+            {
+                getSession().reduceCreditAfterAcknowledge();
+            }
+            if (manageCredit && message != null)
+            {
+                getSession().updateCurrentPrefetch(1);
+            }
+            return message;
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException(e);
+        }
+    }
+
+    @Override
+    public Message receiveNoWait() throws JMSException
+    {
+        int acknowledgeMode = getSession().getAcknowledgeMode();
+        boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED;
+        boolean creditModified = false;
+        try
+        {
+            if (manageCredit)
+            {
+                creditModified = getSession().ensureCreditForReceive();
+                if (creditModified)
+                {
+                    getSession().sync();
+                }
+            }
+            Message message = super.receiveNoWait();
+            if (creditModified && message == null)
+            {
+                getSession().reduceCreditAfterAcknowledge();
+            }
+            if (manageCredit && message != null)
+            {
+                getSession().updateCurrentPrefetch(1);
+            }
+            return message;
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException(e);
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java?rev=1623422&r1=1623421&r2=1623422&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java Mon Sep  8 15:37:20 2014
@@ -20,11 +20,9 @@
 */
 package org.apache.qpid.client.prefetch;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -35,9 +33,12 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 
 public class PrefetchBehaviourTest extends QpidBrokerTestCase
@@ -231,5 +232,62 @@ public class PrefetchBehaviourTest exten
            assertNotNull("The second consumer should get 9 messages, but received only " + i,m);
         }
     }
+
+    public void testPrefetchWindowExpandsOnReceiveTransaction() throws Exception
+    {
+
+        _normalConnection.start();
+
+        //create a second connection with prefetch set to 1
+
+        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
+        Connection prefetch1Connection = getConnection();
+        Session consumerSession = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getTestQueueName()));
+
+
+        Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue queue = producerSession.createQueue(getTestQueueName());
+        MessageProducer producer = producerSession.createProducer(queue);
+
+        for (int i = 0; i < 5; i++)
+        {
+            producer.send(producerSession.createTextMessage("test"));
+        }
+        producerSession.commit();
+
+
+        prefetch1Connection.start();
+
+
+
+        Message message = consumer.receive(1000l);
+        assertNotNull(message);
+        message = consumer.receive(1000l);
+        assertNotNull(message);
+        message = consumer.receive(1000l);
+        assertNotNull(message);
+
+
+        Connection secondConsumerConnection = getConnection();
+        Session secondConsumerSession = secondConsumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer secondConsumer = secondConsumerSession.createConsumer(consumerSession.createQueue(getTestQueueName()));
+        secondConsumerConnection.start();
+
+        message = secondConsumer.receive(1000l);
+        assertNotNull(message);
+
+        message = secondConsumer.receive(1000l);
+        assertNotNull(message);
+
+        consumerSession.commit();
+        secondConsumerSession.commit();
+
+        message = consumer.receive(1000l);
+        assertNull(message);
+
+    }
+
+
 }
 

Modified: qpid/trunk/qpid/java/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Java010Excludes?rev=1623422&r1=1623421&r2=1623422&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Java010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Java010Excludes Mon Sep  8 15:37:20 2014
@@ -74,3 +74,5 @@ org.apache.qpid.server.queue.QueueBindTe
 org.apache.qpid.systest.management.jmx.QueueManagementTest#testExclusiveQueueHasJmsClientIdAsOwner
 
 org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091
+
+org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org