You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/03 16:02:53 UTC
svn commit: r1767916 - in /qpid/java/branches/remove-queue-runner:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/sr...
Author: lquack
Date: Thu Nov 3 16:02:52 2016
New Revision: 1767916
URL: http://svn.apache.org/viewvc?rev=1767916&view=rev
Log:
get rid of flush
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Nov 3 16:02:52 2016
@@ -299,17 +299,27 @@ public abstract class AbstractConsumerTa
@Override
public boolean sendNextMessage()
{
- if(_pullIterator == null || !_pullIterator.hasNext())
+ boolean iteratedCompleteList = false;
+ while (_queue.isEmpty())
{
- _pullIterator = getConsumers().iterator();
- }
- if(_pullIterator.hasNext())
- {
- ConsumerImpl consumer = _pullIterator.next();
+ if (_pullIterator == null || !_pullIterator.hasNext())
+ {
+ if (iteratedCompleteList)
+ {
+ break;
+ }
+ iteratedCompleteList = true;
+
+ _pullIterator = getConsumers().iterator();
+ }
+ if (_pullIterator.hasNext())
+ {
+ ConsumerImpl consumer = _pullIterator.next();
- _waitingOnStateChange.set(true);
+ _waitingOnStateChange.set(true);
- consumer.pullMessage();
+ consumer.pullMessage();
+ }
}
ConsumerMessageInstancePair consumerMessage = _queue.poll();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Thu Nov 3 16:02:52 2016
@@ -81,6 +81,4 @@ public interface ConsumerImpl
boolean isActive();
String getName();
-
- void flush();
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Thu Nov 3 16:02:52 2016
@@ -90,10 +90,6 @@ public interface Queue<X extends Queue<X
@ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD)
long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l;
- String MAX_ASYNCHRONOUS_DELIVERIES = "queue.maxAsynchronousDeliveries";
- @ManagedContextDefault(name = MAX_ASYNCHRONOUS_DELIVERIES )
- int DEFAULT_MAX_ASYNCHRONOUS_DELIVERIES = 80;
-
String MIME_TYPE_TO_FILE_EXTENSION = "qpid.mimeTypeToFileExtension";
@SuppressWarnings("unused")
@ManagedContextDefault(name = MIME_TYPE_TO_FILE_EXTENSION, description = "A mapping of MIME types to file extensions.")
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Nov 3 16:02:52 2016
@@ -222,7 +222,6 @@ public abstract class AbstractQueue<X ex
Collections.synchronizedSet(EnumSet.noneOf(NotificationCheck.class));
- private volatile int _maxAsyncDeliveries;
private volatile long _estimatedAverageMessageHeaderSize;
private AtomicInteger _deliveredMessages = new AtomicInteger();
@@ -489,7 +488,6 @@ public abstract class AbstractQueue<X ex
}
_estimatedAverageMessageHeaderSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
- _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES);
_mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
if(_defaultFilters != null)
@@ -2120,50 +2118,22 @@ public abstract class AbstractQueue<X ex
}
}
- void flushConsumer(QueueConsumer<?> sub)
- {
- flushConsumer(sub, Long.MAX_VALUE);
- }
-
- boolean flushConsumer(QueueConsumer<?> sub, long iterations)
+ boolean deliverSingleMessage(QueueConsumer<?> sub)
{
boolean atTail = false;
- final boolean keepSendLockHeld = iterations <= getMaxAsyncDeliveries();
boolean queueEmpty = false;
boolean deliveryAttempted = false;
+ sub.getSendLock();
try
{
- if(keepSendLockHeld)
+ if (!sub.isSuspended())
{
- sub.getSendLock();
- }
- while (!sub.isSuspended() && !atTail && iterations != 0)
- {
- try
+ atTail = attemptDelivery(sub, true);
+ deliveryAttempted = true;
+ if (atTail && getNextAvailableEntry(sub) == null)
{
- if(!keepSendLockHeld)
- {
- sub.getSendLock();
- }
-
- atTail = attemptDelivery(sub, true);
- deliveryAttempted = true;
- if (atTail && getNextAvailableEntry(sub) == null)
- {
- queueEmpty = true;
- }
- else if (!atTail)
- {
- iterations--;
- }
- }
- finally
- {
- if(!keepSendLockHeld)
- {
- sub.releaseSendLock();
- }
+ queueEmpty = true;
}
}
@@ -2175,10 +2145,7 @@ public abstract class AbstractQueue<X ex
}
finally
{
- if(keepSendLockHeld)
- {
- sub.releaseSendLock();
- }
+ sub.releaseSendLock();
if(queueEmpty)
{
sub.queueEmpty();
@@ -2188,7 +2155,6 @@ public abstract class AbstractQueue<X ex
}
-
// if there's (potentially) more than one consumer the others will potentially not have been advanced to the
// next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
// which would give us memory "leak".
@@ -3294,13 +3260,6 @@ public abstract class AbstractQueue<X ex
}
- int getMaxAsyncDeliveries()
- {
- return _maxAsyncDeliveries;
- }
-
-
-
private static final String[] NON_NEGATIVE_NUMBERS = {
ALERT_REPEAT_GAP,
ALERT_THRESHOLD_MESSAGE_AGE,
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Thu Nov 3 16:02:52 2016
@@ -59,7 +59,6 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.StateChangeListener;
class QueueConsumerImpl
@@ -390,21 +389,9 @@ class QueueConsumerImpl
}
@Override
- public final void flush()
- {
- AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
- _queue.flushConsumer(this);
- _target.processPending();
-
- }
-
- @Override
public void pullMessage()
{
- AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
- _queue.flushConsumer(this, 1);
-
-
+ _queue.deliverSingleMessage(this);
}
public boolean resend(final QueueEntry entry)
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Thu Nov 3 16:02:52 2016
@@ -273,14 +273,6 @@ public abstract class AbstractSystemMess
return _name;
}
- @Override
- public void flush()
- {
- AMQPConnection<?> connection = getSessionModel().getAMQPConnection();
- deliverMessages();
- _target.processPending();
- }
-
public void send(final InternalMessage response)
{
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Thu Nov 3 16:02:52 2016
@@ -149,7 +149,6 @@ public class TCPandSSLTransportTest exte
when(port.getNumberOfSelectors()).thenReturn(1);
when(port.getSSLContext()).thenReturn(sslContext);
when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
- when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
when(port.getContextValue(Integer.class, AmqpPort.PORT_AMQP_ACCEPT_BACKLOG)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
when(port.getProtocolHandshakeTimeout()).thenReturn(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT);
ObjectMapper mapper = new ObjectMapper();
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Thu Nov 3 16:02:52 2016
@@ -608,10 +608,7 @@ public class ConsumerTarget_0_10 extends
public void flush()
{
flushCreditState(true);
- for(ConsumerImpl consumer : getConsumers())
- {
- consumer.flush();
- }
+ while(sendNextMessage());
stop();
}
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Nov 3 16:02:52 2016
@@ -333,7 +333,7 @@ public class AMQChannel
}
ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
- sub.flush();
+ target.sendNextMessage();
sub.close();
return getDeliveryMethod.hasDeliveredMessage();
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Nov 3 16:02:52 2016
@@ -323,10 +323,7 @@ class ConsumerTarget_1_0 extends Abstrac
public void flush()
{
- for(ConsumerImpl consumer : getConsumers())
- {
- consumer.flush();
- }
+ while(sendNextMessage());
}
private class DispositionAction implements UnsettledAction
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Thu Nov 3 16:02:52 2016
@@ -205,12 +205,6 @@ class ManagementNodeConsumer implements
}
@Override
- public void flush()
- {
-
- }
-
- @Override
public ConsumerTarget getTarget()
{
return _target;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org