You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/03 16:02:33 UTC
svn commit: r1767915 - in /qpid/java/branches/remove-queue-runner:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/model/port/
broker-core/src/main/java/org/apache/qpid/server/protocol/ broker...
Author: lquack
Date: Thu Nov 3 16:02:33 2016
New Revision: 1767915
URL: http://svn.apache.org/viewvc?rev=1767915&view=rev
Log:
get rid of pullOnly, queueRunnerWaitTime, and outbound message buffer size checking
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/branches/remove-queue-runner/test-profiles/apache-ci.test.overridden.properties
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Nov 3 16:02:33 2016
@@ -47,7 +47,6 @@ import org.apache.qpid.server.util.State
public abstract class AbstractConsumerTarget implements ConsumerTarget, LogSubject
{
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
- protected static final String PULL_ONLY_CONSUMER = "x-pull-only";
private final AtomicReference<State> _state;
private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
@@ -60,18 +59,15 @@ public abstract class AbstractConsumerTa
private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
- private final boolean _isPullOnly;
private Iterator<ConsumerImpl> _pullIterator;
private final AtomicBoolean _waitingOnStateChange = new AtomicBoolean();
protected AbstractConsumerTarget(final State initialState,
- final boolean isPullOnly,
final boolean isMultiQueue,
final AMQPConnection<?> amqpConnection)
{
_state = new AtomicReference<State>(initialState);
- _isPullOnly = true;
_isMultiQueue = isMultiQueue;
_suspendedConsumerLoggingTicker = isMultiQueue
? new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
@@ -271,18 +267,9 @@ public abstract class AbstractConsumerTa
}
@Override
- public boolean isPullOnly()
- {
- return _isPullOnly;
- }
-
- @Override
public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
- AMQPConnection<?> amqpConnection = getSessionModel().getAMQPConnection();
- amqpConnection.reserveOutboundMessageSpace(entry.getMessage().getSize());
_queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
- amqpConnection.notifyWork();
return entry.getMessage().getSize();
}
@@ -291,7 +278,7 @@ public abstract class AbstractConsumerTa
@Override
public boolean hasMessagesToSend()
{
- return !_queue.isEmpty() || (isPullOnly() && messagesAvailable());
+ return !_queue.isEmpty() || messagesAvailable();
}
private boolean messagesAvailable()
@@ -312,30 +299,23 @@ public abstract class AbstractConsumerTa
@Override
public boolean sendNextMessage()
{
-
- if(isPullOnly())
+ if(_pullIterator == null || !_pullIterator.hasNext())
{
+ _pullIterator = getConsumers().iterator();
+ }
+ if(_pullIterator.hasNext())
+ {
+ ConsumerImpl consumer = _pullIterator.next();
- if(_pullIterator == null || !_pullIterator.hasNext())
- {
- _pullIterator = getConsumers().iterator();
- }
- if(_pullIterator.hasNext())
- {
- ConsumerImpl consumer = _pullIterator.next();
-
- _waitingOnStateChange.set(true);
+ _waitingOnStateChange.set(true);
- consumer.pullMessage();
- }
+ consumer.pullMessage();
}
+
ConsumerMessageInstancePair consumerMessage = _queue.poll();
if (consumerMessage != null)
{
- if(isPullOnly())
- {
- _waitingOnStateChange.set(false);
- }
+ _waitingOnStateChange.set(false);
try
{
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Thu Nov 3 16:02:33 2016
@@ -92,6 +92,4 @@ public interface ConsumerTarget
void releaseSendLock();
- boolean isPullOnly();
-
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Thu Nov 3 16:02:33 2016
@@ -57,7 +57,6 @@ public interface AmqpPort<X extends Amqp
String PORT_AMQP_NUMBER_OF_SELECTORS = "qpid.port.amqp.threadPool.numberOfSelectors";
String PORT_AMQP_ACCEPT_BACKLOG = "qpid.port.amqp.acceptBacklog";
- String PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = "qpid.port.amqp.outboundMessageBufferSize";
@ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
@@ -88,11 +87,6 @@ public interface AmqpPort<X extends Amqp
@ManagedContextDefault(name = OPEN_CONNECTIONS_WARN_PERCENT)
int DEFAULT_OPEN_CONNECTIONS_WARN_PERCENT = 80;
- @SuppressWarnings("unused")
- @ManagedContextDefault(name = PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)
- long DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = 1024 * 1024;
-
-
String PROTOCOL_HANDSHAKE_TIMEOUT = "qpid.port.protocol_handshake_timeout";
@SuppressWarnings("unused")
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Thu Nov 3 16:02:33 2016
@@ -106,6 +106,5 @@ public interface AMQSessionModel<T exten
void addTicker(Ticker ticker);
void removeTicker(Ticker ticker);
- void notifyConsumerTargetCurrentStates();
void ensureConsumersNoticedStateChange();
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Nov 3 16:02:33 2016
@@ -225,8 +225,6 @@ public abstract class AbstractQueue<X ex
private volatile int _maxAsyncDeliveries;
private volatile long _estimatedAverageMessageHeaderSize;
- private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
-
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
@@ -289,7 +287,6 @@ public abstract class AbstractQueue<X ex
private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
- private volatile boolean _hasPullOnlyConsumers;
private interface HoldMethod
{
@@ -943,10 +940,6 @@ public abstract class AbstractQueue<X ex
if (!isDeleted())
{
- if(consumer.isPullOnly())
- {
- _hasPullOnlyConsumers = true;
- }
_consumerList.add(consumer);
if (isDeleted())
@@ -965,10 +958,7 @@ public abstract class AbstractQueue<X ex
{
consumer.queueEmpty();
}
- if(consumer.isPullOnly())
- {
- consumer.notifyWork();
- }
+ consumer.notifyWork();
return consumer;
}
@@ -1009,18 +999,6 @@ public abstract class AbstractQueue<X ex
resetSubPointersForGroups(consumer);
}
- if(consumer.isPullOnly())
- {
- boolean hasOnlyPushConsumers = true;
- ConsumerNode consumerNode = _consumerList.getHead().findNext();
- while (consumerNode != null && hasOnlyPushConsumers)
- {
- hasOnlyPushConsumers = !consumerNode.getConsumer().isPullOnly();
- consumerNode = consumerNode.findNext();
- }
- _hasPullOnlyConsumers = !hasOnlyPushConsumers;
- }
-
// auto-delete queues must be deleted if there are no remaining subscribers
if(!consumer.isTransient()
@@ -1091,7 +1069,7 @@ public abstract class AbstractQueue<X ex
updateSubRequeueEntry(sub, entry);
}
}
- notifyPullOnlyConsumers();
+ notifyAllConsumers();
}
public void addBinding(final Binding<?> binding)
@@ -1229,7 +1207,7 @@ public abstract class AbstractQueue<X ex
if (entry.isAvailable())
{
checkConsumersNotAheadOfDelivery(entry);
- notifyPullOnlyConsumers();
+ notifyAllConsumers();
}
checkForNotificationOnNewMessage(entry.getMessage());
@@ -1522,7 +1500,7 @@ public abstract class AbstractQueue<X ex
updateSubRequeueEntry(sub, entry);
}
}
- notifyPullOnlyConsumers();
+ notifyAllConsumers();
}
@@ -1747,11 +1725,6 @@ public abstract class AbstractQueue<X ex
_exclusiveSubscriber = exclusiveSubscriber;
}
- long getStateChangeCount()
- {
- return _stateChangeCount.get();
- }
-
/** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
abstract QueueEntryList getEntries();
@@ -2133,26 +2106,22 @@ public abstract class AbstractQueue<X ex
}
}
- void notifyPullOnlyConsumers()
+ void notifyAllConsumers()
{
- if(_hasPullOnlyConsumers)
+ ConsumerNode consumerNode = _consumerList.getHead().findNext();
+ while (consumerNode != null)
{
- ConsumerNode consumerNode = _consumerList.getHead().findNext();
- while (consumerNode != null)
+ QueueConsumer<?> consumer = consumerNode.getConsumer();
+ if (consumer.isActive() && getNextAvailableEntry(consumer) != null)
{
- QueueConsumer<?> consumer = consumerNode.getConsumer();
- if (consumer.isActive() && getNextAvailableEntry(consumer) != null)
- {
- consumer.notifyWork();
- }
- consumerNode = consumerNode.findNext();
+ consumer.notifyWork();
}
+ consumerNode = consumerNode.findNext();
}
}
void flushConsumer(QueueConsumer<?> sub)
{
-
flushConsumer(sub, Long.MAX_VALUE);
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Thu Nov 3 16:02:33 2016
@@ -54,7 +54,5 @@ public interface QueueConsumer<X extends
boolean hasCredit();
- boolean isPullOnly();
-
void notifyWork();
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Thu Nov 3 16:02:33 2016
@@ -339,12 +339,6 @@ class QueueConsumerImpl
}
@Override
- public boolean isPullOnly()
- {
- return _target.isPullOnly();
- }
-
- @Override
public void notifyWork()
{
_target.notifyWork();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Thu Nov 3 16:02:33 2016
@@ -72,8 +72,6 @@ public interface AMQPConnection<C extend
void sendConnectionCloseAsync(AMQConstant connectionForced, String reason);
- void reserveOutboundMessageSpace(long size);
-
boolean isIOThread();
void checkAuthorizedMessagePrincipal(String messageUserId);
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Thu Nov 3 16:02:33 2016
@@ -718,12 +718,6 @@ public abstract class AbstractAMQPConnec
return getSessionModels().size();
}
- @Override
- public void reserveOutboundMessageSpace(final long size)
- {
- _network.reserveOutboundMessageSpace(size);
- }
-
protected void markTransportClosed()
{
_transportClosedFuture.set(null);
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Thu Nov 3 16:02:33 2016
@@ -61,8 +61,6 @@ public class NonBlockingConnection imple
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final ProtocolEngine _protocolEngine;
private final Runnable _onTransportEncryptionAction;
- private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
- private final long _outboundMessageBufferLimit;
private volatile boolean _fullyWritten = true;
@@ -97,9 +95,6 @@ public class NonBlockingConnection imple
_port = port;
_threadName = SelectorThread.IO_THREAD_NAME_PREFIX + _remoteSocketAddress.toString();
- _outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
- AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
-
protocolEngine.setWorkListener(new Action<ProtocolEngine>()
{
@Override
@@ -214,15 +209,6 @@ public class NonBlockingConnection imple
}
@Override
- public void reserveOutboundMessageSpace(long size)
- {
- if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
- {
- // RG - TODO
- }
- }
-
- @Override
public String getTransportInfo()
{
return _delegate.getTransportInfo();
@@ -540,12 +526,7 @@ public class NonBlockingConnection imple
_buffers.poll();
buf.dispose();
}
- if (_fullyWritten)
- {
- _usedOutboundMessageSpace.set(0);
- }
return _fullyWritten;
-
}
protected int readFromNetwork() throws IOException
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java Thu Nov 3 16:02:33 2016
@@ -23,8 +23,6 @@ import org.apache.qpid.transport.network
public interface ServerNetworkConnection extends NetworkConnection
{
- void reserveOutboundMessageSpace(long size);
-
String getTransportInfo();
long getScheduledTime();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Thu Nov 3 16:02:33 2016
@@ -293,12 +293,6 @@ public class MockConsumer implements Con
}
@Override
- public boolean isPullOnly()
- {
- return true;
- }
-
- @Override
public boolean isMultiQueue()
{
return false;
@@ -513,12 +507,6 @@ public class MockConsumer implements Con
{
}
-
- @Override
- public void notifyConsumerTargetCurrentStates()
- {
-
- }
@Override
public void ensureConsumersNoticedStateChange()
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Thu Nov 3 16:02:33 2016
@@ -71,7 +71,6 @@ import org.apache.qpid.test.utils.QpidTe
abstract class AbstractQueueTestBase extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(AbstractQueueTestBase.class);
- private long _queueRunnerWaitTime;
private Queue<?> _queue;
private VirtualHost<?> _virtualHost;
private String _qname = "qname";
@@ -97,8 +96,6 @@ abstract class AbstractQueueTestBase ext
_queue = _virtualHost.createChild(Queue.class, attributes);
_exchange = (DirectExchange) _virtualHost.getChildByName(Exchange.class, ExchangeDefaults.DIRECT_EXCHANGE_NAME);
- _queueRunnerWaitTime = Long.getLong("AbstractQueueTestBase.queueRunnerWaitTime", 150L);
- _logger.debug("Using AbstractQueueTestBase.queueRunnerWaitTime {}", _queueRunnerWaitTime);
}
@Override
@@ -404,7 +401,7 @@ abstract class AbstractQueueTestBase ext
/* Enqueue one message with expiration set for a short time in the future */
- final long expiration = System.currentTimeMillis() + _queueRunnerWaitTime;
+ final long expiration = System.currentTimeMillis() + 100L;
when(messageA.getExpiration()).thenReturn(expiration);
_queue.enqueue(messageA, postEnqueueAction, null);
@@ -575,19 +572,9 @@ abstract class AbstractQueueTestBase ext
// Check sending a message ends up with the subscriber
_queue.enqueue(messageA, null, null);
- final long timeout = System.currentTimeMillis() + _queueRunnerWaitTime;
-
- QueueEntry lastSeen = null;
-
while(_consumerTarget.processPending());
- /*while (timeout > System.currentTimeMillis() &&
- ((lastSeen = _consumer.getQueueContext().getLastSeenEntry()) == null || lastSeen.getMessage() == null))
- {
- Thread.sleep(10);
- }
-*/
- assertEquals("Queue context did not see expected message within timeout",
+ assertEquals("Queue context did not see expected message",
messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
@@ -1240,9 +1227,4 @@ abstract class AbstractQueueTestBase ext
return _consumerTarget;
}
- public long getQueueRunnerWaitTime()
- {
- return _queueRunnerWaitTime;
- }
-
}
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Thu Nov 3 16:02:33 2016
@@ -116,7 +116,7 @@ public class ConsumerTarget_0_10 extends
Map<String, Object> arguments,
boolean multiQueue)
{
- super(State.SUSPENDED, isPullOnly(arguments), multiQueue, session.getAMQPConnection());
+ super(State.SUSPENDED, multiQueue, session.getAMQPConnection());
_session = session;
_postIdSettingAction = new AddMessageDispositionListenerAction(session);
_acceptMode = acceptMode;
@@ -135,13 +135,6 @@ public class ConsumerTarget_0_10 extends
}
}
- private static boolean isPullOnly(Map<String, Object> arguments)
- {
- return arguments != null
- && arguments.containsKey(PULL_ONLY_CONSUMER)
- && Boolean.valueOf(String.valueOf(arguments.get(PULL_ONLY_CONSUMER)));
- }
-
@Override
public boolean isFlowSuspended()
{
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Thu Nov 3 16:02:33 2016
@@ -1250,19 +1250,6 @@ public class ServerSession extends Sessi
}
@Override
- public void notifyConsumerTargetCurrentStates()
- {
- Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();
- for(ConsumerTarget_0_10 consumerTarget: consumerTargets)
- {
- if(!consumerTarget.isPullOnly())
- {
- consumerTarget.notifyCurrentState();
- }
- }
- }
-
- @Override
public void ensureConsumersNoticedStateChange()
{
Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Nov 3 16:02:33 2016
@@ -3817,18 +3817,6 @@ public class AMQChannel
}
@Override
- public void notifyConsumerTargetCurrentStates()
- {
- for(ConsumerTarget_0_8 consumerTarget : getConsumerTargets())
- {
- if(!consumerTarget.isPullOnly())
- {
- consumerTarget.notifyCurrentState();
- }
- }
- }
-
- @Override
public void ensureConsumersNoticedStateChange()
{
for (ConsumerTarget_0_8 consumerTarget : getConsumerTargets())
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Thu Nov 3 16:02:33 2016
@@ -305,7 +305,7 @@ public abstract class ConsumerTarget_0_8
RecordDeliveryMethod recordMethod,
boolean multiQueue)
{
- super(State.ACTIVE, isPullOnly(arguments), multiQueue, channel.getAMQPConnection());
+ super(State.ACTIVE, multiQueue, channel.getAMQPConnection());
_channel = channel;
_consumerTag = consumerTag;
@@ -344,15 +344,6 @@ public abstract class ConsumerTarget_0_8
}
}
- private static boolean isPullOnly(FieldTable arguments)
- {
- return arguments != null
- && arguments.containsKey(PULL_ONLY_CONSUMER)
- && Boolean.valueOf(String.valueOf(arguments.get(PULL_ONLY_CONSUMER)));
- }
-
-
-
@Override
public String getTargetAddress()
{
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Nov 3 16:02:33 2016
@@ -75,19 +75,13 @@ class ConsumerTarget_1_0 extends Abstrac
public ConsumerTarget_1_0(final SendingLink_1_0 link,
boolean acquires)
{
- super(State.SUSPENDED, isPullOnly(link), false, link.getSession().getAMQPConnection());
+ super(State.SUSPENDED, false, link.getSession().getAMQPConnection());
_link = link;
_typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
_sectionEncoder = new SectionEncoderImpl(_typeRegistry);
_acquires = acquires;
}
- private static boolean isPullOnly(SendingLink_1_0 link)
- {
- Source source = (Source) link.getEndpoint().getSource();
- return source.getCapabilities() != null && Arrays.asList(source.getCapabilities()).contains(Symbol.getSymbol("QPID:PULL-ONLY"));
- }
-
private SendingLinkEndpoint getEndpoint()
{
return _link.getEndpoint();
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Nov 3 16:02:33 2016
@@ -1577,19 +1577,6 @@ public class Session_1_0 implements AMQS
}
@Override
- public void notifyConsumerTargetCurrentStates()
- {
- for(SendingLink_1_0 link : _sendingLinks)
- {
- ConsumerTarget_1_0 consumerTarget = link.getConsumerTarget();
- if(!consumerTarget.isPullOnly())
- {
- consumerTarget.notifyCurrentState();
- }
- }
- }
-
- @Override
public void ensureConsumersNoticedStateChange()
{
for(SendingLink_1_0 link : _sendingLinks)
Modified: qpid/java/branches/remove-queue-runner/test-profiles/apache-ci.test.overridden.properties
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/test-profiles/apache-ci.test.overridden.properties?rev=1767915&r1=1767914&r2=1767915&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/test-profiles/apache-ci.test.overridden.properties (original)
+++ qpid/java/branches/remove-queue-runner/test-profiles/apache-ci.test.overridden.properties Thu Nov 3 16:02:33 2016
@@ -21,7 +21,6 @@
qpid.port.protocol_handshake_timeout=10000
-AbstractQueueTestBase.queueRunnerWaitTime=1000
FailoverBehaviourTest.brokerStartupTime=60000
FailoverBaseCase.defaultFailoverTime=15000
qpid.test_nowait_for_ports=true
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org