You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/10/30 19:16:07 UTC
svn commit: r1711533 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport: NonBlockingNetworkTransport.java SelectorThread.java
Author: kwall
Date: Fri Oct 30 18:16:06 2015
New Revision: 1711533
URL: http://svn.apache.org/viewvc?rev=1711533&view=rev
Log:
QPID-6819: [Java Broker] Schedule socket accepts on the thread pool
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1711533&r1=1711532&r2=1711533&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Fri Oct 30 18:16:06 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.transport
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
@@ -131,58 +132,61 @@ public class NonBlockingNetworkTransport
boolean success = false;
try
{
- socketChannel = serverSocketChannel.accept();
- final MultiVersionProtocolEngine engine =
- _factory.newProtocolEngine(socketChannel.socket().getRemoteSocketAddress());
-
- if(engine != null)
+ while ((socketChannel = serverSocketChannel.accept()) != null)
{
- socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _port.isTcpNoDelay());
- socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+ SocketAddress remoteSocketAddress = socketChannel.socket().getRemoteSocketAddress();
+ final MultiVersionProtocolEngine engine =
+ _factory.newProtocolEngine(remoteSocketAddress);
- final int bufferSize = _port.getNetworkBufferSize();
+ if (engine != null)
+ {
+ socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _port.isTcpNoDelay());
+ socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
- socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, bufferSize);
- socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, bufferSize);
+ final int bufferSize = _port.getNetworkBufferSize();
- socketChannel.configureBlocking(false);
+ socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, bufferSize);
+ socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, bufferSize);
- AggregateTicker aggregateTicker = engine.getAggregateTicker();
+ socketChannel.configureBlocking(false);
- final IdleTimeoutTicker idleTimeoutTicker = new IdleTimeoutTicker(engine, _timeout);
- aggregateTicker.addTicker(idleTimeoutTicker);
+ AggregateTicker aggregateTicker = engine.getAggregateTicker();
- NonBlockingConnection connection =
- new NonBlockingConnection(socketChannel,
- engine,
- _encryptionSet,
- new Runnable()
- {
+ final IdleTimeoutTicker idleTimeoutTicker = new IdleTimeoutTicker(engine, _timeout);
+ aggregateTicker.addTicker(idleTimeoutTicker);
- @Override
- public void run()
+ NonBlockingConnection connection =
+ new NonBlockingConnection(socketChannel,
+ engine,
+ _encryptionSet,
+ new Runnable()
{
- engine.encryptedTransport();
- }
- },
- _scheduler,
- _port);
-
- engine.setNetworkConnection(connection);
- connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
-
- idleTimeoutTicker.setConnection(connection);
-
- connection.start();
- _scheduler.addConnection(connection);
-
- success = true;
- }
- else
- {
- LOGGER.error("No Engine available.");
+ @Override
+ public void run()
+ {
+ engine.encryptedTransport();
+ }
+ },
+ _scheduler,
+ _port);
+
+ engine.setNetworkConnection(connection);
+ connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+
+ idleTimeoutTicker.setConnection(connection);
+
+ connection.start();
+
+ _scheduler.addConnection(connection);
+
+ success = true;
+ }
+ else
+ {
+ LOGGER.error("No Engine available.");
+ }
}
}
catch (IOException e)
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1711533&r1=1711532&r2=1711533&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Fri Oct 30 18:16:06 2015
@@ -20,6 +20,7 @@
package org.apache.qpid.server.transport;
import java.io.IOException;
+import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
@@ -164,9 +165,44 @@ class SelectorThread extends Thread
{
if(key.isAcceptable())
{
- NonBlockingNetworkTransport transport = (NonBlockingNetworkTransport) key.attachment();
- // todo - should we schedule this rather than running in this thread?
- transport.acceptSocketChannel((ServerSocketChannel)key.channel());
+ final NonBlockingNetworkTransport transport = (NonBlockingNetworkTransport) key.attachment();
+ final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
+ final SocketAddress localSocketAddress = channel.socket().getLocalSocketAddress();
+
+ try
+ {
+ channel.register(_selector, 0);
+ }
+ catch (ClosedChannelException e)
+ {
+ LOGGER.error("Failed to register selector on accepting port {} ",
+ localSocketAddress, e);
+ }
+
+ _workQueue.add(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ transport.acceptSocketChannel(channel);
+ }
+ finally
+ {
+ try
+ {
+ channel.register(_selector, SelectionKey.OP_ACCEPT, transport);
+ wakeup();
+ }
+ catch (ClosedChannelException e)
+ {
+ LOGGER.error("Failed to register selector on accepting port {}",
+ localSocketAddress, e);
+ }
+ }
+ }
+ });
}
else
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org