You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/03 16:02:33 UTC

svn commit: r1767915 - in /qpid/java/branches/remove-queue-runner: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/model/port/ broker-core/src/main/java/org/apache/qpid/server/protocol/ broker...

Author: lquack
Date: Thu Nov  3 16:02:33 2016
New Revision: 1767915

URL: http://svn.apache.org/viewvc?rev=1767915&view=rev
Log:
Get rid of pullOnly, queueRunnerWaitTime, and outbound message buffer size checking

Modified:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/branches/remove-queue-runner/test-profiles/apache-ci.test.overridden.properties

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Nov  3 16:02:33 2016
@@ -47,7 +47,6 @@ import org.apache.qpid.server.util.State
 public abstract class AbstractConsumerTarget implements ConsumerTarget, LogSubject
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
-    protected static final String PULL_ONLY_CONSUMER = "x-pull-only";
     private final AtomicReference<State> _state;
 
     private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
@@ -60,18 +59,15 @@ public abstract class AbstractConsumerTa
     private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
     private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
 
-    private final boolean _isPullOnly;
     private Iterator<ConsumerImpl> _pullIterator;
     private final AtomicBoolean _waitingOnStateChange = new AtomicBoolean();
 
 
     protected AbstractConsumerTarget(final State initialState,
-                                     final boolean isPullOnly,
                                      final boolean isMultiQueue,
                                      final AMQPConnection<?> amqpConnection)
     {
         _state = new AtomicReference<State>(initialState);
-        _isPullOnly = true;
         _isMultiQueue = isMultiQueue;
         _suspendedConsumerLoggingTicker = isMultiQueue
                 ? new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
@@ -271,18 +267,9 @@ public abstract class AbstractConsumerTa
     }
 
     @Override
-    public boolean isPullOnly()
-    {
-        return _isPullOnly;
-    }
-
-    @Override
     public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
     {
-        AMQPConnection<?> amqpConnection = getSessionModel().getAMQPConnection();
-        amqpConnection.reserveOutboundMessageSpace(entry.getMessage().getSize());
         _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
-        amqpConnection.notifyWork();
         return entry.getMessage().getSize();
     }
 
@@ -291,7 +278,7 @@ public abstract class AbstractConsumerTa
     @Override
     public boolean hasMessagesToSend()
     {
-        return !_queue.isEmpty() || (isPullOnly() && messagesAvailable());
+        return !_queue.isEmpty() || messagesAvailable();
     }
 
     private boolean messagesAvailable()
@@ -312,30 +299,23 @@ public abstract class AbstractConsumerTa
     @Override
     public boolean sendNextMessage()
     {
-
-        if(isPullOnly())
+        if(_pullIterator == null || !_pullIterator.hasNext())
         {
+            _pullIterator = getConsumers().iterator();
+        }
+        if(_pullIterator.hasNext())
+        {
+            ConsumerImpl consumer = _pullIterator.next();
 
-            if(_pullIterator == null || !_pullIterator.hasNext())
-            {
-                _pullIterator = getConsumers().iterator();
-            }
-            if(_pullIterator.hasNext())
-            {
-                ConsumerImpl consumer = _pullIterator.next();
-
-                _waitingOnStateChange.set(true);
+            _waitingOnStateChange.set(true);
 
-                consumer.pullMessage();
-            }
+            consumer.pullMessage();
         }
+
         ConsumerMessageInstancePair consumerMessage = _queue.poll();
         if (consumerMessage != null)
         {
-            if(isPullOnly())
-            {
-                _waitingOnStateChange.set(false);
-            }
+            _waitingOnStateChange.set(false);
             try
             {
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Thu Nov  3 16:02:33 2016
@@ -92,6 +92,4 @@ public interface ConsumerTarget
 
     void releaseSendLock();
 
-    boolean isPullOnly();
-
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Thu Nov  3 16:02:33 2016
@@ -57,7 +57,6 @@ public interface AmqpPort<X extends Amqp
 
     String PORT_AMQP_NUMBER_OF_SELECTORS = "qpid.port.amqp.threadPool.numberOfSelectors";
     String PORT_AMQP_ACCEPT_BACKLOG = "qpid.port.amqp.acceptBacklog";
-    String PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = "qpid.port.amqp.outboundMessageBufferSize";
 
     @ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
     String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
@@ -88,11 +87,6 @@ public interface AmqpPort<X extends Amqp
     @ManagedContextDefault(name = OPEN_CONNECTIONS_WARN_PERCENT)
     int DEFAULT_OPEN_CONNECTIONS_WARN_PERCENT = 80;
 
-    @SuppressWarnings("unused")
-    @ManagedContextDefault(name = PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)
-    long DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = 1024 * 1024;
-
-
     String PROTOCOL_HANDSHAKE_TIMEOUT = "qpid.port.protocol_handshake_timeout";
 
     @SuppressWarnings("unused")

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Thu Nov  3 16:02:33 2016
@@ -106,6 +106,5 @@ public interface AMQSessionModel<T exten
     void addTicker(Ticker ticker);
     void removeTicker(Ticker ticker);
 
-    void notifyConsumerTargetCurrentStates();
     void ensureConsumersNoticedStateChange();
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Nov  3 16:02:33 2016
@@ -225,8 +225,6 @@ public abstract class AbstractQueue<X ex
     private volatile int _maxAsyncDeliveries;
     private volatile long _estimatedAverageMessageHeaderSize;
 
-    private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
-
     private AtomicInteger _deliveredMessages = new AtomicInteger();
     private AtomicBoolean _stopped = new AtomicBoolean(false);
 
@@ -289,7 +287,6 @@ public abstract class AbstractQueue<X ex
     private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
     private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
     private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
-    private volatile boolean _hasPullOnlyConsumers;
 
     private interface HoldMethod
     {
@@ -943,10 +940,6 @@ public abstract class AbstractQueue<X ex
 
         if (!isDeleted())
         {
-            if(consumer.isPullOnly())
-            {
-                _hasPullOnlyConsumers = true;
-            }
             _consumerList.add(consumer);
 
             if (isDeleted())
@@ -965,10 +958,7 @@ public abstract class AbstractQueue<X ex
         {
             consumer.queueEmpty();
         }
-        if(consumer.isPullOnly())
-        {
-            consumer.notifyWork();
-        }
+        consumer.notifyWork();
 
         return consumer;
     }
@@ -1009,18 +999,6 @@ public abstract class AbstractQueue<X ex
                 resetSubPointersForGroups(consumer);
             }
 
-            if(consumer.isPullOnly())
-            {
-                boolean hasOnlyPushConsumers = true;
-                ConsumerNode consumerNode = _consumerList.getHead().findNext();
-                while (consumerNode != null && hasOnlyPushConsumers)
-                {
-                    hasOnlyPushConsumers = !consumerNode.getConsumer().isPullOnly();
-                    consumerNode = consumerNode.findNext();
-                }
-                _hasPullOnlyConsumers = !hasOnlyPushConsumers;
-            }
-
             // auto-delete queues must be deleted if there are no remaining subscribers
 
             if(!consumer.isTransient()
@@ -1091,7 +1069,7 @@ public abstract class AbstractQueue<X ex
                 updateSubRequeueEntry(sub, entry);
             }
         }
-        notifyPullOnlyConsumers();
+        notifyAllConsumers();
     }
 
     public void addBinding(final Binding<?> binding)
@@ -1229,7 +1207,7 @@ public abstract class AbstractQueue<X ex
             if (entry.isAvailable())
             {
                 checkConsumersNotAheadOfDelivery(entry);
-                notifyPullOnlyConsumers();
+                notifyAllConsumers();
             }
 
             checkForNotificationOnNewMessage(entry.getMessage());
@@ -1522,7 +1500,7 @@ public abstract class AbstractQueue<X ex
                 updateSubRequeueEntry(sub, entry);
             }
         }
-        notifyPullOnlyConsumers();
+        notifyAllConsumers();
 
     }
 
@@ -1747,11 +1725,6 @@ public abstract class AbstractQueue<X ex
         _exclusiveSubscriber = exclusiveSubscriber;
     }
 
-    long getStateChangeCount()
-    {
-        return _stateChangeCount.get();
-    }
-
     /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
     abstract QueueEntryList getEntries();
 
@@ -2133,26 +2106,22 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    void notifyPullOnlyConsumers()
+    void notifyAllConsumers()
     {
-        if(_hasPullOnlyConsumers)
+        ConsumerNode consumerNode = _consumerList.getHead().findNext();
+        while (consumerNode != null)
         {
-            ConsumerNode consumerNode = _consumerList.getHead().findNext();
-            while (consumerNode != null)
+            QueueConsumer<?> consumer = consumerNode.getConsumer();
+            if (consumer.isActive() && getNextAvailableEntry(consumer) != null)
             {
-                QueueConsumer<?> consumer = consumerNode.getConsumer();
-                if (consumer.isActive() && getNextAvailableEntry(consumer) != null)
-                {
-                    consumer.notifyWork();
-                }
-                consumerNode = consumerNode.findNext();
+                consumer.notifyWork();
             }
+            consumerNode = consumerNode.findNext();
         }
     }
 
     void flushConsumer(QueueConsumer<?> sub)
     {
-
         flushConsumer(sub, Long.MAX_VALUE);
     }
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Thu Nov  3 16:02:33 2016
@@ -54,7 +54,5 @@ public interface QueueConsumer<X extends
 
     boolean hasCredit();
 
-    boolean isPullOnly();
-
     void notifyWork();
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Thu Nov  3 16:02:33 2016
@@ -339,12 +339,6 @@ class QueueConsumerImpl
     }
 
     @Override
-    public boolean isPullOnly()
-    {
-        return _target.isPullOnly();
-    }
-
-    @Override
     public void notifyWork()
     {
         _target.notifyWork();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Thu Nov  3 16:02:33 2016
@@ -72,8 +72,6 @@ public interface AMQPConnection<C extend
 
     void sendConnectionCloseAsync(AMQConstant connectionForced, String reason);
 
-    void reserveOutboundMessageSpace(long size);
-
     boolean isIOThread();
 
     void checkAuthorizedMessagePrincipal(String messageUserId);

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Thu Nov  3 16:02:33 2016
@@ -718,12 +718,6 @@ public abstract class AbstractAMQPConnec
         return getSessionModels().size();
     }
 
-    @Override
-    public void reserveOutboundMessageSpace(final long size)
-    {
-        _network.reserveOutboundMessageSpace(size);
-    }
-
     protected void markTransportClosed()
     {
         _transportClosedFuture.set(null);

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Thu Nov  3 16:02:33 2016
@@ -61,8 +61,6 @@ public class NonBlockingConnection imple
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final ProtocolEngine _protocolEngine;
     private final Runnable _onTransportEncryptionAction;
-    private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
-    private final long _outboundMessageBufferLimit;
 
     private volatile boolean _fullyWritten = true;
 
@@ -97,9 +95,6 @@ public class NonBlockingConnection imple
         _port = port;
         _threadName = SelectorThread.IO_THREAD_NAME_PREFIX + _remoteSocketAddress.toString();
 
-        _outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
-                                                                   AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
-
         protocolEngine.setWorkListener(new Action<ProtocolEngine>()
         {
             @Override
@@ -214,15 +209,6 @@ public class NonBlockingConnection imple
     }
 
     @Override
-    public void reserveOutboundMessageSpace(long size)
-    {
-        if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
-        {
-            // RG - TODO
-        }
-    }
-
-    @Override
     public String getTransportInfo()
     {
         return _delegate.getTransportInfo();
@@ -540,12 +526,7 @@ public class NonBlockingConnection imple
             _buffers.poll();
             buf.dispose();
         }
-        if (_fullyWritten)
-        {
-            _usedOutboundMessageSpace.set(0);
-        }
         return _fullyWritten;
-
     }
 
     protected int readFromNetwork() throws IOException

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java Thu Nov  3 16:02:33 2016
@@ -23,8 +23,6 @@ import org.apache.qpid.transport.network
 
 public interface ServerNetworkConnection extends NetworkConnection
 {
-    void reserveOutboundMessageSpace(long size);
-
     String getTransportInfo();
 
     long getScheduledTime();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Thu Nov  3 16:02:33 2016
@@ -293,12 +293,6 @@ public class MockConsumer implements Con
     }
 
     @Override
-    public boolean isPullOnly()
-    {
-        return true;
-    }
-
-    @Override
     public boolean isMultiQueue()
     {
         return false;
@@ -513,12 +507,6 @@ public class MockConsumer implements Con
         {
 
         }
-
-        @Override
-        public void notifyConsumerTargetCurrentStates()
-        {
-
-        }
 
         @Override
         public void ensureConsumersNoticedStateChange()

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Thu Nov  3 16:02:33 2016
@@ -71,7 +71,6 @@ import org.apache.qpid.test.utils.QpidTe
 abstract class AbstractQueueTestBase extends QpidTestCase
 {
     private static final Logger _logger = LoggerFactory.getLogger(AbstractQueueTestBase.class);
-    private long _queueRunnerWaitTime;
     private Queue<?> _queue;
     private VirtualHost<?> _virtualHost;
     private String _qname = "qname";
@@ -97,8 +96,6 @@ abstract class AbstractQueueTestBase ext
         _queue = _virtualHost.createChild(Queue.class, attributes);
 
         _exchange = (DirectExchange) _virtualHost.getChildByName(Exchange.class, ExchangeDefaults.DIRECT_EXCHANGE_NAME);
-        _queueRunnerWaitTime = Long.getLong("AbstractQueueTestBase.queueRunnerWaitTime", 150L);
-        _logger.debug("Using AbstractQueueTestBase.queueRunnerWaitTime {}", _queueRunnerWaitTime);
     }
 
     @Override
@@ -404,7 +401,7 @@ abstract class AbstractQueueTestBase ext
 
         /* Enqueue one message with expiration set for a short time in the future */
 
-        final long expiration = System.currentTimeMillis() + _queueRunnerWaitTime;
+        final long expiration = System.currentTimeMillis() + 100L;
         when(messageA.getExpiration()).thenReturn(expiration);
 
         _queue.enqueue(messageA, postEnqueueAction, null);
@@ -575,19 +572,9 @@ abstract class AbstractQueueTestBase ext
         // Check sending a message ends up with the subscriber
         _queue.enqueue(messageA, null, null);
 
-        final long timeout = System.currentTimeMillis() + _queueRunnerWaitTime;
-
-        QueueEntry lastSeen = null;
-
         while(_consumerTarget.processPending());
 
-        /*while (timeout > System.currentTimeMillis() &&
-               ((lastSeen = _consumer.getQueueContext().getLastSeenEntry()) == null || lastSeen.getMessage() == null))
-        {
-            Thread.sleep(10);
-        }
-*/
-        assertEquals("Queue context did not see expected message within timeout",
+        assertEquals("Queue context did not see expected message",
                      messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
 
         // Check we cannot add a second subscriber to the queue
@@ -1240,9 +1227,4 @@ abstract class AbstractQueueTestBase ext
         return _consumerTarget;
     }
 
-    public long getQueueRunnerWaitTime()
-    {
-        return _queueRunnerWaitTime;
-    }
-
 }

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Thu Nov  3 16:02:33 2016
@@ -116,7 +116,7 @@ public class ConsumerTarget_0_10 extends
                                Map<String, Object> arguments,
                                boolean multiQueue)
     {
-        super(State.SUSPENDED, isPullOnly(arguments), multiQueue, session.getAMQPConnection());
+        super(State.SUSPENDED, multiQueue, session.getAMQPConnection());
         _session = session;
         _postIdSettingAction = new AddMessageDispositionListenerAction(session);
         _acceptMode = acceptMode;
@@ -135,13 +135,6 @@ public class ConsumerTarget_0_10 extends
         }
     }
 
-    private static boolean isPullOnly(Map<String, Object> arguments)
-    {
-        return arguments != null
-               && arguments.containsKey(PULL_ONLY_CONSUMER)
-               && Boolean.valueOf(String.valueOf(arguments.get(PULL_ONLY_CONSUMER)));
-    }
-
     @Override
     public boolean isFlowSuspended()
     {

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Thu Nov  3 16:02:33 2016
@@ -1250,19 +1250,6 @@ public class ServerSession extends Sessi
     }
 
     @Override
-    public void notifyConsumerTargetCurrentStates()
-    {
-        Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();
-        for(ConsumerTarget_0_10 consumerTarget: consumerTargets)
-        {
-            if(!consumerTarget.isPullOnly())
-            {
-                consumerTarget.notifyCurrentState();
-            }
-        }
-    }
-
-    @Override
     public void ensureConsumersNoticedStateChange()
     {
         Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Nov  3 16:02:33 2016
@@ -3817,18 +3817,6 @@ public class AMQChannel
     }
 
     @Override
-    public void notifyConsumerTargetCurrentStates()
-    {
-        for(ConsumerTarget_0_8 consumerTarget : getConsumerTargets())
-        {
-            if(!consumerTarget.isPullOnly())
-            {
-                consumerTarget.notifyCurrentState();
-            }
-        }
-    }
-
-    @Override
     public void ensureConsumersNoticedStateChange()
     {
         for (ConsumerTarget_0_8 consumerTarget : getConsumerTargets())

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Thu Nov  3 16:02:33 2016
@@ -305,7 +305,7 @@ public abstract class ConsumerTarget_0_8
                               RecordDeliveryMethod recordMethod,
                               boolean multiQueue)
     {
-        super(State.ACTIVE, isPullOnly(arguments), multiQueue, channel.getAMQPConnection());
+        super(State.ACTIVE, multiQueue, channel.getAMQPConnection());
 
         _channel = channel;
         _consumerTag = consumerTag;
@@ -344,15 +344,6 @@ public abstract class ConsumerTarget_0_8
         }
     }
 
-    private static boolean isPullOnly(FieldTable arguments)
-    {
-        return arguments != null
-               && arguments.containsKey(PULL_ONLY_CONSUMER)
-               && Boolean.valueOf(String.valueOf(arguments.get(PULL_ONLY_CONSUMER)));
-    }
-
-
-
     @Override
     public String getTargetAddress()
     {

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Nov  3 16:02:33 2016
@@ -75,19 +75,13 @@ class ConsumerTarget_1_0 extends Abstrac
     public ConsumerTarget_1_0(final SendingLink_1_0 link,
                               boolean acquires)
     {
-        super(State.SUSPENDED, isPullOnly(link), false, link.getSession().getAMQPConnection());
+        super(State.SUSPENDED, false, link.getSession().getAMQPConnection());
         _link = link;
         _typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
         _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
         _acquires = acquires;
     }
 
-    private static boolean isPullOnly(SendingLink_1_0 link)
-    {
-        Source source = (Source) link.getEndpoint().getSource();
-        return source.getCapabilities() != null && Arrays.asList(source.getCapabilities()).contains(Symbol.getSymbol("QPID:PULL-ONLY"));
-    }
-
     private SendingLinkEndpoint getEndpoint()
     {
         return _link.getEndpoint();

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Nov  3 16:02:33 2016
@@ -1577,19 +1577,6 @@ public class Session_1_0 implements AMQS
     }
 
     @Override
-    public void notifyConsumerTargetCurrentStates()
-    {
-        for(SendingLink_1_0 link : _sendingLinks)
-        {
-            ConsumerTarget_1_0 consumerTarget = link.getConsumerTarget();
-            if(!consumerTarget.isPullOnly())
-            {
-                consumerTarget.notifyCurrentState();
-            }
-        }
-    }
-
-    @Override
     public void ensureConsumersNoticedStateChange()
     {
         for(SendingLink_1_0 link : _sendingLinks)

Modified: qpid/java/branches/remove-queue-runner/test-profiles/apache-ci.test.overridden.properties
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/test-profiles/apache-ci.test.overridden.properties?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/test-profiles/apache-ci.test.overridden.properties (original)
+++ qpid/java/branches/remove-queue-runner/test-profiles/apache-ci.test.overridden.properties Thu Nov  3 16:02:33 2016
@@ -21,7 +21,6 @@
 
 qpid.port.protocol_handshake_timeout=10000
 
-AbstractQueueTestBase.queueRunnerWaitTime=1000
 FailoverBehaviourTest.brokerStartupTime=60000
 FailoverBaseCase.defaultFailoverTime=15000
 qpid.test_nowait_for_ports=true



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org