You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/09 16:44:57 UTC
svn commit: r1768998 - in /qpid/java/branches/remove-queue-runner:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/main/java/org/apache/qpid/server/virtualhost/ broker-c...
Author: rgodfrey
Date: Wed Nov 9 16:44:57 2016
New Revision: 1768998
URL: http://svn.apache.org/viewvc?rev=1768998&view=rev
Log:
Bug fixes and removal of redundant code
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Wed Nov 9 16:44:57 2016
@@ -92,10 +92,22 @@ public abstract class AbstractConsumerTa
protected final void setNotifyWorkDesired(final boolean desired)
{
+ // TODO - remove once queue is smarter
+ if(desired && !_notifyWorkDesired)
+ {
+ updateState(State.SUSPENDED, State.ACTIVE);
+
+ notifyWork();
+ }
+ else if(!desired && _notifyWorkDesired)
+ {
+ updateState(State.ACTIVE, State.SUSPENDED);
+ }
+
_notifyWorkDesired = desired;
}
- protected final boolean isNotifyWorkDesired()
+ public final boolean isNotifyWorkDesired()
{
return _notifyWorkDesired;
}
@@ -107,6 +119,7 @@ public abstract class AbstractConsumerTa
{
return false;
}
+
// TODO - if not closed
return sendNextMessage();
}
@@ -162,17 +175,9 @@ public abstract class AbstractConsumerTa
@Override
public final boolean isSuspended()
{
- return isFlowSuspended();
- }
-
- @Override
- public boolean hasCredit()
- {
- return !isFlowSuspended();
+ return !isNotifyWorkDesired();
}
- protected abstract boolean isFlowSuspended();
-
public final State getState()
{
return _state.get();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Wed Nov 9 16:44:57 2016
@@ -39,14 +39,14 @@ public interface ConsumerTarget
String getTargetAddress();
- boolean hasCredit();
-
boolean isMultiQueue();
void notifyWork();
void updateNotifyWorkDesired();
+ boolean isNotifyWorkDesired();
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Nov 9 16:44:57 2016
@@ -2057,7 +2057,7 @@ public abstract class AbstractQueue<X ex
final QueueConsumer consumer = node.getConsumer();
if(consumer.getPriority() > sub.getPriority())
{
- if(getNextAvailableEntry(consumer) != null && consumer.hasCredit())
+ if(getNextAvailableEntry(consumer) != null && consumer.isNotifyWorkDesired())
{
final ConsumerTarget target = consumer.getTarget();
// if the higher priority consumer later becomes suspended we should try notifying this
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Wed Nov 9 16:44:57 2016
@@ -48,7 +48,7 @@ public interface QueueConsumer<X extends
void awaitCredit(QueueEntry entry);
- boolean hasCredit();
+ boolean isNotifyWorkDesired();
void notifyWork();
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Wed Nov 9 16:44:57 2016
@@ -255,9 +255,9 @@ class QueueConsumerImpl
}
@Override
- public boolean hasCredit()
+ public boolean isNotifyWorkDesired()
{
- return _target.hasCredit();
+ return _target.isNotifyWorkDesired();
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Wed Nov 9 16:44:57 2016
@@ -221,7 +221,7 @@ public abstract class AbstractSystemMess
@Override
public boolean isSuspended()
{
- return false;
+ return !isActive();
}
@Override
@@ -251,7 +251,7 @@ public abstract class AbstractSystemMess
@Override
public boolean isActive()
{
- return false;
+ return _target.isNotifyWorkDesired();
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Wed Nov 9 16:44:57 2016
@@ -180,12 +180,6 @@ public class TestConsumerTarget implemen
}
@Override
- public boolean hasCredit()
- {
- return _state == State.ACTIVE;
- }
-
- @Override
public void consumerAdded(final ConsumerImpl sub)
{
_consumer = sub;
@@ -277,6 +271,12 @@ public class TestConsumerTarget implemen
}
@Override
+ public boolean isNotifyWorkDesired()
+ {
+ return _state == State.ACTIVE;
+ }
+
+ @Override
public void updateNotifyWorkDesired()
{
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Wed Nov 9 16:44:57 2016
@@ -135,13 +135,6 @@ public class ConsumerTarget_0_10 extends
}
@Override
- public boolean isFlowSuspended()
- {
- return getState()!=State.ACTIVE || _session.isClosing() || _session.getAMQPConnection().isConnectionStopped();
- // TODO check for Session suspension
- }
-
- @Override
public void updateNotifyWorkDesired()
{
final AMQPConnection_0_10 amqpConnection = _session.getAMQPConnection();
@@ -408,9 +401,9 @@ public class ConsumerTarget_0_10 extends
public void flushCreditState(boolean strict)
{
- if(strict || !isFlowSuspended() || _deferredMessageCredit >= 200
- || !(_creditManager instanceof WindowCreditManager)
- || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
+ if(strict || !isSuspended() || _deferredMessageCredit >= 200
+ || !(_creditManager instanceof WindowCreditManager)
+ || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
{
restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Wed Nov 9 16:44:57 2016
@@ -361,6 +361,7 @@ public class ServerSessionDelegate exten
options,
priority));
}
+ target.updateNotifyWorkDesired();
}
catch (Queue.ExistingExclusiveConsumer existing)
{
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Nov 9 16:44:57 2016
@@ -338,6 +338,7 @@ public class AMQChannel
}
ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
+ target.updateNotifyWorkDesired();
target.sendNextMessage();
sub.close();
return getDeliveryMethod.hasDeliveredMessage();
@@ -850,6 +851,7 @@ public class AMQChannel
_consumers.add(modelConsumer);
}
}
+ target.updateNotifyWorkDesired();
}
catch (AccessControlException
| MessageSource.ExistingExclusiveConsumer
@@ -1371,7 +1373,12 @@ public class AMQChannel
{
_logChannelFlowMessages = false;
}
+ boolean hasCredit = _creditManager.hasCredit();
_creditManager.setCreditLimits(prefetchSize, prefetchCount);
+ if(hasCredit != _creditManager.hasCredit())
+ {
+ updateAllConsumerNotifyWorkDesired();
+ }
}
public MessageStore getMessageStore()
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Wed Nov 9 16:44:57 2016
@@ -357,13 +357,6 @@ public abstract class ConsumerTarget_0_8
}
@Override
- public boolean isFlowSuspended()
- {
- // (credit || tcp backpressure) || (channel flow || channel closing || connection closing) || connection being stopped by vh
- return getState()!=State.ACTIVE || _channel.isSuspended() || _channel.getAMQPConnection().isConnectionStopped();
- }
-
- @Override
public void updateNotifyWorkDesired()
{
final AMQPConnection_0_8<?> amqpConnection = _channel.getAMQPConnection();
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Wed Nov 9 16:44:57 2016
@@ -85,12 +85,6 @@ class ConsumerTarget_1_0 extends Abstrac
}
@Override
- public boolean isFlowSuspended()
- {
- return _link.getSession().getAMQPConnection().isConnectionStopped() || getState() != State.ACTIVE;
- }
-
- @Override
public void updateNotifyWorkDesired()
{
final AMQPConnection<?> amqpConnection =
@@ -329,7 +323,7 @@ class ConsumerTarget_1_0 extends Abstrac
updateNotifyWorkDesired();
ProtocolEngine protocolEngine = getSession().getConnection();
- if (isFlowSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
+ if (isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
{
updateState(State.SUSPENDED, State.ACTIVE);
_transactionId = _link.getTransactionId();
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1768998&r1=1768997&r2=1768998&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Wed Nov 9 16:44:57 2016
@@ -378,6 +378,7 @@ public class SendingLink_1_0 implements
name,
options,
getEndpoint().getPriority());
+ _target.updateNotifyWorkDesired();
}
catch (MessageSource.ExistingExclusiveConsumer e)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org