You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/07/23 00:31:18 UTC

svn commit: r1612716 - /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java

Author: kwall
Date: Tue Jul 22 22:31:17 2014
New Revision: 1612716

URL: http://svn.apache.org/r1612716
Log:
QPID-5796: [Java Broker] Prevent a possible ArrayIndexOutOfBoundsException (AOOBE) when the connection registry is closed at the same time as connection closes are received

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1612716&r1=1612715&r2=1612716&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Tue Jul 22 22:31:17 2014
@@ -20,10 +20,14 @@
  */
 package org.apache.qpid.server.connection;
 
+import static java.util.Collections.newSetFromMap;
+
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
 
@@ -32,85 +36,86 @@ import org.apache.qpid.server.protocol.A
 
 public class ConnectionRegistry implements IConnectionRegistry
 {
-    private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>();
-
     private Logger _logger = Logger.getLogger(ConnectionRegistry.class);
-    private final Collection<RegistryChangeListener> _listeners =
-            new ArrayList<RegistryChangeListener>();
 
+    private final Set<AMQConnectionModel> _registry = newSetFromMap(new ConcurrentHashMap<AMQConnectionModel, Boolean>());
+    private final Collection<RegistryChangeListener> _listeners = new ArrayList<>();
+
+    @Override
     public void initialise()
     {
         // None required
     }
 
     /** Close all of the currently open connections. */
+    @Override
     public void close()
     {
         close(IConnectionRegistry.BROKER_SHUTDOWN_REPLY_TEXT);
     }
 
+    @Override
     public void close(final String replyText)
     {
-        synchronized(this)
-        {
-            for(AMQConnectionModel conn : _registry)
-            {
-                conn.stop();
-            }
-        }
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Closing connection registry :" + _registry.size() + " connections.");
         }
-        while (!_registry.isEmpty())
+        for(AMQConnectionModel conn : _registry)
         {
-            AMQConnectionModel connection = _registry.get(0);
+            conn.stop();
+        }
 
-            try
-            {
-                connection.close(AMQConstant.CONNECTION_FORCED, replyText);
-            }
-            catch (Exception e)
+        while (!_registry.isEmpty())
+        {
+            Iterator<AMQConnectionModel> itr = _registry.iterator();
+            while(itr.hasNext())
             {
-                //remove this connection to ensure that we don't loop forever if it fails to close
-                _registry.remove(connection);
-
-                _logger.warn("Exception closing connection " + connection.getConnectionId() + " from " + connection.getRemoteAddressString(), e);
+                AMQConnectionModel connection = itr.next();
+                try
+                {
+                    connection.close(AMQConstant.CONNECTION_FORCED, replyText);
+                }
+                catch (Exception e)
+                {
+                    _logger.warn("Exception closing connection " + connection.getConnectionId() + " from " + connection.getRemoteAddressString(), e);
+                }
+                finally
+                {
+                    itr.remove();
+                }
             }
         }
     }
 
+    @Override
     public void registerConnection(AMQConnectionModel connection)
     {
-        synchronized (this)
+        _registry.add(connection);
+        synchronized (_listeners)
         {
-            _registry.add(connection);
-            synchronized (_listeners)
+            for(RegistryChangeListener listener : _listeners)
             {
-                for(RegistryChangeListener listener : _listeners)
-                {
-                    listener.connectionRegistered(connection);
-                }
+                listener.connectionRegistered(connection);
             }
         }
     }
 
+    @Override
     public void deregisterConnection(AMQConnectionModel connection)
     {
-        synchronized (this)
-        {
-            _registry.remove(connection);
+        _registry.remove(connection);
 
-            synchronized (_listeners)
+        synchronized (_listeners)
+        {
+            for(RegistryChangeListener listener : _listeners)
             {
-                for(RegistryChangeListener listener : _listeners)
-                {
-                    listener.connectionUnregistered(connection);
-                }
+                listener.connectionUnregistered(connection);
             }
         }
     }
 
+    @Override
     public void addRegistryChangeListener(RegistryChangeListener listener)
     {
         synchronized (_listeners)
@@ -119,11 +124,9 @@ public class ConnectionRegistry implemen
         }
     }
 
+    @Override
     public List<AMQConnectionModel> getConnections()
     {
-        synchronized (this)
-        {
-            return new ArrayList<AMQConnectionModel>(_registry);
-        }
+            return new ArrayList<>(_registry);
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org