You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/11/06 15:28:30 UTC

svn commit: r1637118 - /qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java

Author: kwall
Date: Thu Nov  6 14:28:30 2014
New Revision: 1637118

URL: http://svn.apache.org/r1637118
Log:
QPID-6125: [Java Broker] Partial revert so that closeConnection will once again await for an already closing connection to become closed

Modified:
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1637118&r1=1637117&r2=1637118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Thu Nov  6 14:28:30 2014
@@ -103,6 +103,7 @@ public class AMQProtocolEngine implement
     private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
     public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
     public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
+    private static final long AWAIT_CLOSED_TIMEOUT = 60000;
     private final AmqpPort<?> _port;
     private final long _creationTime;
 
@@ -765,7 +766,6 @@ public class AMQProtocolEngine implement
         }
     }
 
-    /** This must be called when the session is _closed in order to free up any resources managed by the session. */
     public void closeSession()
     {
 
@@ -813,34 +813,7 @@ public class AMQProtocolEngine implement
             }
             else
             {
-                synchronized(this)
-                {
-
-                    boolean lockHeld = _receivedLock.isHeldByCurrentThread();
-
-                    while(!_closed)
-                    {
-                        try
-                        {
-                            if(lockHeld)
-                            {
-                                _receivedLock.unlock();
-                            }
-                            wait(1000);
-                        }
-                        catch (InterruptedException e)
-                        {
-                            // do nothing
-                        }
-                        finally
-                        {
-                            if(lockHeld)
-                            {
-                                _receivedLock.lock();
-                            }
-                        }
-                    }
-                }
+                awaitClosed();
             }
         }
         else
@@ -858,6 +831,44 @@ public class AMQProtocolEngine implement
         }
     }
 
+    private void awaitClosed()
+    {
+        synchronized(this)
+        {
+            final boolean lockHeld = _receivedLock.isHeldByCurrentThread();
+            final long endTime = System.currentTimeMillis() + AWAIT_CLOSED_TIMEOUT;
+
+            while(!_closed && endTime > System.currentTimeMillis())
+            {
+                try
+                {
+                    if(lockHeld)
+                    {
+                        _receivedLock.unlock();
+                    }
+                    wait(1000);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+                finally
+                {
+                    if(lockHeld)
+                    {
+                        _receivedLock.lock();
+                    }
+                }
+            }
+
+            if (!_closed)
+            {
+                throw new ConnectionScopedRuntimeException("Connection " + this + " failed to become closed within " + AWAIT_CLOSED_TIMEOUT + "ms.");
+            }
+        }
+    }
+
     private void closeConnection(int channelId, AMQConnectionException e)
     {
 
@@ -901,8 +912,10 @@ public class AMQProtocolEngine implement
                 }
             }
         }
-
-
+        else
+        {
+            awaitClosed();
+        }
     }
 
     public void closeProtocolSession()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org