You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/04/21 17:23:19 UTC

svn commit: r767185 - in /qpid/trunk/qpid/java: ./ client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/unit/client/

Author: aidan
Date: Tue Apr 21 15:23:17 2009
New Revision: 767185

URL: http://svn.apache.org/viewvc?rev=767185&view=rev
Log:
QPID-1823: Allow recycling of channel IDs

AMQConnection.getNextChannelID: add method to abstract channel id assignment, allow max to be set
AMQConnectionDelegate*: add getMaxChannelID
AMQConnectionDelegate_0_10: use getNextChannelID for this session-id
SessionCreateTest: add test that attempts to create 65555 sessions on one connection
AMQConnectionTest: add unit test for getNextChannelID

SessionCreateTest takes a long, long time to run so is excluded by default

Added:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java
Modified:
    qpid/trunk/qpid/java/010ExcludeList
    qpid/trunk/qpid/java/08ExcludeList
    qpid/trunk/qpid/java/08ExcludeList-nonvm
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java

Modified: qpid/trunk/qpid/java/010ExcludeList
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/010ExcludeList?rev=767185&r1=767184&r2=767185&view=diff
==============================================================================
--- qpid/trunk/qpid/java/010ExcludeList (original)
+++ qpid/trunk/qpid/java/010ExcludeList Tue Apr 21 15:23:17 2009
@@ -69,3 +69,7 @@
 
 //QPID-1818 : 0-10 Client code path does not correctly restore a transacted session after failover.
 org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
+
+// QPID-1823: this takes ages to run
+org.apache.qpid.client.SessionCreateTest#*
+

Modified: qpid/trunk/qpid/java/08ExcludeList
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/08ExcludeList?rev=767185&r1=767184&r2=767185&view=diff
==============================================================================
--- qpid/trunk/qpid/java/08ExcludeList (original)
+++ qpid/trunk/qpid/java/08ExcludeList Tue Apr 21 15:23:17 2009
@@ -10,3 +10,6 @@
 
 //QPID-1818 : Client code path does not correctly restore a transacted session after failover.
 org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
+
+// QPID-1823: this takes ages to run
+org.apache.qpid.client.SessionCreateTest#*

Modified: qpid/trunk/qpid/java/08ExcludeList-nonvm
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/08ExcludeList-nonvm?rev=767185&r1=767184&r2=767185&view=diff
==============================================================================
--- qpid/trunk/qpid/java/08ExcludeList-nonvm (original)
+++ qpid/trunk/qpid/java/08ExcludeList-nonvm Tue Apr 21 15:23:17 2009
@@ -36,3 +36,6 @@
 org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#*
 //QPID-1818 : Client code path does not correctly restore a transacted session after failover.
 org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
+// QPID-1823: this takes ages to run
+org.apache.qpid.client.SessionCreateTest#*
+

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=767185&r1=767184&r2=767185&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Apr 21 15:23:17 2009
@@ -90,6 +90,9 @@
         private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
         private int _size = 0;
         private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+        private AtomicInteger _idFactory = new AtomicInteger(0);
+        private int _maxChannelID;
+        private boolean _cycledIds;
 
         public AMQSession get(int channelId)
         {
@@ -179,11 +182,57 @@
                 _fastAccessSessions[i] = null;
             }
         }
+
+        /*
+         * Synchronized on whole method so that we don't need to consider the
+         * increment-then-reset path in too much detail
+         */
+        public synchronized int getNextChannelId()
+        {
+            int id = 0;
+            if (!_cycledIds)
+            {
+                id = _idFactory.incrementAndGet();
+                if (id == _maxChannelID)
+                {
+                    _cycledIds = true;
+                    _idFactory.set(0); // Go back to the start
+                }
+            }
+            else
+            {
+                boolean done = false;
+                while (!done)
+                {
+                    // Needs to work second time through
+                    id = _idFactory.incrementAndGet();
+                    if (id > _maxChannelID)
+                    {
+                        _idFactory.set(0);
+                        id = _idFactory.incrementAndGet();
+                    }
+                    if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
+                    {
+                        done = (_fastAccessSessions[id] == null);
+                    } 
+                    else
+                    {
+                        done = (!_slowAccessSessions.keySet().contains(id));
+                    }
+                }
+            }
+             
+            return id;
+        }
+
+        public void setMaxChannelID(int maxChannelID)
+        {
+            _maxChannelID = maxChannelID;
+        }
     }
 
     private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
 
-    protected AtomicInteger _idFactory = new AtomicInteger(0);
 
     /**
      * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
@@ -415,6 +464,7 @@
         {
             _delegate = new AMQConnectionDelegate_0_10(this);
         }
+        _sessions.setMaxChannelID(_delegate.getMaxChannelID());
 
         if (_logger.isInfoEnabled())
         {
@@ -567,6 +617,7 @@
             Class partypes[] = new Class[1];
             partypes[0] = AMQConnection.class;
             _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
+            _sessions.setMaxChannelID(_delegate.getMaxChannelID());
         }
         catch (ClassNotFoundException e)
         {
@@ -1395,7 +1446,7 @@
         _sessions.put(channelId, session);
     }
 
-    void deregisterSession(int channelId)
+    public void deregisterSession(int channelId)
     {
         _sessions.remove(channelId);
     }
@@ -1540,4 +1591,9 @@
     {
         _delegate.setIdleTimeout(l);
     }
+
+    public int getNextChannelID()
+    {
+        return _sessions.getNextChannelId();
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=767185&r1=767184&r2=767185&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Tue Apr 21 15:23:17 2009
@@ -50,4 +50,6 @@
     <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
     
     void setIdleTimeout(long l);
+    
+    int getMaxChannelID();
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=767185&r1=767184&r2=767185&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Tue Apr 21 15:23:17 2009
@@ -79,7 +79,7 @@
             throws JMSException
     {
         _conn.checkNotClosed();
-        int channelId = _conn._idFactory.incrementAndGet();
+        int channelId = _conn.getNextChannelID();
         AMQSession session;
         try
         {
@@ -105,7 +105,7 @@
     public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
     {
         _conn.checkNotClosed();
-        int channelId = _conn._idFactory.incrementAndGet();
+        int channelId = _conn.getNextChannelID();
         XASessionImpl session;
         try
         {
@@ -284,4 +284,10 @@
     {
         _qpidConnection.setIdleTimeout(l);
     }
+
+    @Override
+    public int getMaxChannelID()
+    {
+       return Integer.MAX_VALUE;
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=767185&r1=767184&r2=767185&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Tue Apr 21 15:23:17 2009
@@ -138,7 +138,7 @@
                 {
                     public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
                     {
-                        int channelId = _conn._idFactory.incrementAndGet();
+                        int channelId = _conn.getNextChannelID();
 
                         if (_logger.isDebugEnabled())
                         {
@@ -289,4 +289,10 @@
     }
     
     public void setIdleTimeout(long l){}
+
+    @Override
+    public int getMaxChannelID()
+    {
+        return (int) (Math.pow(2, 16)-1);
+    }
 }

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java?rev=767185&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java Tue Apr 21 15:23:17 2009
@@ -0,0 +1,63 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.naming.Context;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to check that session creation on a connection has no accidental limit
+ */
+public class SessionCreateTest extends QpidTestCase
+{
+    private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class);
+
+    Context _context;
+
+    private Connection _clientConnection;
+    protected int maxSessions = 65555;
+
+    public void testSessionCreationLimit() throws Exception
+    {
+        // Create Client
+        _clientConnection = getConnection("guest", "guest");
+
+        _clientConnection.start();
+
+        for (int i=0; i < maxSessions; i++)
+        {
+            Session sess = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            assertNotNull(sess);
+            sess.close();
+            System.out.println("created session: " + i); 
+        }
+
+        _clientConnection.close();
+
+    }
+
+}
\ No newline at end of file

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java?rev=767185&r1=767184&r2=767185&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java Tue Apr 21 15:23:17 2009
@@ -30,6 +30,7 @@
 import javax.jms.TopicSession;
 
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionDelegate_0_10;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
@@ -244,6 +245,24 @@
         }
     }
     
+    public void testGetChannelID()
+    {
+        int maxChannelID = 65536;
+        if (isBroker010())
+        {
+            maxChannelID = Integer.MAX_VALUE+1;
+        }
+        for (int j = 0; j < 3; j++)
+        {
+            for (int i = 1; i < maxChannelID; i++)
+            {
+                int id = _connection.getNextChannelID();
+                assertEquals("On iterartion "+j, i, id);
+                _connection.deregisterSession(id);
+            }
+        }
+    }
+    
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(AMQConnectionTest.class);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org