You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/03/03 15:15:31 UTC
svn commit: r1663708 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-core/src/test/java/org/apache/qpid/server/consumer...
Author: kwall
Date: Tue Mar 3 14:15:30 2015
New Revision: 1663708
URL: http://svn.apache.org/r1663708
Log:
channel block/unblock now async, remove unnecessary selector bumps
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Tue Mar 3 14:15:30 2015
@@ -116,5 +116,5 @@ public interface AMQSessionModel<T exten
void transportStateChanged();
- void processPendingMessages();
+ void processPending();
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Tue Mar 3 14:15:30 2015
@@ -623,6 +623,8 @@ public class NonBlockingConnection imple
@Override
public void send(final ByteBuffer msg)
{
+ assert Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX) : "Send called by unexpected thread " + Thread.currentThread().getName();
+
if (_closed.get())
{
throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed");
@@ -634,7 +636,5 @@ public class NonBlockingConnection imple
@Override
public void flush()
{
- getSelector().wakeup();
-
}
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Tue Mar 3 14:15:30 2015
@@ -36,11 +36,9 @@ import java.util.concurrent.ScheduledThr
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-/**
-* Created by keith on 28/01/2015.
-*/
public class SelectorThread extends Thread
{
+ public static final String IO_THREAD_NAME_PREFIX = "NCS-";
private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
@@ -289,7 +287,8 @@ public class SelectorThread extends Thre
String currentName = Thread.currentThread().getName();
try
{
- Thread.currentThread().setName("NCS-"+connection.getRemoteAddress().toString());
+ Thread.currentThread().setName(
+ IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString());
processConnection(connection);
}
finally
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Tue Mar 3 14:15:30 2015
@@ -488,7 +488,7 @@ public class MockConsumer implements Con
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Tue Mar 3 14:15:30 2015
@@ -632,7 +632,6 @@ public class ConsumerTarget_0_10 extends
public void flushBatched()
{
- _session.getConnection().flush();
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Tue Mar 3 14:15:30 2015
@@ -70,7 +70,6 @@ public class ServerConnection extends Co
{
private final Broker<?> _broker;
- private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
private final Subject _authorizedSubject = new Subject();
@@ -79,10 +78,10 @@ public class ServerConnection extends Co
private final long _connectionId;
private final Object _reference = new Object();
private VirtualHostImpl<?,?,?> _virtualHost;
- private AmqpPort<?> _port;
- private AtomicLong _lastIoTime = new AtomicLong();
+ private final AmqpPort<?> _port;
+ private final AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
- private Transport _transport;
+ private final Transport _transport;
private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList =
new CopyOnWriteArrayList<Action<? super ServerConnection>>();
@@ -95,7 +94,7 @@ public class ServerConnection extends Co
private volatile boolean _stopped;
private int _messageCompressionThreshold;
- private int _maxMessageSize;
+ private final int _maxMessageSize;
private ServerProtocolEngine _serverProtocolEngine;
@@ -149,10 +148,6 @@ public class ServerConnection extends Co
if (state == State.OPEN)
{
- if (_onOpenTask != null)
- {
- _onOpenTask.run();
- }
getEventLogger().message(ConnectionMessages.OPEN(getClientId(),
"0-10",
getClientVersion(),
@@ -256,11 +251,6 @@ public class ServerConnection extends Co
return _stopped;
}
- public void onOpen(final Runnable task)
- {
- _onOpenTask = task;
- }
-
public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message)
{
addAsyncTask(new Action<ServerConnection>()
@@ -740,7 +730,7 @@ public class ServerConnection extends Co
for (AMQSessionModel session : getSessionModels())
{
- session.processPendingMessages();
+ session.processPending();
}
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Mar 3 14:15:30 2015
@@ -137,6 +137,7 @@ public class ServerSession extends Sessi
private org.apache.qpid.server.model.Session<?> _modelObject;
private long _blockTime;
private long _blockingTimeout;
+ private boolean _wireBlockingState;
public static interface MessageDispositionChangeListener
{
@@ -208,10 +209,6 @@ public class ServerSession extends Sessi
if (state == State.OPEN)
{
getVirtualHost().getEventLogger().message(ChannelMessages.CREATE());
- if(_blocking.get())
- {
- invokeBlock();
- }
}
}
else
@@ -245,6 +242,17 @@ public class ServerSession extends Sessi
invoke(new MessageStop(""));
}
+ private void invokeUnblock()
+ {
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _outstandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
+ }
+
+
@Override
protected boolean isFull(int id)
{
@@ -824,12 +832,11 @@ public class ServerSession extends Sessi
if(_blocking.compareAndSet(false,true))
{
+ getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
if(getState() == State.OPEN)
{
- invokeBlock();
+ getConnection().notifyWork();
}
- _blockTime = System.currentTimeMillis();
- getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
}
@@ -853,24 +860,17 @@ public class ServerSession extends Sessi
{
if(_blocking.compareAndSet(true,false) && !isClosing())
{
- _blockTime = 0l;
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
- MessageFlow mf = new MessageFlow();
- mf.setUnit(MessageCreditUnit.MESSAGE);
- mf.setDestination("");
- _outstandingCredit.set(Integer.MAX_VALUE);
- mf.setValue(Integer.MAX_VALUE);
- invoke(mf);
-
-
+ getConnection().notifyWork();
}
}
}
+
boolean blockingTimeoutExceeded()
{
long blockTime = _blockTime;
- boolean b = _blocking.get() && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout;
+ boolean b = _wireBlockingState && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout;
return b;
}
@@ -1136,8 +1136,25 @@ public class ServerSession extends Sessi
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
+ boolean desiredBlockingState = _blocking.get();
+ if (desiredBlockingState != _wireBlockingState)
+ {
+ _wireBlockingState = desiredBlockingState;
+
+ if (desiredBlockingState)
+ {
+ invokeBlock();
+ }
+ else
+ {
+ invokeUnblock();
+ }
+ _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
+ }
+
+
for(ConsumerTarget target : getSubscriptions())
{
target.processPending();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Mar 3 14:15:30 2015
@@ -209,6 +209,8 @@ public class AMQChannel
private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>();
private long _maxUncommittedInMemorySize;
+ private boolean _wireBlockingState;
+
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
{
_creditManager = new Pre0_10CreditManager(0l,0l, connection);
@@ -1611,12 +1613,14 @@ public class AMQChannel
{
if(_blockingEntities.add(this))
{
+
if(_blocking.compareAndSet(false,true))
{
getVirtualHost().getEventLogger().message(_logSubject,
ChannelMessages.FLOW_ENFORCED("** All Queues **"));
- flow(false);
- _blockTime = System.currentTimeMillis();
+
+
+ getConnection().notifyWork();
}
}
}
@@ -1628,8 +1632,7 @@ public class AMQChannel
if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
-
- flow(true);
+ getConnection().notifyWork();
}
}
}
@@ -1643,8 +1646,7 @@ public class AMQChannel
if(_blocking.compareAndSet(false,true))
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName()));
- flow(false);
- _blockTime = System.currentTimeMillis();
+ getConnection().notifyWork();
}
}
@@ -1657,7 +1659,7 @@ public class AMQChannel
if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
- flow(true);
+ getConnection().notifyWork();
}
}
}
@@ -2262,7 +2264,7 @@ public class AMQChannel
private boolean blockingTimeoutExceeded()
{
- return _blocking.get() && (System.currentTimeMillis() - _blockTime) > _blockingTimeout;
+ return _wireBlockingState && (System.currentTimeMillis() - _blockTime) > _blockingTimeout;
}
@Override
@@ -3598,9 +3600,17 @@ public class AMQChannel
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
+ boolean desiredBlockingState = _blocking.get();
+ if (desiredBlockingState != _wireBlockingState)
+ {
+ _wireBlockingState = desiredBlockingState;
+ flow(!desiredBlockingState);
+ _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
+ }
+
for(ConsumerTarget target : _tag2SubscriptionTargetMap.values())
{
target.processPending();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Tue Mar 3 14:15:30 2015
@@ -44,8 +44,6 @@ import java.util.concurrent.CopyOnWriteA
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
@@ -148,11 +146,8 @@ public class AMQProtocolEngine implement
* The channels that the latest call to {@link #received(ByteBuffer)} applied to.
* Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
* on after handling the frames.
- *
- * Thread-safety: guarded by {@link #_receivedLock}.
*/
- private final Set<AMQChannel> _channelsForCurrentMessage =
- new HashSet<>();
+ private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>();
private AMQDecoder _decoder;
@@ -197,7 +192,6 @@ public class AMQProtocolEngine implement
private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want?
private boolean _blocking;
- private final ReentrantLock _receivedLock;
private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
private final Broker<?> _broker;
private final Transport _transport;
@@ -251,7 +245,6 @@ public class AMQProtocolEngine implement
_port = port;
_transport = transport;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
- _receivedLock = new ReentrantLock();
_decoder = new BrokerDecoder(this);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
@@ -545,43 +538,8 @@ public class AMQProtocolEngine implement
private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
- private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
- private ByteBuffer asByteBuffer(AMQDataBlock block)
- {
- final int size = (int) block.getSize();
-
- final byte[] data;
-
-
- if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
- {
- data= new byte[size];
- }
- else
- {
-
- data = _reusableBytes;
- }
- _reusableDataOutput.setBuffer(data);
-
- try
- {
- block.writePayload(_reusableDataOutput);
- }
- catch (IOException e)
- {
- throw new ServerScopedRuntimeException(e);
- }
-
- final ByteBuffer copy = ByteBuffer.allocate(_reusableDataOutput.length());
- copy.put(data, 0, _reusableDataOutput.length());
- copy.flip();
- return copy;
- }
-
-
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
@@ -1969,11 +1927,6 @@ public class AMQProtocolEngine implement
return _reference;
}
- public Lock getReceivedLock()
- {
- return _receivedLock;
- }
-
@Override
public long getLastReadTime()
{
@@ -2095,6 +2048,8 @@ public class AMQProtocolEngine implement
@Override
public void processPending()
{
+
+
while(_asyncTaskList.peek() != null)
{
Action<? super AMQProtocolEngine> asyncAction = _asyncTaskList.poll();
@@ -2103,7 +2058,7 @@ public class AMQProtocolEngine implement
for (AMQSessionModel session : getSessionModels())
{
- session.processPendingMessages();
+ session.processPending();
}
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Mar 3 14:15:30 2015
@@ -515,6 +515,7 @@ public abstract class ConsumerTarget_0_8
if (isAutoClose())
{
_needToClose.set(true);
+ getChannel().getConnection().notifyWork();
}
}
@@ -531,8 +532,6 @@ public abstract class ConsumerTarget_0_8
public void flushBatched()
{
_channel.getConnection().setDeferFlush(false);
-
- _channel.getConnection().notifyWork();
}
protected void addUnacknowledgedMessage(MessageInstance entry)
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Tue Mar 3 14:15:30 2015
@@ -552,7 +552,7 @@ public class Connection_1_0 implements C
for (AMQSessionModel session : getSessionModels())
{
- session.processPendingMessages();
+ session.processPending();
}
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1663708&r1=1663707&r2=1663708&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Mar 3 14:15:30 2015
@@ -901,7 +901,7 @@ public class Session_1_0 implements Sess
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
for(Consumer<?> consumer : getConsumers())
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org