You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/15 10:55:55 UTC
svn commit: r1685516 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/adapter/
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-core/src/main/java...
Author: rgodfrey
Date: Mon Jun 15 08:55:55 2015
New Revision: 1685516
URL: http://svn.apache.org/r1685516
Log:
QPID-6589 : [Java Broker] use separate thread pools for each virtual host
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Mon Jun 15 08:55:55 2015
@@ -210,7 +210,6 @@ public final class ConnectionAdapter ext
private ListenableFuture<Void> asyncCloseUnderlying()
{
final SettableFuture<Void> closeFuture = SettableFuture.create();
-
_underlyingConnection.addDeleteTask(new Action()
{
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Mon Jun 15 08:55:55 2015
@@ -30,6 +30,7 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.util.Deletable;
public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends AMQSessionModel<S,T>> extends StatisticsGatherer, Deletable<T>
@@ -111,4 +112,5 @@ public interface AMQConnectionModel<T ex
ServerProtocolEngine getProtocolEngine();
+ void setScheduler(NetworkConnectionScheduler networkConnectionScheduler);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Mon Jun 15 08:55:55 2015
@@ -397,6 +397,7 @@ public class MultiVersionProtocolEngine
{
return _aggregateTicker;
}
+
}
private class SelfDelegateProtocolEngine implements ServerProtocolEngine
@@ -593,6 +594,7 @@ public class MultiVersionProtocolEngine
}
+
public long getConnectionId()
{
return _id;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java Mon Jun 15 08:55:55 2015
@@ -24,7 +24,9 @@ import javax.security.auth.Subject;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.NetworkConnection;
public interface ServerProtocolEngine extends ProtocolEngine
{
@@ -54,4 +56,5 @@ public interface ServerProtocolEngine ex
void setWorkListener(Action<ServerProtocolEngine> listener);
AggregateTicker getAggregateTicker();
+
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Mon Jun 15 08:55:55 2015
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.transport;
+import java.io.IOException;
+import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -31,7 +33,9 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class NetworkConnectionScheduler
+import org.apache.qpid.transport.TransportException;
+
+public class NetworkConnectionScheduler
{
private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class);
@@ -39,25 +43,35 @@ class NetworkConnectionScheduler
private final ThreadPoolExecutor _executor;
private final AtomicInteger _running = new AtomicInteger();
private final int _poolSize;
+ private final String _name;
- NetworkConnectionScheduler(final SelectorThread selectorThread, final NonBlockingNetworkTransport transport)
+ public NetworkConnectionScheduler(String name, int threadPoolSize)
{
- _selectorThread = selectorThread;
- _poolSize = transport.getThreadPoolSize();
- _executor = new ThreadPoolExecutor(_poolSize, _poolSize, 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(), new ThreadFactory()
+ try
{
- final AtomicInteger _count = new AtomicInteger();
-
- @Override
- public Thread newThread(final Runnable r)
+ _selectorThread = new SelectorThread(this);
+ _selectorThread.start();
+ _poolSize = threadPoolSize;
+ _name = name;
+ _executor = new ThreadPoolExecutor(_poolSize, _poolSize, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(), new ThreadFactory()
{
- Thread t = Executors.defaultThreadFactory().newThread(r);
- t.setName("IO-pool-"+selectorThread.getName()+"-"+_count.incrementAndGet());
- return t;
- }
- });
- _executor.prestartAllCoreThreads();
+ final AtomicInteger _count = new AtomicInteger();
+
+ @Override
+ public Thread newThread(final Runnable r)
+ {
+ Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setName("IO-pool-" + getName() + "-" + _count.incrementAndGet());
+ return t;
+ }
+ });
+ _executor.prestartAllCoreThreads();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
}
public void schedule(final NonBlockingConnection connection)
@@ -93,7 +107,7 @@ class NetworkConnectionScheduler
rerun = false;
boolean closed = connection.doWork();
- if (!closed)
+ if (!closed && connection.getScheduler() == this)
{
if (connection.isStateChanged() || connection.isPartialRead())
@@ -123,9 +137,39 @@ class NetworkConnectionScheduler
public void close()
{
+ _selectorThread.close();
_executor.shutdown();
}
+ public String getName()
+ {
+ return _name;
+ }
+ public void addAcceptingSocket(final ServerSocketChannel serverSocket,
+ final NonBlockingNetworkTransport nonBlockingNetworkTransport)
+ {
+ _selectorThread.addAcceptingSocket(serverSocket, nonBlockingNetworkTransport);
+ }
+
+ public void cancelAcceptingSocket(final ServerSocketChannel serverSocket)
+ {
+ _selectorThread.cancelAcceptingSocket(serverSocket);
+ }
+
+ public void addConnection(final NonBlockingConnection connection)
+ {
+ _selectorThread.addConnection(connection);
+ }
+
+ public void wakeup()
+ {
+ _selectorThread.wakeup();
+ }
+
+ public void removeConnection(final NonBlockingConnection connection)
+ {
+ _selectorThread.removeConnection(connection);
+ }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Mon Jun 15 08:55:55 2015
@@ -60,7 +60,7 @@ public class NonBlockingConnection imple
private final SocketChannel _socketChannel;
private final Object _peerPrincipalLock = new Object();
- private final SelectorThread _selector;
+ private NetworkConnectionScheduler _scheduler;
private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
@@ -92,7 +92,6 @@ public class NonBlockingConnection imple
public NonBlockingConnection(SocketChannel socketChannel,
ServerProtocolEngine delegate,
int receiveBufferSize,
- Ticker ticker,
final Set<TransportEncryption> encryptionSet,
final SSLContext sslContext,
final boolean wantClientAuth,
@@ -100,10 +99,10 @@ public class NonBlockingConnection imple
final Collection<String> enabledCipherSuites,
final Collection<String> disabledCipherSuites,
final Runnable onTransportEncryptionAction,
- final SelectorThread selectorThread)
+ final NetworkConnectionScheduler scheduler)
{
_socketChannel = socketChannel;
- _selector = selectorThread;
+ _scheduler = scheduler;
_protocolEngine = delegate;
_receiveBufSize = receiveBufferSize;
@@ -114,7 +113,7 @@ public class NonBlockingConnection imple
@Override
public void performAction(final ServerProtocolEngine object)
{
- _selector.wakeup();
+ _scheduler.wakeup();
}
});
@@ -178,7 +177,7 @@ public class NonBlockingConnection imple
if(_closed.compareAndSet(false,true))
{
_protocolEngine.notifyWork();
- _selector.wakeup();
+ _scheduler.wakeup();
}
}
@@ -616,8 +615,6 @@ public class NonBlockingConnection imple
@Override
public void send(final ByteBuffer msg)
{
- assert _selector.isIOThread() : "Send called by unexpected thread " + Thread.currentThread().getName();
-
if (_closed.get())
{
@@ -634,6 +631,14 @@ public class NonBlockingConnection imple
{
}
+ public void changeScheduler(NetworkConnectionScheduler scheduler)
+ {
+ NetworkConnectionScheduler currentScheduler = _scheduler;
+ currentScheduler.removeConnection(this);
+ _scheduler = scheduler;
+ _scheduler.addConnection(this);
+ }
+
@Override
public String toString()
{
@@ -665,4 +670,9 @@ public class NonBlockingConnection imple
headerBytes[4] == 2 || // TLS 1.1
headerBytes[4] == 3);
}
+
+ public NetworkConnectionScheduler getScheduler()
+ {
+ return _scheduler;
+ }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Mon Jun 15 08:55:55 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
+import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.EnumSet;
@@ -58,13 +59,13 @@ public class NonBlockingNetworkTransport
private final SSLContext _sslContext;
private final ServerSocketChannel _serverSocket;
private final int _timeout;
-
- private SelectorThread _selector;
+ private final NetworkConnectionScheduler _scheduler;
public NonBlockingNetworkTransport(final NetworkTransportConfiguration config,
final MultiVersionProtocolEngineFactory factory,
final SSLContext sslContext,
- final EnumSet<TransportEncryption> encryptionSet)
+ final EnumSet<TransportEncryption> encryptionSet,
+ final NetworkConnectionScheduler scheduler)
{
try
{
@@ -82,6 +83,7 @@ public class NonBlockingNetworkTransport
_serverSocket.bind(address);
_serverSocket.configureBlocking(false);
_encryptionSet = encryptionSet;
+ _scheduler = scheduler;
}
catch (IOException e)
@@ -93,37 +95,20 @@ public class NonBlockingNetworkTransport
public void start()
{
- try
- {
- _selector = new SelectorThread(this);
- _selector.start();
- _selector.addAcceptingSocket(_serverSocket);
- }
- catch (IOException e)
- {
- throw new TransportException("Failed to start", e);
- }
+ _scheduler.addAcceptingSocket(_serverSocket, this);
}
public void close()
{
- if(_selector != null)
+ _scheduler.cancelAcceptingSocket(_serverSocket);
+ try
{
- _selector.cancelAcceptingSocket(_serverSocket);
- try
- {
- _serverSocket.close();
- }
- catch (IOException e)
- {
- LOGGER.warn("Error closing the server socket for : " + _config.getAddress().toString(), e);
- }
- finally
- {
- _selector.close();
- _selector = null;
- }
+ _serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Error closing the server socket for : " + _config.getAddress().toString(), e);
}
}
@@ -171,7 +156,6 @@ public class NonBlockingNetworkTransport
new NonBlockingConnection(socketChannel,
engine,
receiveBufferSize,
- idleTimeoutTicker,
_encryptionSet,
_sslContext,
_config.wantClientAuth(),
@@ -187,7 +171,7 @@ public class NonBlockingNetworkTransport
engine.encryptedTransport();
}
},
- _selector);
+ _scheduler);
engine.setNetworkConnection(connection, connection.getSender());
connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
@@ -196,7 +180,7 @@ public class NonBlockingNetworkTransport
connection.start();
- _selector.addConnection(connection);
+ _scheduler.addConnection(connection);
success = true;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Mon Jun 15 08:55:55 2015
@@ -39,7 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SelectorThread extends Thread
+class SelectorThread extends Thread
{
private static final Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
@@ -58,19 +58,18 @@ public class SelectorThread extends Thre
private final Selector _selector;
private final AtomicBoolean _closed = new AtomicBoolean();
private final NetworkConnectionScheduler _scheduler;
- private final NonBlockingNetworkTransport _transport;
private long _nextTimeout;
- SelectorThread(final NonBlockingNetworkTransport nonBlockingNetworkTransport) throws IOException
+ SelectorThread(final NetworkConnectionScheduler scheduler) throws IOException
{
- super("SelectorThread-" + nonBlockingNetworkTransport.getConfig().getAddress().toString());
+ super("SelectorThread-" + scheduler.getName());
- _transport = nonBlockingNetworkTransport;
_selector = Selector.open();
- _scheduler = new NetworkConnectionScheduler(this, _transport);
+ _scheduler = scheduler;
}
- public void addAcceptingSocket(final ServerSocketChannel socketChannel)
+ public void addAcceptingSocket(final ServerSocketChannel socketChannel,
+ final NonBlockingNetworkTransport nonBlockingNetworkTransport)
{
_tasks.add(new Runnable()
{
@@ -80,7 +79,7 @@ public class SelectorThread extends Thre
try
{
- socketChannel.register(_selector, SelectionKey.OP_ACCEPT);
+ socketChannel.register(_selector, SelectionKey.OP_ACCEPT, nonBlockingNetworkTransport);
}
catch (IllegalStateException | ClosedChannelException e)
{
@@ -127,7 +126,7 @@ public class SelectorThread extends Thre
catch (IOException e)
{
// TODO Inform the model object
- LOGGER.error("Failed to select for " + _transport.getConfig().getAddress().toString(),e );
+ LOGGER.error("Failed to trying to select()",e );
break;
}
@@ -177,8 +176,7 @@ public class SelectorThread extends Thre
toBeScheduled.add(connection);
try
{
- SelectionKey register = connection.getSocketChannel().register(_selector, 0);
- register.cancel();
+ unregisterConnection(connection);
}
catch (ClosedChannelException e)
{
@@ -196,6 +194,12 @@ public class SelectorThread extends Thre
return toBeScheduled;
}
+ private void unregisterConnection(final NonBlockingConnection connection) throws ClosedChannelException
+ {
+ SelectionKey register = connection.getSocketChannel().register(_selector, 0);
+ register.cancel();
+ }
+
private List<NonBlockingConnection> reregisterUnregisteredConnections()
{
List<NonBlockingConnection> unregisterableConnections = new ArrayList<>();
@@ -230,8 +234,9 @@ public class SelectorThread extends Thre
{
if(key.isAcceptable())
{
+ NonBlockingNetworkTransport transport = (NonBlockingNetworkTransport) key.attachment();
// todo - should we schedule this rather than running in this thread?
- _transport.acceptSocketChannel((ServerSocketChannel)key.channel());
+ transport.acceptSocketChannel((ServerSocketChannel)key.channel());
}
else
{
@@ -272,6 +277,20 @@ public class SelectorThread extends Thre
}
+ void removeConnection(NonBlockingConnection connection)
+ {
+ try
+ {
+ unregisterConnection(connection);
+ }
+ catch (ClosedChannelException e)
+ {
+ LOGGER.debug("Failed to unregister with selector for connection " + connection +
+ ". Connection is probably being closed by peer.", e);
+
+ }
+ }
+
public void wakeup()
{
_selector.wakeup();
@@ -281,11 +300,6 @@ public class SelectorThread extends Thre
{
_closed.set(true);
_selector.wakeup();
- _scheduler.close();
}
- boolean isIOThread()
- {
- return Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX);
- }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Mon Jun 15 08:55:55 2015
@@ -46,6 +46,7 @@ class TCPandSSLTransport implements Acce
private AmqpPort<?> _port;
private Set<Protocol> _supported;
private Protocol _defaultSupportedProtocolReply;
+ private NetworkConnectionScheduler _scheduler;
TCPandSSLTransport(final Set<Transport> transports,
final SSLContext sslContext,
@@ -99,7 +100,9 @@ class TCPandSSLTransport implements Acce
encryptionSet.add(TransportEncryption.TLS);
}
- _networkTransport = new NonBlockingNetworkTransport(settings, protocolEngineFactory, _sslContext, encryptionSet);
+ _scheduler = new NetworkConnectionScheduler("Port-"+_port.getName(),settings.getThreadPoolSize());
+ _networkTransport = new NonBlockingNetworkTransport(settings, protocolEngineFactory,
+ _sslContext, encryptionSet, _scheduler);
_networkTransport.start();
}
@@ -115,6 +118,10 @@ class TCPandSSLTransport implements Acce
{
_networkTransport.close();
}
+ if(_scheduler != null)
+ {
+ _scheduler.close();
+ }
}
class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Jun 15 08:55:55 2015
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -91,6 +92,7 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.MessageStoreProvider;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -149,6 +151,8 @@ public abstract class AbstractVirtualHos
private final Set<BlockingType> _blockingReasons = Collections.synchronizedSet(EnumSet.noneOf(BlockingType.class));
+ private NetworkConnectionScheduler _networkConnectionScheduler;
+
@ManagedAttributeField
private boolean _queue_deadLetterQueueEnabled;
@@ -860,7 +864,11 @@ public abstract class AbstractVirtualHos
_dtxRegistry.close();
closeMessageStore();
shutdownHouseKeeping();
-
+ if(_networkConnectionScheduler != null)
+ {
+ _networkConnectionScheduler.close();
+ _networkConnectionScheduler = null;
+ }
_eventLogger.message(VirtualHostMessages.CLOSED(getName()));
}
@@ -995,6 +1003,7 @@ public abstract class AbstractVirtualHos
ConnectionAdapter c = new ConnectionAdapter(connection);
c.create();
childAdded(c);
+ connection.setScheduler(_networkConnectionScheduler);
}
@@ -1375,6 +1384,11 @@ public abstract class AbstractVirtualHos
try
{
shutdownHouseKeeping();
+ if(_networkConnectionScheduler != null)
+ {
+ _networkConnectionScheduler.close();
+ _networkConnectionScheduler = null;
+ }
closeMessageStore();
setState(State.STOPPED);
@@ -1629,7 +1643,7 @@ public abstract class AbstractVirtualHos
private ListenableFuture<Void> onActivate()
{
_houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount(), new SuppressingInheritedAccessControlContextThreadFactory("virtualhost-" + getName() + "-pool"));
-
+ _networkConnectionScheduler = new NetworkConnectionScheduler("virtualhost-" + getName() + "-iopool", Runtime.getRuntime().availableProcessors());
MessageStore messageStore = getMessageStore();
messageStore.openMessageStore(this);
@@ -1847,5 +1861,4 @@ public abstract class AbstractVirtualHos
}
}
-
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Mon Jun 15 08:55:55 2015
@@ -40,6 +40,8 @@ import org.apache.qpid.server.security.S
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
import org.apache.qpid.server.txn.DtxRegistry;
public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AMQQueue<?>, E extends ExchangeImpl<?> >
@@ -112,4 +114,5 @@ public interface VirtualHostImpl< X exte
boolean authoriseCreateConnection(AMQConnectionModel<?, ?> connection);
String getLocalAddress(String routingAddress);
+
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java Mon Jun 15 08:55:55 2015
@@ -532,6 +532,7 @@ class RedirectingVirtualHostImpl
return localAddress;
}
+
private void throwUnsupportedForRedirector()
{
throw new IllegalStateException("The virtual host state of " + getState()
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Mon Jun 15 08:55:55 2015
@@ -57,6 +57,7 @@ import org.apache.qpid.server.protocol.S
import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.transport.network.Ticker;
@@ -671,6 +672,12 @@ public class MockConsumer implements Con
}
@Override
+ public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+ {
+
+ }
+
+ @Override
public String getClientVersion()
{
return null;
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java Mon Jun 15 08:55:55 2015
@@ -21,7 +21,6 @@ package org.apache.qpid.server.transport
import org.apache.qpid.server.protocol.MultiVersionProtocolEngine;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.AggregateTicker;
@@ -111,7 +110,10 @@ public class NetworkConnectionSchedulerT
when(verboseEngine.getAggregateTicker()).thenReturn(new AggregateTicker());
when(timidEngine.getAggregateTicker()).thenReturn(new AggregateTicker());
- NonBlockingNetworkTransport transport = new NonBlockingNetworkTransport(config, engineFactory, null, EnumSet.of(TransportEncryption.NONE));
+ final NetworkConnectionScheduler scheduler = new NetworkConnectionScheduler(getName(), 1);
+
+ NonBlockingNetworkTransport transport = new NonBlockingNetworkTransport(config, engineFactory, null, EnumSet.of(TransportEncryption.NONE),
+ scheduler);
transport.start();
final int port = transport.getAcceptingPort();
@@ -164,6 +166,7 @@ public class NetworkConnectionSchedulerT
verify(timidEngine, atLeast(6)).received(any(ByteBuffer.class));
_keepRunningThreads = false;
transport.close();
+ scheduler.close();
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Mon Jun 15 08:55:55 2015
@@ -38,11 +38,12 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -89,6 +90,7 @@ public class ProtocolEngine_0_10 extend
return _aggregateTicker;
}
+
@Override
public boolean isMessageAssignmentSuspended()
{
@@ -343,4 +345,9 @@ public class ProtocolEngine_0_10 extend
{
_workListener.set(listener);
}
+
+ public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+ {
+ ((NonBlockingConnection)_network).changeScheduler(networkConnectionScheduler);
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Jun 15 08:55:55 2015
@@ -40,6 +40,9 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.ServerProtocolEngine;
@@ -56,6 +59,7 @@ import org.apache.qpid.server.protocol.S
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -73,7 +77,7 @@ import org.apache.qpid.transport.Session
public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>,
LogSubject, AuthorizationHolder
{
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnection.class);
public static final long CLOSE_OK_TIMEOUT = 10000l;
private final Broker<?> _broker;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
@@ -102,7 +106,7 @@ public class ServerConnection extends Co
private int _messageCompressionThreshold;
private final int _maxMessageSize;
- private ServerProtocolEngine _serverProtocolEngine;
+ private ProtocolEngine_0_10 _serverProtocolEngine;
private boolean _ignoreFutureInput;
private boolean _ignoreAllButConnectionCloseOk;
@@ -214,7 +218,13 @@ public class ServerConnection extends Co
return _serverProtocolEngine;
}
- public void setProtocolEngine(final ServerProtocolEngine serverProtocolEngine)
+ @Override
+ public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+ {
+ _serverProtocolEngine.setScheduler(networkConnectionScheduler);
+ }
+
+ public void setProtocolEngine(final ProtocolEngine_0_10 serverProtocolEngine)
{
_serverProtocolEngine = serverProtocolEngine;
}
@@ -396,7 +406,6 @@ public class ServerConnection extends Co
public void closeAsync(final AMQConstant cause, final String message)
{
-
addAsyncTask(new Action<ServerConnection>()
{
@Override
@@ -406,8 +415,6 @@ public class ServerConnection extends Co
{
markAllSessionsClosed();
- performDeleteTasks();
-
setState(CLOSING);
ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
try
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Mon Jun 15 08:55:55 2015
@@ -28,6 +28,8 @@ import java.util.List;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.transport.NonBlockingConnection;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -76,12 +78,15 @@ public class ServerSessionTest extends Q
when(amqpPort.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP);
+ final ProtocolEngine_0_10 protocolEngine = mock(ProtocolEngine_0_10.class);
+ connection.setProtocolEngine(protocolEngine);
connection.setVirtualHost(_virtualHost);
ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
new Binary(getName().getBytes()), 0);
// create a session with the same name but on a different connection
ServerConnection connection2 = new ServerConnection(2, broker, amqpPort, Transport.TCP);
+ connection2.setProtocolEngine(protocolEngine);
connection2.setVirtualHost(_virtualHost);
ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
new Binary(getName().getBytes()), 0);
@@ -98,6 +103,8 @@ public class ServerSessionTest extends Q
AmqpPort port = mock(AmqpPort.class);
when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(1024);
ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP);
+ final ProtocolEngine_0_10 protocolEngine = mock(ProtocolEngine_0_10.class);
+ connection.setProtocolEngine(protocolEngine);
connection.setVirtualHost(_virtualHost);
final List<Method> invokedMethods = new ArrayList<>();
ServerSession session = new ServerSession(connection, new ServerSessionDelegate(),
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Jun 15 08:55:55 2015
@@ -85,6 +85,8 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -279,6 +281,12 @@ public class AMQProtocolEngine implement
}
@Override
+ public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+ {
+ ((NonBlockingConnection)_network).changeScheduler(networkConnectionScheduler);
+ }
+
+ @Override
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
_messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
@@ -847,22 +855,13 @@ public class AMQProtocolEngine implement
}
finally
{
- try
+ synchronized (this)
{
- for (Action<? super AMQProtocolEngine> task : _connectionCloseTaskList)
- {
- task.performAction(this);
- }
- }
- finally
- {
- synchronized (this)
- {
- _closed = true;
- notifyAll();
- }
- getEventLogger().message(_logSubject, connectionDropped ? ConnectionMessages.DROPPED_CONNECTION() : ConnectionMessages.CLOSE());
+ _closed = true;
+ notifyAll();
}
+ getEventLogger().message(_logSubject, connectionDropped ? ConnectionMessages.DROPPED_CONNECTION() : ConnectionMessages.CLOSE());
+
}
}
}
@@ -1145,7 +1144,14 @@ public class AMQProtocolEngine implement
}
finally
{
- closeNetworkConnection();
+ try
+ {
+ closeNetworkConnection();
+ }
+ finally
+ {
+ performDeleteTasks();
+ }
}
}
catch (ConnectionScopedRuntimeException | TransportException e)
@@ -1154,6 +1160,30 @@ public class AMQProtocolEngine implement
}
}
+ private void performDeleteTasks()
+ {
+ if(runningAsSubject())
+ {
+ for (Action<? super AMQProtocolEngine> task : _connectionCloseTaskList)
+ {
+ task.performAction(this);
+ }
+ }
+ else
+ {
+ runAsSubject(new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ performDeleteTasks();
+ return null;
+ }
+ });
+ }
+
+ }
+
@Override
public void encryptedTransport()
{
@@ -1542,7 +1572,6 @@ public class AMQProtocolEngine implement
writeFrame(responseBody.generateFrame(0));
_state = ConnectionState.OPEN;
-
}
else
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Mon Jun 15 08:55:55 2015
@@ -59,6 +59,8 @@ import org.apache.qpid.server.protocol.S
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -258,6 +260,16 @@ public class Connection_1_0 implements C
session.remoteEnd(new End());
}
+ if(_vhost != null)
+ {
+ _vhost.getConnectionRegistry().deregisterConnection(this);
+ }
+
+
+ }
+
+ void performCloseTasks()
+ {
List<Action<? super Connection_1_0>> taskCopy;
synchronized (_closeTasks)
@@ -272,12 +284,6 @@ public class Connection_1_0 implements C
{
_closeTasks.clear();
}
- if(_vhost != null)
- {
- _vhost.getConnectionRegistry().deregisterConnection(this);
- }
-
-
}
public void closed()
@@ -423,6 +429,12 @@ public class Connection_1_0 implements C
}
@Override
+ public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+ {
+ _protocolEngine.changeScheduler(networkConnectionScheduler);
+ }
+
+ @Override
public Transport getTransport()
{
return _transport;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0.java Mon Jun 15 08:55:55 2015
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.io.PrintWriter;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.security.Principal;
@@ -40,7 +39,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
-import org.apache.qpid.amqp_1_0.codec.ProtocolHeaderHandler;
import org.apache.qpid.amqp_1_0.framing.AMQFrame;
import org.apache.qpid.amqp_1_0.framing.FrameHandler;
import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
@@ -67,6 +65,8 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.AggregateTicker;
@@ -516,6 +516,12 @@ public class ProtocolEngine_1_0_0 implem
}
}
+ void changeScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+ {
+ ((NonBlockingConnection)_network).changeScheduler(networkConnectionScheduler);
+ }
+
+
public long getCreateTime()
{
return _createTime;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1685516&r1=1685515&r2=1685516&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java Mon Jun 15 08:55:55 2015
@@ -71,6 +71,8 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.transport.NetworkConnectionScheduler;
+import org.apache.qpid.server.transport.NonBlockingConnection;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.ByteBufferSender;
@@ -94,7 +96,7 @@ public class ProtocolEngine_1_0_0Test ex
public void setUp() throws Exception
{
super.setUp();
- _networkConnection = mock(NetworkConnection.class);
+ _networkConnection = mock(NonBlockingConnection.class);
_broker = mock(Broker.class);
when(_broker.getModel()).thenReturn(BrokerModel.getInstance());
final TaskExecutor taskExecutor = new TaskExecutorImpl();
@@ -107,6 +109,7 @@ public class ProtocolEngine_1_0_0Test ex
_authenticationProvider = mock(AuthenticationProvider.class);
when(_port.getAuthenticationProvider()).thenReturn(_authenticationProvider);
VirtualHostImpl virtualHost = mock(VirtualHostImpl.class);
+
_connectionRegistry = mock(IConnectionRegistry.class);
final ArgumentCaptor<AMQConnectionModel> connectionCaptor = ArgumentCaptor.forClass(AMQConnectionModel.class);
doAnswer(new Answer()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org