You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/03 16:02:53 UTC

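svn commit: r1767916 - in /qpid/java/branches/remove-queue-runner: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apache/qpid/server/virtualhost/ broker-core/src/test/java/org/apache/qpid/server/transport/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/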

Author: lquack
Date: Thu Nov  3 16:02:52 2016
New Revision: 1767916

URL: http://svn.apache.org/viewvc?rev=1767916&view=rev
Log:
get rid of flush

Modified:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Nov  3 16:02:52 2016
@@ -299,17 +299,27 @@ public abstract class AbstractConsumerTa
     @Override
     public boolean sendNextMessage()
     {
-        if(_pullIterator == null || !_pullIterator.hasNext())
+        boolean iteratedCompleteList = false;
+        while (_queue.isEmpty())
         {
-            _pullIterator = getConsumers().iterator();
-        }
-        if(_pullIterator.hasNext())
-        {
-            ConsumerImpl consumer = _pullIterator.next();
+            if (_pullIterator == null || !_pullIterator.hasNext())
+            {
+                if (iteratedCompleteList)
+                {
+                    break;
+                }
+                iteratedCompleteList = true;
+
+                _pullIterator = getConsumers().iterator();
+            }
+            if (_pullIterator.hasNext())
+            {
+                ConsumerImpl consumer = _pullIterator.next();
 
-            _waitingOnStateChange.set(true);
+                _waitingOnStateChange.set(true);
 
-            consumer.pullMessage();
+                consumer.pullMessage();
+            }
         }
 
         ConsumerMessageInstancePair consumerMessage = _queue.poll();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Thu Nov  3 16:02:52 2016
@@ -81,6 +81,4 @@ public interface ConsumerImpl
     boolean isActive();
 
     String getName();
-
-    void flush();
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Thu Nov  3 16:02:52 2016
@@ -90,10 +90,6 @@ public interface Queue<X extends Queue<X
     @ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD)
     long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l;
 
-    String MAX_ASYNCHRONOUS_DELIVERIES = "queue.maxAsynchronousDeliveries";
-    @ManagedContextDefault(name = MAX_ASYNCHRONOUS_DELIVERIES )
-    int DEFAULT_MAX_ASYNCHRONOUS_DELIVERIES = 80;
-
     String MIME_TYPE_TO_FILE_EXTENSION = "qpid.mimeTypeToFileExtension";
     @SuppressWarnings("unused")
     @ManagedContextDefault(name = MIME_TYPE_TO_FILE_EXTENSION, description = "A mapping of MIME types to file extensions.")

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Nov  3 16:02:52 2016
@@ -222,7 +222,6 @@ public abstract class AbstractQueue<X ex
             Collections.synchronizedSet(EnumSet.noneOf(NotificationCheck.class));
 
 
-    private volatile int _maxAsyncDeliveries;
     private volatile long _estimatedAverageMessageHeaderSize;
 
     private AtomicInteger _deliveredMessages = new AtomicInteger();
@@ -489,7 +488,6 @@ public abstract class AbstractQueue<X ex
         }
 
         _estimatedAverageMessageHeaderSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
-        _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES);
         _mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
 
         if(_defaultFilters != null)
@@ -2120,50 +2118,22 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    void flushConsumer(QueueConsumer<?> sub)
-    {
-        flushConsumer(sub, Long.MAX_VALUE);
-    }
-
-    boolean flushConsumer(QueueConsumer<?> sub, long iterations)
+    boolean deliverSingleMessage(QueueConsumer<?> sub)
     {
         boolean atTail = false;
-        final boolean keepSendLockHeld = iterations <=  getMaxAsyncDeliveries();
         boolean queueEmpty = false;
         boolean deliveryAttempted = false;
 
+        sub.getSendLock();
         try
         {
-            if(keepSendLockHeld)
+            if (!sub.isSuspended())
             {
-                sub.getSendLock();
-            }
-            while (!sub.isSuspended() && !atTail && iterations != 0)
-            {
-                try
+                atTail = attemptDelivery(sub, true);
+                deliveryAttempted = true;
+                if (atTail && getNextAvailableEntry(sub) == null)
                 {
-                    if(!keepSendLockHeld)
-                    {
-                        sub.getSendLock();
-                    }
-
-                    atTail = attemptDelivery(sub, true);
-                    deliveryAttempted = true;
-                    if (atTail && getNextAvailableEntry(sub) == null)
-                    {
-                        queueEmpty = true;
-                    }
-                    else if (!atTail)
-                    {
-                        iterations--;
-                    }
-                }
-                finally
-                {
-                    if(!keepSendLockHeld)
-                    {
-                        sub.releaseSendLock();
-                    }
+                    queueEmpty = true;
                 }
             }
 
@@ -2175,10 +2145,7 @@ public abstract class AbstractQueue<X ex
         }
         finally
         {
-            if(keepSendLockHeld)
-            {
-                sub.releaseSendLock();
-            }
+            sub.releaseSendLock();
             if(queueEmpty)
             {
                 sub.queueEmpty();
@@ -2188,7 +2155,6 @@ public abstract class AbstractQueue<X ex
 
         }
 
-
         // if there's (potentially) more than one consumer the others will potentially not have been advanced to the
         // next entry they are interested in yet.  This would lead to holding on to references to expired messages, etc
         // which would give us memory "leak".
@@ -3294,13 +3260,6 @@ public abstract class AbstractQueue<X ex
 
     }
 
-    int getMaxAsyncDeliveries()
-    {
-        return _maxAsyncDeliveries;
-    }
-
-
-
     private static final String[] NON_NEGATIVE_NUMBERS = {
         ALERT_REPEAT_GAP,
         ALERT_THRESHOLD_MESSAGE_AGE,

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Thu Nov  3 16:02:52 2016
@@ -59,7 +59,6 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.StateChangeListener;
 
 class QueueConsumerImpl
@@ -390,21 +389,9 @@ class QueueConsumerImpl
     }
 
     @Override
-    public final void flush()
-    {
-        AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
-        _queue.flushConsumer(this);
-        _target.processPending();
-
-    }
-
-    @Override
     public void pullMessage()
     {
-        AMQPConnection<?> connection = _target.getSessionModel().getAMQPConnection();
-        _queue.flushConsumer(this, 1);
-
-
+        _queue.deliverSingleMessage(this);
     }
 
     public boolean resend(final QueueEntry entry)

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Thu Nov  3 16:02:52 2016
@@ -273,14 +273,6 @@ public abstract class AbstractSystemMess
             return _name;
         }
 
-        @Override
-        public void flush()
-        {
-            AMQPConnection<?> connection = getSessionModel().getAMQPConnection();
-            deliverMessages();
-            _target.processPending();
-        }
-
 
         public void send(final InternalMessage response)
         {

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Thu Nov  3 16:02:52 2016
@@ -149,7 +149,6 @@ public class TCPandSSLTransportTest exte
         when(port.getNumberOfSelectors()).thenReturn(1);
         when(port.getSSLContext()).thenReturn(sslContext);
         when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
-        when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
         when(port.getContextValue(Integer.class, AmqpPort.PORT_AMQP_ACCEPT_BACKLOG)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
         when(port.getProtocolHandshakeTimeout()).thenReturn(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT);
         ObjectMapper mapper = new ObjectMapper();

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Thu Nov  3 16:02:52 2016
@@ -608,10 +608,7 @@ public class ConsumerTarget_0_10 extends
     public void flush()
     {
         flushCreditState(true);
-        for(ConsumerImpl consumer : getConsumers())
-        {
-            consumer.flush();
-        }
+        while(sendNextMessage());
         stop();
     }
 

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Nov  3 16:02:52 2016
@@ -333,7 +333,7 @@ public class AMQChannel
         }
 
         ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
-        sub.flush();
+        target.sendNextMessage();
         sub.close();
         return getDeliveryMethod.hasDeliveredMessage();
 

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Nov  3 16:02:52 2016
@@ -323,10 +323,7 @@ class ConsumerTarget_1_0 extends Abstrac
 
     public void flush()
     {
-        for(ConsumerImpl consumer : getConsumers())
-        {
-            consumer.flush();
-        }
+        while(sendNextMessage());
     }
 
     private class DispositionAction implements UnsettledAction

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1767916&r1=1767915&r2=1767916&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Thu Nov  3 16:02:52 2016
@@ -205,12 +205,6 @@ class ManagementNodeConsumer implements
     }
 
     @Override
-    public void flush()
-    {
-
-    }
-
-    @Override
     public ConsumerTarget getTarget()
     {
         return _target;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org