You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/29 15:04:37 UTC

svn commit: r661325 - in /incubator/qpid/branches/broker-queue-refactor/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/java/org/apache/qpid/server/subscription/ systests/src/main/ja...

Author: rgodfrey
Date: Thu May 29 06:04:37 2008
New Revision: 661325

URL: http://svn.apache.org/viewvc?rev=661325&view=rev
Log:
Made subscription sendLock straight lock, re-enabled per subscription async delivery

Modified:
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu May 29 06:04:37 2008
@@ -831,7 +831,7 @@
                 // may need to deliver queued messages
                 for (Subscription s : _tag2SubscriptionMap.values())
                 {
-                    s.getQueue().deliverAsync();
+                    s.getQueue().deliverAsync(s);
                 }
             }
         }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu May 29 06:04:37 2008
@@ -278,7 +278,7 @@
         }
 
 
-        deliverAsync();
+        deliverAsync(subscription);
 
     }
 
@@ -456,8 +456,7 @@
     private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
             throws AMQException
     {
-        // the send lock is a read/write lock that prevents the subscription from changing status while we are in this
-        // block
+
         sub.getSendLock();
         try
         {
@@ -772,7 +771,7 @@
                 _activeSubscriberCount.incrementAndGet();
 
             }
-            deliverAsync();
+            deliverAsync(sub);
         }
     }
 
@@ -1662,7 +1661,7 @@
         public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
         {
             entry.removeStateChangeListener(this);
-            deliverAsync();
+            deliverAsync(_sub);
         }
     }
 }
\ No newline at end of file

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Thu May 29 06:04:37 2008
@@ -73,7 +73,7 @@
 
     boolean wouldSuspend(QueueEntry msg);
 
-    Object getSendLock();
+    void getSendLock();
     void releaseSendLock();
 
     void resend(final QueueEntry entry) throws AMQException;

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Thu May 29 06:04:37 2008
@@ -22,9 +22,8 @@
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -66,9 +65,6 @@
     
     private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
     private final Lock _stateChangeLock;
-    private final Lock _stateChangeExclusiveLock;
-
-
 
     static final class BrowserSubscription extends SubscriptionImpl
     {
@@ -287,9 +283,9 @@
         _deliveryMethod = deliveryMethod;
         _recordMethod = recordMethod;
 
-        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-        _stateChangeLock = readWriteLock.readLock();
-        _stateChangeExclusiveLock = readWriteLock.writeLock();
+
+        _stateChangeLock = new ReentrantLock();
+
 
         if (arguments != null)
         {
@@ -445,7 +441,7 @@
         boolean closed = false;
         State state = getState();
 
-        _stateChangeExclusiveLock.lock();
+        _stateChangeLock.lock();
         try
         {
             while(!closed && state != State.CLOSED)
@@ -464,7 +460,7 @@
         }
         finally
         {
-            _stateChangeExclusiveLock.unlock();
+            _stateChangeLock.unlock();
         }
 
 
@@ -495,10 +491,9 @@
         return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage());
     }
 
-    public Object getSendLock()
+    public void getSendLock()
     {
         _stateChangeLock.lock();
-        return _deleted;
     }
 
     public void releaseSendLock()

Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Thu May 29 06:04:37 2008
@@ -86,7 +86,7 @@
         //no-op
     }
 
-    public Object getSendLock()
+    public void getSendLock()
     {
         return new Object();
     }