You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/11/06 15:28:30 UTC
svn commit: r1637118 -
/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
Author: kwall
Date: Thu Nov 6 14:28:30 2014
New Revision: 1637118
URL: http://svn.apache.org/r1637118
Log:
QPID-6125: [Java Broker] Partial revert so that closeConnection will once again wait for an already closing connection to become closed
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1637118&r1=1637117&r2=1637118&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Thu Nov 6 14:28:30 2014
@@ -103,6 +103,7 @@ public class AMQProtocolEngine implement
private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
+ private static final long AWAIT_CLOSED_TIMEOUT = 60000;
private final AmqpPort<?> _port;
private final long _creationTime;
@@ -765,7 +766,6 @@ public class AMQProtocolEngine implement
}
}
- /** This must be called when the session is _closed in order to free up any resources managed by the session. */
public void closeSession()
{
@@ -813,34 +813,7 @@ public class AMQProtocolEngine implement
}
else
{
- synchronized(this)
- {
-
- boolean lockHeld = _receivedLock.isHeldByCurrentThread();
-
- while(!_closed)
- {
- try
- {
- if(lockHeld)
- {
- _receivedLock.unlock();
- }
- wait(1000);
- }
- catch (InterruptedException e)
- {
- // do nothing
- }
- finally
- {
- if(lockHeld)
- {
- _receivedLock.lock();
- }
- }
- }
- }
+ awaitClosed();
}
}
else
@@ -858,6 +831,44 @@ public class AMQProtocolEngine implement
}
}
+ private void awaitClosed()
+ {
+ synchronized(this)
+ {
+ final boolean lockHeld = _receivedLock.isHeldByCurrentThread();
+ final long endTime = System.currentTimeMillis() + AWAIT_CLOSED_TIMEOUT;
+
+ while(!_closed && endTime > System.currentTimeMillis())
+ {
+ try
+ {
+ if(lockHeld)
+ {
+ _receivedLock.unlock();
+ }
+ wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ finally
+ {
+ if(lockHeld)
+ {
+ _receivedLock.lock();
+ }
+ }
+ }
+
+ if (!_closed)
+ {
+ throw new ConnectionScopedRuntimeException("Connection " + this + " failed to become closed within " + AWAIT_CLOSED_TIMEOUT + "ms.");
+ }
+ }
+ }
+
private void closeConnection(int channelId, AMQConnectionException e)
{
@@ -901,8 +912,10 @@ public class AMQProtocolEngine implement
}
}
}
-
-
+ else
+ {
+ awaitClosed();
+ }
}
public void closeProtocolSession()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org