You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/10/10 12:22:23 UTC

svn commit: r703383 - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/configuration/ systests/src/main/java/org/apache/qpid/server/exchange/ systests/src/main/java/org/apache/...

Author: aidan
Date: Fri Oct 10 03:22:21 2008
New Revision: 703383

URL: http://svn.apache.org/viewvc?rev=703383&view=rev
Log:
QPID-1289:  Make 0-8/0-9 client honour the max_preftech system property.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=703383&r1=703382&r2=703383&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Oct 10 03:22:21 2008
@@ -250,7 +250,7 @@
     protected AMQConnectionDelegate _delegate;
 
     // this connection maximum number of prefetched messages
-    private long _maxPrefetch;
+    protected int _maxPrefetch;
 
     //Indicates whether persistent messages are synchronized
     private boolean _syncPersistence;
@@ -337,13 +337,13 @@
         // set this connection maxPrefetch
         if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
         {
-            _maxPrefetch = Long.parseLong(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+            _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
         }
         else
         {
             // use the defaul value set for all connections
-            _maxPrefetch = Long.valueOf(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
-                                                                           ClientProperties.MAX_PREFETCH_DEFAULT));
+            _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
+                                                                               ClientProperties.MAX_PREFETCH_DEFAULT));
         }
 
         if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
@@ -653,7 +653,7 @@
 
     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
     {
-        return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
+        return createSession(transacted, acknowledgeMode, _maxPrefetch);
     }
 
     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=703383&r1=703382&r2=703383&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Oct 10 03:22:21 2008
@@ -181,13 +181,6 @@
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
-
-    /** The default maximum number of prefetched message at which to suspend the channel. */
-    public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
-
-    /** The default minimum number of prefetched messages at which to resume the channel. */
-    public static final int DEFAULT_PREFETCH_LOW_MARK = 2500;
-
     /**
      * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
      * not need to be attached to a queue.
@@ -233,10 +226,10 @@
     private int _ticket;
 
     /** Holds the high mark for prefetched message, at which the session is suspended. */
-    private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
+    private int _defaultPrefetchHighMark;
 
     /** Holds the low mark for prefetched messages, below which the session is resumed. */
-    private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
+    private int _defaultPrefetchLowMark;
 
     /** Holds the message listener, if any, which is attached to this session. */
     private MessageListener _messageListener = null;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java?rev=703383&r1=703382&r2=703383&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java Fri Oct 10 03:22:21 2008
@@ -47,7 +47,7 @@
     public synchronized XASession createXASession() throws JMSException
     {
         checkNotClosed();
-        return _delegate.createXASession(AMQSession.DEFAULT_PREFETCH_HIGH_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
+        return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2);
     }
 
     //-- Interface  XAQueueConnection

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java?rev=703383&r1=703382&r2=703383&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java Fri Oct 10 03:22:21 2008
@@ -39,7 +39,7 @@
      * type: long
      */
     public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch";
-    public static final String MAX_PREFETCH_DEFAULT = "1000";
+    public static final String MAX_PREFETCH_DEFAULT = "5000";
 
     /**
      * When true a sync command is sent after every persistent messages.

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?rev=703383&r1=703382&r2=703383&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Fri Oct 10 03:22:21 2008
@@ -26,6 +26,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.util.NullApplicationRegistry;
 import org.apache.qpid.client.*;
+import org.apache.qpid.client.configuration.ClientProperties;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
@@ -95,7 +96,7 @@
             queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
             FieldTable ft = new FieldTable();
             ft.setString("F1000", "1");
-            consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
+            consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT) /2 ,  false, false, (String) null, ft);
 
             //force synch to ensure the consumer has resulted in a bound queue
             //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java?rev=703383&r1=703382&r2=703383&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java Fri Oct 10 03:22:21 2008
@@ -21,13 +21,19 @@
 package org.apache.qpid.test.unit.client;
 
 import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.TopicSession;
 
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -187,6 +193,57 @@
         }
     }
 
+    public void testPrefetchSystemProperty() throws Exception
+    {
+        String oldPrefetch = System.getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME);
+        try
+        {
+            _connection.close();
+            System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
+            _connection = (AMQConnection) getConnection();
+            _connection.start();
+            // Create two consumers on different sessions
+            Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumerA = consSessA.createConsumer(_queue);
+
+            Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = producerSession.createProducer(_queue);
+
+            // Send 3 messages
+            for (int i = 0; i < 3; i++)
+            {
+                producer.send(producerSession.createTextMessage(new Integer(i).toString()));
+            }
+            Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumerB = consSessB.createConsumer(_queue);
+
+            Message msg;
+            // Check that one consumer has 2 messages
+            for (int i = 0; i < 2; i++)
+            {
+                msg = consumerA.receive(1500);
+                assertNotNull(msg);
+                assertEquals(new Integer(i).toString(), ((TextMessage) msg).getText());
+            }
+            
+            msg = consumerA.receive(1500);
+            assertNull(msg);
+            
+            // Check that other consumer has last message
+            msg = consumerB.receive(1500);
+            assertNotNull(msg);
+            assertEquals(new Integer(2).toString(), ((TextMessage) msg).getText());
+        }
+        finally
+        {
+            if (oldPrefetch == null)
+            {
+                oldPrefetch = ClientProperties.MAX_PREFETCH_DEFAULT;
+            }
+            System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, oldPrefetch);
+        }
+    }
+    
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(AMQConnectionTest.class);

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?rev=703383&r1=703382&r2=703383&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Fri Oct 10 03:22:21 2008
@@ -24,6 +24,7 @@
 import org.apache.qpid.client.AMQHeadersExchange;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.configuration.ClientProperties;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -76,8 +77,7 @@
         FieldTable ft = new FieldTable();
         ft.setString("F1000", "1");
         MessageConsumer consumer =
-            consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK,
-                AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
+            consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft);
 
         // force synch to ensure the consumer has resulted in a bound queue
         // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);