You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/09/16 12:07:45 UTC

svn commit: r815705 - in /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server: connection/ConnectionRegistry.java protocol/AMQProtocolEngine.java protocol/AMQProtocolSession.java virtualhost/VirtualHost.java

Author: aidan
Date: Wed Sep 16 10:07:44 2009
New Revision: 815705

URL: http://svn.apache.org/viewvc?rev=815705&view=rev
Log:
QPID-2106: Don't close the connection when further frames arrive on a channel
that the broker has already asked to close. Leave the closing to the cleanup
thread, so that publishes which are denied don't result in instant connection death.


Modified:
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=815705&r1=815704&r2=815705&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Wed Sep 16 10:07:44 2009
@@ -44,6 +44,14 @@
     {
 
     }
+    /** Calls closeIfLingeringClosedChannels() on every connection currently held in the registry. */
+    public void expireClosedChannels()
+    {
+        for (AMQProtocolSession connection : _registry)
+        {
+            connection.closeIfLingeringClosedChannels();
+        }
+    }
 
     /** Close all of the currently open connections. */
     public void close() throws AMQException

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=815705&r1=815704&r2=815705&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Wed Sep 16 10:07:44 2009
@@ -29,6 +29,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
@@ -135,7 +137,7 @@
     private FieldTable _clientProperties;
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
 
-    private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
+    private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
     private ProtocolOutputConverter _protocolOutputConverter;
     private Principal _authorizedID;
     private MethodDispatcher _dispatcher;
@@ -293,12 +295,8 @@
                 }
                 else
                 {
-                    if (_logger.isInfoEnabled())
-                    {
-                        _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
-                    }
-
-                    closeProtocolSession();
+                    // The channel has been told to close, we don't process any more frames until
+                    // it's closed. 
                     return;
                 }
             }
@@ -513,7 +511,7 @@
 
     public boolean channelAwaitingClosure(int channelId)
     {
-        return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
+        return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
     }
 
     public void addChannel(AMQChannel channel) throws AMQException
@@ -525,7 +523,7 @@
 
         final int channelId = channel.getChannelId();
 
-        if (_closingChannelsList.contains(channelId))
+        if (_closingChannelsList.containsKey(channelId))
         {
             throw new AMQException("Session is marked awaiting channel close");
         }
@@ -632,7 +630,7 @@
 
     private void markChannelAwaitingCloseOk(int channelId)
     {
-        _closingChannelsList.add(channelId);
+        _closingChannelsList.put(channelId, System.currentTimeMillis());
     }
 
     /**
@@ -1023,7 +1021,19 @@
     {
         return (_clientVersion == null) ? null : _clientVersion.toString();
     }
-    
-    
+    /** Closes this connection if any channel has been awaiting ChannelCloseOk for more than 30 seconds. */
+    @Override
+    public void closeIfLingeringClosedChannels()
+    {
+        for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
+        {
+            if (System.currentTimeMillis() - id.getValue() > 30000) // value is the time the channel was marked as closing
+            {
+                _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
+                closeProtocolSession();
+                return; // the client is considered dead and the connection is closed; remaining channels need no check
+            }
+        }
+    }
     
 }

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=815705&r1=815704&r2=815705&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Wed Sep 16 10:07:44 2009
@@ -225,5 +225,7 @@
     void commitTransactions(AMQChannel channel) throws AMQException;
 
     List<AMQChannel> getChannels();
+
+    void closeIfLingeringClosedChannels();
     
 }

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=815705&r1=815704&r2=815705&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Wed Sep 16 10:07:44 2009
@@ -267,6 +267,14 @@
             _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
                                                    period / 2,
                                                    period);
+            // Timer task that runs expireClosedChannels() on the connection registry. NOTE(review): it is only declared here; nothing in this hunk instantiates or schedules it - confirm it gets registered with _houseKeepingTimer.
+            class ForceChannelClosuresTask extends TimerTask
+            {
+                public void run()
+                {
+                    _connectionRegistry.expireClosedChannels();
+                }
+            }
         }
     }
     



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org