You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/09 16:44:57 UTC

svn commit: r1768998 - in /qpid/java/branches/remove-queue-runner: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apache/qpid/server/virtualhost/ broker-c...

Author: rgodfrey
Date: Wed Nov  9 16:44:57 2016
New Revision: 1768998

URL: http://svn.apache.org/viewvc?rev=1768998&view=rev
Log:
Bug fixing and remove redundant code

Modified:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Wed Nov  9 16:44:57 2016
@@ -92,10 +92,22 @@ public abstract class AbstractConsumerTa
 
     protected final void setNotifyWorkDesired(final boolean desired)
     {
+        // TODO - remove once queue is smarter
+        if(desired && !_notifyWorkDesired)
+        {
+            updateState(State.SUSPENDED, State.ACTIVE);
+
+            notifyWork();
+        }
+        else if(!desired && _notifyWorkDesired)
+        {
+            updateState(State.ACTIVE, State.SUSPENDED);
+        }
+
         _notifyWorkDesired = desired;
     }
 
-    protected final boolean isNotifyWorkDesired()
+    public final boolean isNotifyWorkDesired()
     {
         return _notifyWorkDesired;
     }
@@ -107,6 +119,7 @@ public abstract class AbstractConsumerTa
         {
             return false;
         }
+
         // TODO - if not closed
         return sendNextMessage();
     }
@@ -162,17 +175,9 @@ public abstract class AbstractConsumerTa
     @Override
     public final boolean isSuspended()
     {
-        return isFlowSuspended();
-    }
-
-    @Override
-    public boolean hasCredit()
-    {
-        return !isFlowSuspended();
+        return !isNotifyWorkDesired();
     }
 
-    protected abstract boolean isFlowSuspended();
-
     public final State getState()
     {
         return _state.get();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Wed Nov  9 16:44:57 2016
@@ -39,14 +39,14 @@ public interface ConsumerTarget
 
     String getTargetAddress();
 
-    boolean hasCredit();
-
     boolean isMultiQueue();
 
     void notifyWork();
 
     void updateNotifyWorkDesired();
 
+    boolean isNotifyWorkDesired();
+
     enum State
     {
         ACTIVE, SUSPENDED, CLOSED

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Nov  9 16:44:57 2016
@@ -2057,7 +2057,7 @@ public abstract class AbstractQueue<X ex
             final QueueConsumer consumer = node.getConsumer();
             if(consumer.getPriority() > sub.getPriority())
             {
-                if(getNextAvailableEntry(consumer) != null && consumer.hasCredit())
+                if(getNextAvailableEntry(consumer) != null && consumer.isNotifyWorkDesired())
                 {
                     final ConsumerTarget target = consumer.getTarget();
                     // if the higher priority consumer later becomes suspended we should try notifying this

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Wed Nov  9 16:44:57 2016
@@ -48,7 +48,7 @@ public interface QueueConsumer<X extends
 
     void awaitCredit(QueueEntry entry);
 
-    boolean hasCredit();
+    boolean isNotifyWorkDesired();
 
     void notifyWork();
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Wed Nov  9 16:44:57 2016
@@ -255,9 +255,9 @@ class QueueConsumerImpl
     }
 
     @Override
-    public boolean hasCredit()
+    public boolean isNotifyWorkDesired()
     {
-        return _target.hasCredit();
+        return _target.isNotifyWorkDesired();
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Wed Nov  9 16:44:57 2016
@@ -221,7 +221,7 @@ public abstract class AbstractSystemMess
         @Override
         public boolean isSuspended()
         {
-            return false;
+            return !isActive();
         }
 
         @Override
@@ -251,7 +251,7 @@ public abstract class AbstractSystemMess
         @Override
         public boolean isActive()
         {
-            return false;
+            return _target.isNotifyWorkDesired();
         }
 
         @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Wed Nov  9 16:44:57 2016
@@ -180,12 +180,6 @@ public class TestConsumerTarget implemen
     }
 
     @Override
-    public boolean hasCredit()
-    {
-        return _state == State.ACTIVE;
-    }
-
-    @Override
     public void consumerAdded(final ConsumerImpl sub)
     {
         _consumer = sub;
@@ -277,6 +271,12 @@ public class TestConsumerTarget implemen
     }
 
     @Override
+    public boolean isNotifyWorkDesired()
+    {
+        return _state == State.ACTIVE;
+    }
+
+    @Override
     public void updateNotifyWorkDesired()
     {
 

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Wed Nov  9 16:44:57 2016
@@ -135,13 +135,6 @@ public class ConsumerTarget_0_10 extends
     }
 
     @Override
-    public boolean isFlowSuspended()
-    {
-        return getState()!=State.ACTIVE || _session.isClosing() || _session.getAMQPConnection().isConnectionStopped();
-        // TODO check for Session suspension
-    }
-
-    @Override
     public void updateNotifyWorkDesired()
     {
         final AMQPConnection_0_10 amqpConnection = _session.getAMQPConnection();
@@ -408,9 +401,9 @@ public class ConsumerTarget_0_10 extends
 
     public void flushCreditState(boolean strict)
     {
-        if(strict || !isFlowSuspended() || _deferredMessageCredit >= 200
-          || !(_creditManager instanceof WindowCreditManager)
-          || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
+        if(strict || !isSuspended() || _deferredMessageCredit >= 200
+           || !(_creditManager instanceof WindowCreditManager)
+           || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
         {
             restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
 

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Wed Nov  9 16:44:57 2016
@@ -361,6 +361,7 @@ public class ServerSessionDelegate exten
                                                        options,
                                                        priority));
                         }
+                        target.updateNotifyWorkDesired();
                     }
                     catch (Queue.ExistingExclusiveConsumer existing)
                     {

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Nov  9 16:44:57 2016
@@ -338,6 +338,7 @@ public class AMQChannel
         }
 
         ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
+        target.updateNotifyWorkDesired();
         target.sendNextMessage();
         sub.close();
         return getDeliveryMethod.hasDeliveredMessage();
@@ -850,6 +851,7 @@ public class AMQChannel
                     _consumers.add(modelConsumer);
                 }
             }
+            target.updateNotifyWorkDesired();
         }
         catch (AccessControlException
                 | MessageSource.ExistingExclusiveConsumer
@@ -1371,7 +1373,12 @@ public class AMQChannel
         {
             _logChannelFlowMessages = false;
         }
+        boolean hasCredit = _creditManager.hasCredit();
         _creditManager.setCreditLimits(prefetchSize, prefetchCount);
+        if(hasCredit != _creditManager.hasCredit())
+        {
+            updateAllConsumerNotifyWorkDesired();
+        }
     }
 
     public MessageStore getMessageStore()

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Wed Nov  9 16:44:57 2016
@@ -357,13 +357,6 @@ public abstract class ConsumerTarget_0_8
     }
 
     @Override
-    public boolean isFlowSuspended()
-    {
-        // (credit || tcp backpressure) || (channel flow || channel closing || connection closing) || connection being stopped by vh
-        return getState()!=State.ACTIVE || _channel.isSuspended() || _channel.getAMQPConnection().isConnectionStopped();
-    }
-
-    @Override
     public void updateNotifyWorkDesired()
     {
         final AMQPConnection_0_8<?> amqpConnection = _channel.getAMQPConnection();

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Wed Nov  9 16:44:57 2016
@@ -85,12 +85,6 @@ class ConsumerTarget_1_0 extends Abstrac
     }
 
     @Override
-    public boolean isFlowSuspended()
-    {
-        return _link.getSession().getAMQPConnection().isConnectionStopped() || getState() != State.ACTIVE;
-    }
-
-    @Override
     public void updateNotifyWorkDesired()
     {
         final AMQPConnection<?> amqpConnection =
@@ -329,7 +323,7 @@ class ConsumerTarget_1_0 extends Abstrac
         updateNotifyWorkDesired();
 
         ProtocolEngine protocolEngine = getSession().getConnection();
-        if (isFlowSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
+        if (isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
         {
             updateState(State.SUSPENDED, State.ACTIVE);
             _transactionId = _link.getTransactionId();

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Wed Nov  9 16:44:57 2016
@@ -378,6 +378,7 @@ public class SendingLink_1_0 implements
                                                name,
                                                options,
                                                getEndpoint().getPriority());
+                _target.updateNotifyWorkDesired();
             }
             catch (MessageSource.ExistingExclusiveConsumer e)
             {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org