You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/09/24 12:22:39 UTC
svn commit: r578745 - in /incubator/qpid/branches/M2.1: ./
java/client/src/main/java/org/apache/qpid/client/
java/client/src/main/java/org/apache/qpid/client/util/
java/client/src/test/java/org/apache/qpid/test/unit/client/connection/
Author: rupertlssmith
Date: Mon Sep 24 03:22:32 2007
New Revision: 578745
URL: http://svn.apache.org/viewvc?rev=578745&view=rev
Log:
Merged revisions 575663-575687,575689-576860,576862-577192,577194-577315,577317-577659,577661-578047,578049-578060,578062-578604 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r575663 | rgodfrey | 2007-09-14 13:43:13 +0100 (Fri, 14 Sep 2007) | 1 line
QPID-600 : Deadlock on connection.close
........
r577931 | rgreig | 2007-09-20 22:26:37 +0100 (Thu, 20 Sep 2007) | 1 line
Adding timeouts to two wait() calls to prevent hanging
........
r578258 | rgreig | 2007-09-21 21:31:18 +0100 (Fri, 21 Sep 2007) | 1 line
QPID-607: dispatcher threads now poll so that they can die when the connection is closed.
........
r578475 | rgreig | 2007-09-22 20:01:59 +0100 (Sat, 22 Sep 2007) | 1 line
QPID-608 Fix the test by adding in creation of the VM broker
........
r578509 | rgreig | 2007-09-22 23:05:30 +0100 (Sat, 22 Sep 2007) | 1 line
QPID-609 : dispatcher thread was being restarted by the code that closed the consumer due to the receipt of a basic.cancel frame. Move the dispatcher shutdown to the end of the consumer close process. Also rename the dispatcher _closed field since it clashes with a field in the container class.
........
r578604 | rgreig | 2007-09-23 22:29:33 +0100 (Sun, 23 Sep 2007) | 4 lines
QPID-589: avoid the deadlock between the session close and the BasicCancelOkHandler by
not sending a BasicCancel when the session is being closed.
........
Modified:
incubator/qpid/branches/M2.1/ (props changed)
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
Propchange: incubator/qpid/branches/M2.1/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Mon Sep 24 03:22:32 2007
@@ -1 +1 @@
-/incubator/qpid/branches/M2:1-568187,574873,574876,575253,575688,576861,577193,577316,577660,578048,578061
+/incubator/qpid/branches/M2:1-568187,574873,574876,575253,575663-578604
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=578745&r1=578744&r2=578745&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Sep 24 03:22:32 2007
@@ -70,11 +70,7 @@
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -92,6 +88,8 @@
*/
private final Object _failoverMutex = new Object();
+ private final Object _sessionCreationLock = new Object();
+
/**
* A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
* and we must prevent the client from opening too many. Zero means unlimited.
@@ -509,6 +507,8 @@
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException
{
+ synchronized(_sessionCreationLock)
+ {
checkNotClosed();
if (channelLimitReached())
@@ -572,6 +572,7 @@
return session;
}
}, this).execute();
+ }
}
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
@@ -760,44 +761,63 @@
public void close(long timeout) throws JMSException
{
- synchronized (getFailoverMutex())
+ close(new ArrayList<AMQSession>(_sessions.values()),timeout);
+ }
+
+ public void close(List<AMQSession> sessions, long timeout) throws JMSException
+ {
+ synchronized(_sessionCreationLock)
{
- if (!_closed.getAndSet(true))
+ if(!sessions.isEmpty())
{
- try
+ AMQSession session = sessions.remove(0);
+ synchronized(session.getMessageDeliveryLock())
{
- long startCloseTime = System.currentTimeMillis();
+ close(sessions, timeout);
+ }
+ }
+ else
+ {
+ synchronized (getFailoverMutex())
+ {
+ if (!_closed.getAndSet(true))
+ {
+ try
+ {
+ long startCloseTime = System.currentTimeMillis();
- _taskPool.shutdown();
- closeAllSessions(null, timeout, startCloseTime);
+ _taskPool.shutdown();
+ closeAllSessions(null, timeout, startCloseTime);
- if (!_taskPool.isTerminated())
- {
- try
+ if (!_taskPool.isTerminated())
{
- // adjust timeout
- long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+ try
+ {
+ // adjust timeout
+ long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
- _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- _logger.info("Interrupted while shutting down connection thread pool.");
+ _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interrupted while shutting down connection thread pool.");
+ }
}
- }
- // adjust timeout
- timeout = adjustTimeout(timeout, startCloseTime);
+ // adjust timeout
+ timeout = adjustTimeout(timeout, startCloseTime);
- _protocolHandler.closeConnection(timeout);
+ _protocolHandler.closeConnection(timeout);
+ }
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
}
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error closing connection: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
+ }
}
}
}
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=578745&r1=578744&r2=578745&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Sep 24 03:22:32 2007
@@ -106,6 +106,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -519,7 +520,6 @@
synchronized (_messageDeliveryLock)
{
-
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session.
synchronized (_connection.getFailoverMutex())
@@ -666,11 +666,10 @@
else
{
_logger.info("Dispatcher is null so created stopped dispatcher");
-
startDistpatcherIfNecessary(true);
}
- _dispatcher.rejectPending(consumer);
+ _dispatcher.rejectPending(consumer);
}
else
{
@@ -1954,11 +1953,6 @@
*/
private void closeConsumers(Throwable error) throws JMSException
{
- if (_dispatcher != null)
- {
- _dispatcher.close();
- _dispatcher = null;
- }
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
@@ -1973,10 +1967,15 @@
}
else
{
- con.close();
+ con.close(false);
}
}
// at this point the _consumers map will be empty
+ if (_dispatcher != null)
+ {
+ _dispatcher.close();
+ _dispatcher = null;
+ }
}
/**
@@ -2557,12 +2556,17 @@
}
}
+ Object getMessageDeliveryLock()
+ {
+ return _messageDeliveryLock;
+ }
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
/** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
- private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final AtomicBoolean _dispatcherClosed = new AtomicBoolean(false);
private final Object _lock = new Object();
private final AtomicLong _rollbackMark = new AtomicLong(-1);
@@ -2578,7 +2582,7 @@
public void close()
{
- _closed.set(true);
+ _dispatcherClosed.set(true);
interrupt();
// fixme awaitTermination
@@ -2673,30 +2677,33 @@
try
{
- while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null))
+ while (!_dispatcherClosed.get())
{
- synchronized (_lock)
+ message = (UnprocessedMessage) _queue.poll(1000, TimeUnit.MILLISECONDS);
+ if (message != null)
{
-
- while (connectionStopped())
+ synchronized (_lock)
{
- _lock.wait(2000);
- }
- if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
- {
- rejectMessage(message, true);
- }
- else
- {
- synchronized (_messageDeliveryLock)
+ while (connectionStopped())
{
- dispatchMessage(message);
+ _lock.wait(2000);
}
- }
- }
+ if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
+ {
+ rejectMessage(message, true);
+ }
+ else
+ {
+ synchronized (_messageDeliveryLock)
+ {
+ dispatchMessage(message);
+ }
+ }
+ }
+ }
}
}
catch (InterruptedException e)
@@ -2760,7 +2767,7 @@
}
}
// Don't reject if we're already closing
- if (!_closed.get())
+ if (!_dispatcherClosed.get())
{
rejectMessage(message, true);
}
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=578745&r1=578744&r2=578745&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Mon Sep 24 03:22:32 2007
@@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
/**
* A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
@@ -69,10 +70,10 @@
_listener = listener;
}
- public Object take() throws InterruptedException
+ public Object poll(long time, TimeUnit unit) throws InterruptedException
{
- Object o = _queue.take();
- if (_listener != null)
+ Object o = _queue.poll(time, unit);
+ if (o != null && _listener != null)
{
synchronized (_listener)
{
Modified: incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java?rev=578745&r1=578744&r2=578745&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java Mon Sep 24 03:22:32 2007
@@ -47,12 +47,12 @@
protected void setUp() throws Exception
{
super.setUp();
-// TransportConnection.createVMBroker(1);
+ TransportConnection.createVMBroker(1);
}
protected void tearDown() throws Exception
{
-// TransportConnection.killVMBroker(1);
+ TransportConnection.killVMBroker(1);
}
public void testSimpleConnection()