You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/29 15:04:37 UTC
svn commit: r661325 - in /incubator/qpid/branches/broker-queue-refactor/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/queue/
broker/src/main/java/org/apache/qpid/server/subscription/
systests/src/main/ja...
Author: rgodfrey
Date: Thu May 29 06:04:37 2008
New Revision: 661325
URL: http://svn.apache.org/viewvc?rev=661325&view=rev
Log:
Made subscription sendLock straight lock, re-enabled per subscription async delivery
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu May 29 06:04:37 2008
@@ -831,7 +831,7 @@
// may need to deliver queued messages
for (Subscription s : _tag2SubscriptionMap.values())
{
- s.getQueue().deliverAsync();
+ s.getQueue().deliverAsync(s);
}
}
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu May 29 06:04:37 2008
@@ -278,7 +278,7 @@
}
- deliverAsync();
+ deliverAsync(subscription);
}
@@ -456,8 +456,7 @@
private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
throws AMQException
{
- // the send lock is a read/write lock that prevents the subscription from changing status while we are in this
- // block
+
sub.getSendLock();
try
{
@@ -772,7 +771,7 @@
_activeSubscriberCount.incrementAndGet();
}
- deliverAsync();
+ deliverAsync(sub);
}
}
@@ -1662,7 +1661,7 @@
public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
- deliverAsync();
+ deliverAsync(_sub);
}
}
}
\ No newline at end of file
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Thu May 29 06:04:37 2008
@@ -73,7 +73,7 @@
boolean wouldSuspend(QueueEntry msg);
- Object getSendLock();
+ void getSendLock();
void releaseSendLock();
void resend(final QueueEntry entry) throws AMQException;
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Thu May 29 06:04:37 2008
@@ -22,9 +22,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -66,9 +65,6 @@
private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final Lock _stateChangeLock;
- private final Lock _stateChangeExclusiveLock;
-
-
static final class BrowserSubscription extends SubscriptionImpl
{
@@ -287,9 +283,9 @@
_deliveryMethod = deliveryMethod;
_recordMethod = recordMethod;
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- _stateChangeLock = readWriteLock.readLock();
- _stateChangeExclusiveLock = readWriteLock.writeLock();
+
+ _stateChangeLock = new ReentrantLock();
+
if (arguments != null)
{
@@ -445,7 +441,7 @@
boolean closed = false;
State state = getState();
- _stateChangeExclusiveLock.lock();
+ _stateChangeLock.lock();
try
{
while(!closed && state != State.CLOSED)
@@ -464,7 +460,7 @@
}
finally
{
- _stateChangeExclusiveLock.unlock();
+ _stateChangeLock.unlock();
}
@@ -495,10 +491,9 @@
return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage());
}
- public Object getSendLock()
+ public void getSendLock()
{
_stateChangeLock.lock();
- return _deleted;
}
public void releaseSendLock()
Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Thu May 29 06:04:37 2008
@@ -86,7 +86,7 @@
//no-op
}
- public Object getSendLock()
+ public void getSendLock()
{
return new Object();
}