You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/09/14 14:43:16 UTC
svn commit: r575663 - in
/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client:
AMQConnection.java AMQSession.java
Author: rgodfrey
Date: Fri Sep 14 05:43:13 2007
New Revision: 575663
URL: http://svn.apache.org/viewvc?rev=575663&view=rev
Log:
QPID-600 : Deadlock on connection.close
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=575663&r1=575662&r2=575663&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Sep 14 05:43:13 2007
@@ -69,11 +69,7 @@
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -91,6 +87,8 @@
*/
private final Object _failoverMutex = new Object();
+ private final Object _sessionCreationLock = new Object();
+
/**
* A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
* and we must prevent the client from opening too many. Zero means unlimited.
@@ -503,6 +501,8 @@
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException
{
+ synchronized(_sessionCreationLock)
+ {
checkNotClosed();
if (channelLimitReached())
@@ -566,6 +566,7 @@
return session;
}
}, this).execute();
+ }
}
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
@@ -754,44 +755,63 @@
public void close(long timeout) throws JMSException
{
- synchronized (getFailoverMutex())
+ close(new ArrayList<AMQSession>(_sessions.values()),timeout);
+ }
+
+ public void close(List<AMQSession> sessions, long timeout) throws JMSException
+ {
+ synchronized(_sessionCreationLock)
{
- if (!_closed.getAndSet(true))
+ if(!sessions.isEmpty())
{
- try
+ AMQSession session = sessions.remove(0);
+ synchronized(session.getMessageDeliveryLock())
{
- long startCloseTime = System.currentTimeMillis();
+ close(sessions, timeout);
+ }
+ }
+ else
+ {
+ synchronized (getFailoverMutex())
+ {
+ if (!_closed.getAndSet(true))
+ {
+ try
+ {
+ long startCloseTime = System.currentTimeMillis();
- _taskPool.shutdown();
- closeAllSessions(null, timeout, startCloseTime);
+ _taskPool.shutdown();
+ closeAllSessions(null, timeout, startCloseTime);
- if (!_taskPool.isTerminated())
- {
- try
+ if (!_taskPool.isTerminated())
{
- // adjust timeout
- long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+ try
+ {
+ // adjust timeout
+ long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
- _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- _logger.info("Interrupted while shutting down connection thread pool.");
+ _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interrupted while shutting down connection thread pool.");
+ }
}
- }
- // adjust timeout
- timeout = adjustTimeout(timeout, startCloseTime);
+ // adjust timeout
+ timeout = adjustTimeout(timeout, startCloseTime);
- _protocolHandler.closeConnection(timeout);
+ _protocolHandler.closeConnection(timeout);
+ }
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
}
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error closing connection: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
+ }
}
}
}
Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=575663&r1=575662&r2=575663&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Sep 14 05:43:13 2007
@@ -513,13 +513,16 @@
+ Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
+
+
synchronized(_messageDeliveryLock)
{
-
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session.
synchronized (_connection.getFailoverMutex())
{
+
+
// Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
{
@@ -572,6 +575,7 @@
*/
public void closed(Throwable e) throws JMSException
{
+
synchronized(_messageDeliveryLock)
{
synchronized (_connection.getFailoverMutex())
@@ -2543,6 +2547,11 @@
throw new AMQException("Fail-over interrupted suspend/unsuspend channel.", e);
}
}
+ }
+
+ Object getMessageDeliveryLock()
+ {
+ return _messageDeliveryLock;
}
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */