You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/16 11:28:47 UTC
svn commit: r1685745 - in /qpid/java/trunk/broker-core/src:
main/java/org/apache/qpid/server/model/port/
main/java/org/apache/qpid/server/transport/
test/java/org/apache/qpid/server/transport/
Author: rgodfrey
Date: Tue Jun 16 09:28:46 2015
New Revision: 1685745
URL: http://svn.apache.org/r1685745
Log:
QPID-6249 : Refactor NonBlockingConnection (work by Lorenz Quack and Rob Godfrey)
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java (with props)
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java (with props)
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java (with props)
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java (with props)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java Tue Jun 16 09:28:46 2015
@@ -35,6 +35,8 @@ import org.apache.qpid.server.model.Trus
import org.apache.qpid.server.model.VirtualHostAlias;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import javax.net.ssl.SSLContext;
+
@ManagedObject( category = false, type = "AMQP")
public interface AmqpPort<X extends AmqpPort<X>> extends ClientAuthCapablePort<X>
{
@@ -60,6 +62,10 @@ public interface AmqpPort<X extends Amqp
@ManagedContextDefault(name = PORT_MAX_OPEN_CONNECTIONS)
int DEFAULT_MAX_OPEN_CONNECTIONS = -1;
+ String THREAD_POOL_SIZE = "qpid.port.thread_pool_size";
+
+ @ManagedContextDefault(name = THREAD_POOL_SIZE)
+ int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();
String PORT_MAX_MESSAGE_SIZE = "qpid.port.max_message_size";
@@ -72,6 +78,8 @@ public interface AmqpPort<X extends Amqp
int DEFAULT_OPEN_CONNECTIONS_WARN_PERCENT = 80;
+ SSLContext getSSLContext();
+
@ManagedAttribute(defaultValue = "*")
String getBindingAddress();
@@ -84,6 +92,8 @@ public interface AmqpPort<X extends Amqp
@ManagedAttribute( defaultValue = AmqpPort.DEFAULT_AMQP_RECEIVE_BUFFER_SIZE )
int getReceiveBufferSize();
+ @ManagedAttribute( defaultValue = "${" + THREAD_POOL_SIZE + "}")
+ int getThreadPoolSize();
@ManagedAttribute( defaultValue = DEFAULT_AMQP_NEED_CLIENT_AUTH )
boolean getNeedClientAuth();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Tue Jun 16 09:28:46 2015
@@ -119,6 +119,9 @@ public class AmqpPortImpl extends Abstra
@ManagedAttributeField
private int _maxOpenConnections;
+ @ManagedAttributeField
+ private int _threadPoolSize;
+
private final AtomicInteger _connectionCount = new AtomicInteger();
private final AtomicBoolean _connectionCountWarningGiven = new AtomicBoolean();
@@ -126,6 +129,7 @@ public class AmqpPortImpl extends Abstra
private AcceptingTransport _transport;
private final AtomicBoolean _closing = new AtomicBoolean();
private final SettableFuture _noConnectionsRemain = SettableFuture.create();
+ private SSLContext _sslContext;
@ManagedObjectFactoryConstructor
public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -134,6 +138,11 @@ public class AmqpPortImpl extends Abstra
_broker = broker;
}
+ @Override
+ public SSLContext getSSLContext()
+ {
+ return _sslContext;
+ }
@Override
public String getBindingAddress()
@@ -166,6 +175,12 @@ public class AmqpPortImpl extends Abstra
}
@Override
+ public int getThreadPoolSize()
+ {
+ return _threadPoolSize;
+ }
+
+ @Override
protected void onCreate()
{
super.onCreate();
@@ -238,15 +253,14 @@ public class AmqpPortImpl extends Abstra
);
}
- SSLContext sslContext = null;
if (transports.contains(Transport.SSL) || transports.contains(Transport.WSS))
{
- sslContext = createSslContext();
+ _sslContext = createSslContext();
}
Protocol defaultSupportedProtocolReply = getDefaultAmqpSupportedReply();
_transport = transportProvider.createTransport(transportSet,
- sslContext,
+ _sslContext,
this,
getProtocols(),
defaultSupportedProtocolReply);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Tue Jun 16 09:28:46 2015
@@ -26,20 +26,11 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.security.cert.Certificate;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLPeerUnverifiedException;
-
+import org.apache.qpid.server.model.port.AmqpPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,101 +41,71 @@ import org.apache.qpid.transport.ByteBuf
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
import org.apache.qpid.util.SystemUtils;
public class NonBlockingConnection implements NetworkConnection, ByteBufferSender
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
- private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
private final SocketChannel _socketChannel;
- private final Object _peerPrincipalLock = new Object();
+ private NonBlockingConnectionDelegate _delegate;
private NetworkConnectionScheduler _scheduler;
private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
- private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
private final String _remoteSocketAddress;
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final ServerProtocolEngine _protocolEngine;
- private final int _receiveBufSize;
private final Runnable _onTransportEncryptionAction;
-
+ private final int _receiveBufferSize;
private volatile int _maxReadIdle;
private volatile int _maxWriteIdle;
- private Principal _principal;
- private boolean _principalChecked;
private ByteBuffer _netInputBuffer;
- private SSLEngine _sslEngine;
-
- private ByteBuffer _currentBuffer;
- private TransportEncryption _transportEncryption;
- private SSLEngineResult _status;
private volatile boolean _fullyWritten = true;
- private boolean _workDone;
- private Certificate _peerCertificate;
- private boolean _partialRead;
+ private boolean _partialRead = false;
+ private final AmqpPort _port;
public NonBlockingConnection(SocketChannel socketChannel,
- ServerProtocolEngine delegate,
+ ServerProtocolEngine protocolEngine,
int receiveBufferSize,
final Set<TransportEncryption> encryptionSet,
- final SSLContext sslContext,
- final boolean wantClientAuth,
- final boolean needClientAuth,
- final Collection<String> enabledCipherSuites,
- final Collection<String> disabledCipherSuites,
final Runnable onTransportEncryptionAction,
- final NetworkConnectionScheduler scheduler)
+ final NetworkConnectionScheduler scheduler,
+ final AmqpPort port)
{
_socketChannel = socketChannel;
_scheduler = scheduler;
- _protocolEngine = delegate;
- _receiveBufSize = receiveBufferSize;
+ _protocolEngine = protocolEngine;
_onTransportEncryptionAction = onTransportEncryptionAction;
- delegate.setWorkListener(new Action<ServerProtocolEngine>()
- {
- @Override
- public void performAction(final ServerProtocolEngine object)
- {
- _scheduler.wakeup();
- }
- });
+ _receiveBufferSize = receiveBufferSize;
- if(encryptionSet.size() == 1)
+ _netInputBuffer = ByteBuffer.allocate(receiveBufferSize);
+ _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
+ _port = port;
+
+ protocolEngine.setWorkListener(new Action<ServerProtocolEngine>()
{
- _transportEncryption = encryptionSet.iterator().next();
- if (_transportEncryption == TransportEncryption.TLS)
+ @Override
+ public void performAction(final ServerProtocolEngine object)
{
- onTransportEncryptionAction.run();
+ _scheduler.wakeup();
}
- }
+ });
- if(encryptionSet.contains(TransportEncryption.TLS))
+ if(encryptionSet.size() == 1)
{
- _sslEngine = sslContext.createSSLEngine();
- _sslEngine.setUseClientMode(false);
- SSLUtil.removeSSLv3Support(_sslEngine);
- SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites);
-
- if(needClientAuth)
- {
- _sslEngine.setNeedClientAuth(true);
- }
- else if(wantClientAuth)
- {
- _sslEngine.setWantClientAuth(true);
- }
- _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2));
+ setTransportEncryption(encryptionSet.iterator().next());
+ }
+ else
+ {
+ _delegate = new NonBlockingConnectionUndecidedDelegate(this);
}
- _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
}
public boolean isPartialRead()
@@ -162,15 +123,18 @@ public class NonBlockingConnection imple
return _socketChannel;
}
+ @Override
public void start()
{
}
+ @Override
public ByteBufferSender getSender()
{
return this;
}
+ @Override
public void close()
{
LOGGER.debug("Closing " + _remoteSocketAddress);
@@ -181,21 +145,25 @@ public class NonBlockingConnection imple
}
}
+ @Override
public SocketAddress getRemoteAddress()
{
return _socketChannel.socket().getRemoteSocketAddress();
}
+ @Override
public SocketAddress getLocalAddress()
{
return _socketChannel.socket().getLocalSocketAddress();
}
+ @Override
public void setMaxWriteIdle(int sec)
{
_maxWriteIdle = sec;
}
+ @Override
public void setMaxReadIdle(int sec)
{
_maxReadIdle = sec;
@@ -204,45 +172,13 @@ public class NonBlockingConnection imple
@Override
public Principal getPeerPrincipal()
{
- checkPeerPrincipal();
- return _principal;
+ return _delegate.getPeerPrincipal();
}
@Override
public Certificate getPeerCertificate()
{
- checkPeerPrincipal();
- return _peerCertificate;
- }
-
- private void checkPeerPrincipal()
- {
- synchronized (_peerPrincipalLock)
- {
- if (!_principalChecked)
- {
- if (_sslEngine != null)
- {
- try
- {
- _principal = _sslEngine.getSession().getPeerPrincipal();
- final Certificate[] peerCertificates =
- _sslEngine.getSession().getPeerCertificates();
- if (peerCertificates != null && peerCertificates.length > 0)
- {
- _peerCertificate = peerCertificates[0];
- }
- }
- catch (SSLPeerUnverifiedException e)
- {
- _principal = null;
- _peerCertificate = null;
- }
- }
-
- _principalChecked = true;
- }
- }
+ return _delegate.getPeerCertificate();
}
@Override
@@ -269,7 +205,6 @@ public class NonBlockingConnection imple
public boolean isStateChanged()
{
-
return _protocolEngine.hasWork();
}
@@ -281,8 +216,6 @@ public class NonBlockingConnection imple
{
try
{
- _workDone = false;
-
long currentTime = System.currentTimeMillis();
int tick = getTicker().getTimeToNextTick(currentTime);
if (tick <= 0)
@@ -299,7 +232,7 @@ public class NonBlockingConnection imple
_fullyWritten = doWrite();
_protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
- if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0))
+ if (dataRead || (_delegate.needsWork() && _netInputBuffer.position() != 0))
{
_protocolEngine.notifyWork();
}
@@ -319,187 +252,121 @@ public class NonBlockingConnection imple
}
else
{
+ shutdown();
+ }
- if(!SystemUtils.isWindows())
- {
- try
- {
- _socketChannel.shutdownInput();
- }
- catch (IOException e)
- {
- LOGGER.info("Exception shutting down input for thread '" + _remoteSocketAddress + "': " + e);
+ return closed;
- }
- }
- try
+ }
+
+ private void shutdown()
+ {
+ shutdownInput();
+
+ shutdownFinalWrite();
+ LOGGER.debug("Closing receiver");
+ _protocolEngine.closed();
+
+ shutdownOutput();
+ }
+
+ private void shutdownFinalWrite()
+ {
+ try
+ {
+ while(!doWrite())
{
- while(!doWrite())
- {
- }
}
- catch (IOException e)
- {
- LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e);
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e);
+ }
+ }
+ private void shutdownOutput()
+ {
+ try
+ {
+ if(!SystemUtils.isWindows())
+ {
+ _socketChannel.shutdownOutput();
}
- LOGGER.debug("Closing receiver");
- _protocolEngine.closed();
+ _socketChannel.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception closing socket thread '" + _remoteSocketAddress + "': " + e);
+ }
+ }
+
+ private void shutdownInput()
+ {
+ if(!SystemUtils.isWindows())
+ {
try
{
- if(!SystemUtils.isWindows())
- {
- _socketChannel.shutdownOutput();
- }
-
- _socketChannel.close();
+ _socketChannel.shutdownInput();
}
catch (IOException e)
{
- LOGGER.info("Exception closing socket thread '" + _remoteSocketAddress + "': " + e);
+ LOGGER.info("Exception shutting down input for thread '" + _remoteSocketAddress + "': " + e);
}
}
-
- return closed;
-
}
- private boolean doRead() throws IOException
+ /**
+ * doRead is not reentrant.
+ */
+ boolean doRead() throws IOException
{
boolean readData = false;
_partialRead = false;
- if(_transportEncryption == TransportEncryption.NONE)
+ if (!_closed.get())
{
- if (!_closed.get())
- {
- if (_currentBuffer == null || _currentBuffer.remaining() == 0)
- {
- _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
- }
- int read = _socketChannel.read(_currentBuffer);
- if(read > 0)
- {
- readData = true;
- }
- if (read == -1)
- {
- _closed.set(true);
- }
-
- _partialRead = !_currentBuffer.hasRemaining();
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Read " + read + " byte(s)");
- }
- ByteBuffer dup = _currentBuffer.duplicate();
- dup.flip();
- _currentBuffer = _currentBuffer.slice();
- _protocolEngine.received(dup);
- }
+ readData = _delegate.doRead();
}
- else if(_transportEncryption == TransportEncryption.TLS)
- {
- if (!_closed.get() && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
- {
- int read = _socketChannel.read(_netInputBuffer);
- if (read == -1)
- {
- _closed.set(true);
- }
- else if(read > 0)
- {
- readData = true;
- }
-
- _partialRead = !_netInputBuffer.hasRemaining();
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Read " + read + " encrypted bytes ");
- }
-
- _netInputBuffer.flip();
-
+ return readData;
+ }
- int unwrapped = 0;
- boolean tasksRun;
- do
- {
- ByteBuffer appInputBuffer =
- ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
+ boolean readAndProcessData() throws IOException
+ {
+ boolean readData = readIntoBuffer(_netInputBuffer);
- _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
- if (_status.getStatus() == SSLEngineResult.Status.CLOSED)
- {
- // KW If SSLEngine changes state to CLOSED, what will ever set _closed to true?
- LOGGER.debug("SSLEngine closed");
- }
-
- tasksRun = runSSLEngineTasks(_status);
-
- appInputBuffer.flip();
- unwrapped = appInputBuffer.remaining();
- if(unwrapped > 0)
- {
- readData = true;
- }
- _protocolEngine.received(appInputBuffer);
- }
- while(unwrapped > 0 || tasksRun);
+ ByteBuffer duplicate = _netInputBuffer.duplicate();
+ duplicate.flip();
- _netInputBuffer.compact();
+ readData |= processData(duplicate);
- }
+ if (_netInputBuffer.hasRemaining())
+ {
+ // slice but keep unprocessed data
+ int amountOfUnprocessedData = duplicate.remaining();
+ _netInputBuffer.position(_netInputBuffer.position() - amountOfUnprocessedData);
+ _netInputBuffer = _netInputBuffer.slice();
+ _netInputBuffer.position(amountOfUnprocessedData);
}
else
{
- int read = 1;
- while (!_closed.get() && read > 0)
- {
-
- read = _socketChannel.read(_netInputBuffer);
- if (read == -1)
- {
- _closed.set(true);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
- }
-
- if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK)
- {
- _netInputBuffer.flip();
- final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK];
- ByteBuffer dup = _netInputBuffer.duplicate();
- dup.get(headerBytes);
-
- _transportEncryption = looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE;
- LOGGER.debug("Identified transport encryption as " + _transportEncryption);
-
- if (_transportEncryption == TransportEncryption.NONE)
- {
- _protocolEngine.received(_netInputBuffer);
- }
- else
- {
- _onTransportEncryptionAction.run();
- _netInputBuffer.compact();
- readData = doRead();
- }
- break;
- }
- }
+ // compact into new buffer
+ _netInputBuffer = ByteBuffer.allocate(_receiveBufferSize);
+ _netInputBuffer.put(duplicate);
}
return readData;
}
- private boolean doWrite() throws IOException
+ void writeToTransport(ByteBuffer[] buffers) throws IOException
{
+ long written = _socketChannel.write(buffers);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Written " + written + " bytes");
+ }
+ }
+ private boolean doWrite() throws IOException
+ {
ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
for (int i = 0; i < bufArray.length; i++)
@@ -507,90 +374,9 @@ public class NonBlockingConnection imple
bufArray[i] = bufferIterator.next();
}
- int byteBuffersWritten = 0;
-
- if(_transportEncryption == TransportEncryption.NONE)
- {
-
-
- long written = _socketChannel.write(bufArray);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Written " + written + " bytes");
- }
-
- for (ByteBuffer buf : bufArray)
- {
- if (buf.remaining() == 0)
- {
- byteBuffersWritten++;
- _buffers.poll();
- }
- else
- {
- break;
- }
- }
-
-
- return bufArray.length == byteBuffersWritten;
- }
- else if(_transportEncryption == TransportEncryption.TLS)
+ if (_delegate != null)
{
- int remaining = 0;
- do
- {
- if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
- {
- _workDone = true;
- final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
- _status = _sslEngine.wrap(bufArray, netBuffer);
- runSSLEngineTasks(_status);
-
- netBuffer.flip();
- remaining = netBuffer.remaining();
- if (remaining != 0)
- {
- _encryptedOutput.add(netBuffer);
- }
- for (ByteBuffer buf : bufArray)
- {
- if (buf.remaining() == 0)
- {
- byteBuffersWritten++;
- _buffers.poll();
- }
- else
- {
- break;
- }
- }
- }
-
- }
- while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
- ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
- long written = _socketChannel.write(encryptedBuffers);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Written " + written + " encrypted bytes");
- }
- ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
- while(iter.hasNext())
- {
- ByteBuffer buf = iter.next();
- if(buf.remaining() == 0)
- {
- iter.remove();
- }
- else
- {
- break;
- }
- }
-
- return (bufArray.length == byteBuffersWritten) && _encryptedOutput.isEmpty();
-
+ return _delegate.doWrite(bufArray);
}
else
{
@@ -598,18 +384,31 @@ public class NonBlockingConnection imple
}
}
- private boolean runSSLEngineTasks(final SSLEngineResult status)
+ boolean processData(ByteBuffer data) throws IOException
+ {
+ return _delegate.processData(data);
+ }
+
+ protected boolean readIntoBuffer(final ByteBuffer buffer) throws IOException
{
- if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
+ boolean readData = false;
+ int read = _socketChannel.read(buffer);
+ if (read > 0)
{
- Runnable task;
- while((task = _sslEngine.getDelegatedTask()) != null)
- {
- task.run();
- }
- return true;
+ readData = true;
+ }
+ else if (read == -1)
+ {
+ _closed.set(true);
}
- return false;
+
+ _partialRead = !buffer.hasRemaining();
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + read + " byte(s)");
+ }
+ return readData;
}
@Override
@@ -626,6 +425,11 @@ public class NonBlockingConnection imple
}
}
+ public void writeBufferProcessed()
+ {
+ _buffers.poll();
+ }
+
@Override
public void flush()
{
@@ -642,34 +446,30 @@ public class NonBlockingConnection imple
return "[NonBlockingConnection " + _remoteSocketAddress + "]";
}
- private boolean looksLikeSSL(final byte[] headerBytes)
- {
- return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
- }
-
- private boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
+ public NetworkConnectionScheduler getScheduler()
{
- return headerBytes[0] == 22 && // SSL Handshake
- (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[2] == 0 || // SSL 3.0
- headerBytes[2] == 1 || // TLS 1.0
- headerBytes[2] == 2 || // TLS 1.1
- headerBytes[2] == 3)) && // TLS1.2
- (headerBytes[5] == 1); // client_hello
+ return _scheduler;
}
- private boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
+ public void processAmqpData(ByteBuffer data)
{
- return headerBytes[0] == -128 &&
- headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[4] == 0 || // SSL 3.0
- headerBytes[4] == 1 || // TLS 1.0
- headerBytes[4] == 2 || // TLS 1.1
- headerBytes[4] == 3);
+ _protocolEngine.received(data);
}
- public NetworkConnectionScheduler getScheduler()
+ public void setTransportEncryption(TransportEncryption transportEncryption)
{
- return _scheduler;
+ switch (transportEncryption)
+ {
+ case TLS:
+ _onTransportEncryptionAction.run();
+ _delegate = new NonBlockingConnectionTLSDelegate(this, _port);
+ break;
+ case NONE:
+ _delegate = new NonBlockingConnectionPlainDelegate(this);
+ break;
+ default:
+ throw new IllegalArgumentException("unknown TransportEncryption " + transportEncryption);
+ }
+ LOGGER.debug("Identified transport encryption as " + transportEncryption);
}
}
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java?rev=1685745&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java Tue Jun 16 09:28:46 2015
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.cert.Certificate;
+
+public interface NonBlockingConnectionDelegate
+{
+ boolean doRead() throws IOException;
+ boolean doWrite(ByteBuffer[] bufferArray) throws IOException;
+ boolean processData(ByteBuffer data) throws IOException;
+
+ Principal getPeerPrincipal();
+
+ Certificate getPeerCertificate();
+
+ boolean needsWork();
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1685745&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java Tue Jun 16 09:28:46 2015
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.cert.Certificate;
+
+public class NonBlockingConnectionPlainDelegate implements NonBlockingConnectionDelegate
+{
+ private final NonBlockingConnection _parent;
+
+ public NonBlockingConnectionPlainDelegate(NonBlockingConnection parent)
+ {
+ _parent = parent;
+ }
+
+ @Override
+ public boolean doRead() throws IOException
+ {
+ return _parent.readAndProcessData();
+ }
+
+ @Override
+ public boolean processData(ByteBuffer buffer)
+ {
+ _parent.processAmqpData(buffer);
+ buffer.position(buffer.limit());
+ return false;
+ }
+
+ @Override
+ public boolean doWrite(ByteBuffer[] bufferArray) throws IOException
+ {
+ int byteBuffersWritten = 0;
+
+ _parent.writeToTransport(bufferArray);
+
+ for (ByteBuffer buf : bufferArray)
+ {
+ if (buf.remaining() == 0)
+ {
+ byteBuffersWritten++;
+ _parent.writeBufferProcessed();
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ return bufferArray.length == byteBuffersWritten;
+ }
+
+ @Override
+ public Principal getPeerPrincipal()
+ {
+ return null;
+ }
+
+ @Override
+ public Certificate getPeerCertificate()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean needsWork()
+ {
+ return false;
+ }
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1685745&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java Tue Jun 16 09:28:46 2015
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * Delegate that runs all traffic through a server-mode {@link SSLEngine}: received bytes are
+ * unwrapped before being handed to the parent {@link NonBlockingConnection}, and outgoing buffers
+ * are wrapped into encrypted buffers before being written to the transport.
+ */
+public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDelegate
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnectionTLSDelegate.class);
+
+    private final SSLEngine _sslEngine;
+    private final NonBlockingConnection _parent;
+    // Result of the most recent wrap/unwrap call; null until the engine has been used.
+    private SSLEngineResult _status;
+    // Encrypted buffers produced by wrap() that have not yet been fully written to the transport.
+    private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
+    private Principal _principal;
+    private Certificate _peerCertificate;
+    // Set once the peer principal/certificate lookup has been attempted (successfully or not).
+    private boolean _principalChecked;
+
+    /**
+     * @param parent the connection on whose behalf this delegate reads and writes
+     * @param port   the port supplying the SSL context, cipher suite and client-auth settings
+     */
+    public NonBlockingConnectionTLSDelegate(NonBlockingConnection parent, AmqpPort port)
+    {
+        _parent = parent;
+        _sslEngine = createSSLEngine(port);
+    }
+
+    /**
+     * Reads from the parent unless the engine is waiting to wrap handshake data or the last
+     * engine result reported the engine as closed.
+     *
+     * @return the parent's {@code readAndProcessData()} result, or {@code false} if no read was attempted
+     * @throws IOException if the parent's read fails
+     */
+    @Override
+    public boolean doRead() throws IOException
+    {
+        boolean readData = false;
+        if (_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
+        {
+            readData = _parent.readAndProcessData();
+        }
+        return readData;
+    }
+
+    /**
+     * Unwraps the encrypted bytes and passes the resulting application data to the parent.
+     *
+     * @param buffer encrypted bytes received from the transport
+     * @return {@code true} if at least one byte of application data was produced
+     * @throws IOException if the engine rejects the data
+     */
+    @Override
+    public boolean processData(ByteBuffer buffer) throws IOException
+    {
+        return unwrapAndProcessBuffer(buffer);
+    }
+
+    /**
+     * Wraps as much of the supplied buffers as the engine will accept, then writes all pending
+     * encrypted output to the transport and discards the encrypted buffers that were fully written.
+     *
+     * @param bufferArray the application buffers to send, in order
+     * @return {@code true} only if every source buffer was wrapped and no encrypted output remains unsent
+     * @throws IOException if wrapping or the transport write fails
+     */
+    @Override
+    public boolean doWrite(ByteBuffer[] bufferArray) throws IOException
+    {
+        int byteBuffersWritten = wrapBufferArray(bufferArray);
+
+        ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
+
+        _parent.writeToTransport(encryptedBuffers);
+
+        // Drop the leading encrypted buffers that were completely written; keep the rest, in
+        // order, for the next call.
+        ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
+        while(iter.hasNext())
+        {
+            ByteBuffer buf = iter.next();
+            if(buf.remaining() == 0)
+            {
+                iter.remove();
+            }
+            else
+            {
+                break;
+            }
+        }
+
+        return (bufferArray.length == byteBuffersWritten) && _encryptedOutput.isEmpty();
+    }
+
+    /**
+     * Repeatedly unwraps from the given buffer, running any delegated engine tasks and handing
+     * each batch of decrypted data to the parent, until a pass neither produces data nor runs tasks.
+     *
+     * @param wrappedDataBuffer encrypted bytes received from the transport
+     * @return {@code true} if any pass produced application data
+     * @throws SSLException if the engine fails to unwrap
+     */
+    private boolean unwrapAndProcessBuffer(final ByteBuffer wrappedDataBuffer) throws SSLException
+    {
+        boolean readData = false;
+        int unwrapped;
+        boolean tasksRun;
+        do
+        {
+            // NOTE(review): the extra 50 bytes of headroom beyond the session's application
+            // buffer size is not explained anywhere in this class.
+            ByteBuffer appInputBuffer =
+                    ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
+            _status = _sslEngine.unwrap(wrappedDataBuffer, appInputBuffer);
+            if (_status.getStatus() == SSLEngineResult.Status.CLOSED)
+            {
+                // KW If SSLEngine changes state to CLOSED, what will ever set _closed to true?
+                LOGGER.debug("SSLEngine closed");
+            }
+
+            tasksRun = runSSLEngineTasks(_status);
+
+            appInputBuffer.flip();
+            unwrapped = appInputBuffer.remaining();
+            if(unwrapped > 0)
+            {
+                readData = true;
+            }
+            _parent.processAmqpData(appInputBuffer);
+        }
+        while(unwrapped > 0 || tasksRun);
+        return readData;
+    }
+
+    /**
+     * Wraps the source buffers into freshly allocated network buffers, queuing every non-empty
+     * result on {@code _encryptedOutput}, until a pass produces no output or the engine needs to
+     * unwrap before it can continue.
+     * <p>
+     * The parent's {@code writeBufferProcessed()} is invoked exactly once for each leading source
+     * buffer that has been fully consumed.
+     *
+     * @param bufferArray the application buffers to wrap, in order
+     * @return the number of leading buffers in {@code bufferArray} that were fully consumed
+     * @throws SSLException if the engine fails to wrap
+     */
+    private int wrapBufferArray(final ByteBuffer[] bufferArray) throws SSLException
+    {
+        int byteBuffersWritten = 0;
+        int remaining = 0;
+        do
+        {
+            if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
+            {
+                final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+                _status = _sslEngine.wrap(bufferArray, netBuffer);
+                runSSLEngineTasks(_status);
+
+                netBuffer.flip();
+                remaining = netBuffer.remaining();
+                if (remaining != 0)
+                {
+                    _encryptedOutput.add(netBuffer);
+                }
+                // Resume from the first buffer not yet reported. Restarting at index 0 on each
+                // pass would count buffers drained by an earlier pass again, notifying the
+                // parent repeatedly and pushing the count past bufferArray.length.
+                while (byteBuffersWritten < bufferArray.length
+                       && bufferArray[byteBuffersWritten].remaining() == 0)
+                {
+                    byteBuffersWritten++;
+                    _parent.writeBufferProcessed();
+                }
+            }
+
+        }
+        while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
+        return byteBuffersWritten;
+    }
+
+    /**
+     * Runs, on the calling thread, every delegated task the engine has queued when the given
+     * result reports {@code NEED_TASK}.
+     *
+     * @param status the result of the preceding wrap/unwrap call
+     * @return {@code true} if the result requested tasks (and they have now been run)
+     */
+    private boolean runSSLEngineTasks(final SSLEngineResult status)
+    {
+        if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
+        {
+            Runnable task;
+            while((task = _sslEngine.getDelegatedTask()) != null)
+            {
+                task.run();
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * @return the peer principal taken from the SSL session, or {@code null} if the peer was not verified
+     */
+    @Override
+    public Principal getPeerPrincipal()
+    {
+        checkPeerPrincipal();
+        return _principal;
+    }
+
+    /**
+     * @return the first certificate of the peer's chain, or {@code null} if none is available
+     */
+    @Override
+    public Certificate getPeerCertificate()
+    {
+        checkPeerPrincipal();
+        return _peerCertificate;
+    }
+
+    /**
+     * @return {@code true} unless the engine is currently waiting for inbound handshake data
+     */
+    @Override
+    public boolean needsWork()
+    {
+        return _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
+    }
+
+    /**
+     * Looks up the peer principal and certificate from the SSL session on first use and caches
+     * the outcome; later calls do nothing.
+     * <p>
+     * NOTE(review): a failed lookup is cached as well, so if this is first called before the peer
+     * has been verified the {@code null} result will presumably stick - confirm against callers.
+     */
+    private synchronized void checkPeerPrincipal()
+    {
+        if (!_principalChecked)
+        {
+            try
+            {
+                _principal = _sslEngine.getSession().getPeerPrincipal();
+                final Certificate[] peerCertificates =
+                        _sslEngine.getSession().getPeerCertificates();
+                if (peerCertificates != null && peerCertificates.length > 0)
+                {
+                    _peerCertificate = peerCertificates[0];
+                }
+            }
+            catch (SSLPeerUnverifiedException e)
+            {
+                // No verified peer identity: report neither a principal nor a certificate.
+                _principal = null;
+                _peerCertificate = null;
+            }
+
+            _principalChecked = true;
+        }
+    }
+
+    /**
+     * Creates a server-mode engine from the port's SSL context, removes SSLv3 support, applies
+     * the port's enabled/disabled cipher suites and its client-authentication setting
+     * ("need" takes precedence over "want").
+     *
+     * @param port the port whose TLS configuration is applied
+     * @return the configured engine
+     */
+    private SSLEngine createSSLEngine(AmqpPort port)
+    {
+        SSLEngine sslEngine = port.getSSLContext().createSSLEngine();
+        sslEngine.setUseClientMode(false);
+        SSLUtil.removeSSLv3Support(sslEngine);
+        SSLUtil.updateEnabledCipherSuites(sslEngine, port.getEnabledCipherSuites(), port.getDisabledCipherSuites());
+
+        if(port.getNeedClientAuth())
+        {
+            sslEngine.setNeedClientAuth(true);
+        }
+        else if(port.getWantClientAuth())
+        {
+            sslEngine.setWantClientAuth(true);
+        }
+        return sslEngine;
+    }
+
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java?rev=1685745&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java Tue Jun 16 09:28:46 2015
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.network.TransportEncryption;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.cert.Certificate;
+
+/**
+ * Delegate used while it is not yet known whether the peer is speaking TLS. It inspects the
+ * first bytes received, tells the parent {@link NonBlockingConnection} which
+ * {@link TransportEncryption} to use, and then re-submits the data to the parent.
+ */
+public class NonBlockingConnectionUndecidedDelegate implements NonBlockingConnectionDelegate
+{
+    // Number of leading bytes that must be available before the TLS/plain decision is made.
+    private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
+    public final NonBlockingConnection _parent;
+
+    /**
+     * @param parent the connection on whose behalf this delegate reads
+     */
+    public NonBlockingConnectionUndecidedDelegate(NonBlockingConnection parent)
+    {
+        _parent = parent;
+    }
+
+    /**
+     * Forwards the read request to the parent connection.
+     *
+     * @return whatever {@code readAndProcessData()} on the parent reports
+     * @throws IOException if the parent's read fails
+     */
+    @Override
+    public boolean doRead() throws IOException
+    {
+        return _parent.readAndProcessData();
+    }
+
+    /**
+     * Decides between TLS and plain transport once at least
+     * {@link #NUMBER_OF_BYTES_FOR_TLS_CHECK} bytes are available, then hands the untouched
+     * buffer back to the parent's {@code processData}. With fewer bytes nothing is consumed and
+     * no decision is taken.
+     *
+     * @param buffer the bytes received so far
+     * @return always {@code false}; any result of the parent's {@code processData} is not propagated
+     * @throws IOException if the parent's processing fails
+     */
+    @Override
+    public boolean processData(ByteBuffer buffer) throws IOException
+    {
+        if (buffer.remaining() >= NUMBER_OF_BYTES_FOR_TLS_CHECK)
+        {
+            final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK];
+            // Read through a duplicate so the position of the real buffer is left unchanged.
+            ByteBuffer dup = buffer.duplicate();
+            dup.get(headerBytes);
+
+            if (looksLikeSSL(headerBytes))
+            {
+                _parent.setTransportEncryption(TransportEncryption.TLS);
+            }
+            else
+            {
+                _parent.setTransportEncryption(TransportEncryption.NONE);
+            }
+            _parent.processData(buffer);
+        }
+        return false;
+    }
+
+    /**
+     * Does nothing with the supplied buffers.
+     *
+     * @param bufferArray ignored
+     * @return always {@code true}
+     */
+    @Override
+    public boolean doWrite(ByteBuffer[] bufferArray) throws IOException
+    {
+        return true;
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public Principal getPeerPrincipal()
+    {
+        return null;
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public Certificate getPeerCertificate()
+    {
+        return null;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean needsWork()
+    {
+        return false;
+    }
+
+    /**
+     * @param headerBytes the first {@link #NUMBER_OF_BYTES_FOR_TLS_CHECK} bytes received
+     * @return {@code true} if the bytes match either recognised ClientHello layout
+     */
+    private boolean looksLikeSSL(final byte[] headerBytes)
+    {
+        return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
+    }
+
+    /**
+     * Matches a record whose first byte is 22 (handshake), whose version bytes are 3.0 to 3.3,
+     * and whose sixth byte is 1 (client_hello).
+     */
+    private boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
+    {
+        return headerBytes[0] == 22                           // SSL Handshake
+               && headerBytes[1] == 3                         // SSL 3.0 / TLS 1.x
+               && isRecognisedMinorVersion(headerBytes[2])
+               && headerBytes[5] == 1;                        // client_hello
+    }
+
+    /**
+     * Matches a header whose first byte is exactly 0x80 and whose fourth and fifth bytes carry
+     * version 3.0 to 3.3.
+     * <p>
+     * NOTE(review): only the exact value 0x80 is accepted for the first byte, and the message
+     * type byte is not inspected - presumably intentional, confirm.
+     */
+    private boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
+    {
+        return headerBytes[0] == -128
+               && headerBytes[3] == 3                         // SSL 3.0 / TLS 1.x
+               && isRecognisedMinorVersion(headerBytes[4]);
+    }
+
+    /**
+     * @return {@code true} for minor versions 0 (SSL 3.0), 1 (TLS 1.0), 2 (TLS 1.1) and 3 (TLS 1.2)
+     */
+    private static boolean isRecognisedMinorVersion(final byte minor)
+    {
+        return minor >= 0 && minor <= 3;
+    }
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Tue Jun 16 09:28:46 2015
@@ -23,7 +23,6 @@ package org.apache.qpid.server.transport
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
-import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.EnumSet;
@@ -31,6 +30,7 @@ import java.util.Set;
import javax.net.ssl.SSLContext;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +45,8 @@ import org.apache.qpid.transport.network
import org.apache.qpid.transport.network.io.AbstractNetworkTransport;
import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
public class NonBlockingNetworkTransport
{
@@ -54,41 +56,53 @@ public class NonBlockingNetworkTransport
private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
private final Set<TransportEncryption> _encryptionSet;
- private final NetworkTransportConfiguration _config;
private final ProtocolEngineFactory _factory;
- private final SSLContext _sslContext;
private final ServerSocketChannel _serverSocket;
private final int _timeout;
private final NetworkConnectionScheduler _scheduler;
+ private final AmqpPort _port;
+ private final InetSocketAddress _address;
- public NonBlockingNetworkTransport(final NetworkTransportConfiguration config,
- final MultiVersionProtocolEngineFactory factory,
- final SSLContext sslContext,
+ public NonBlockingNetworkTransport(final MultiVersionProtocolEngineFactory factory,
final EnumSet<TransportEncryption> encryptionSet,
- final NetworkConnectionScheduler scheduler)
+ final NetworkConnectionScheduler scheduler,
+ final AmqpPort port)
{
try
{
- _config = config;
_factory = factory;
- _sslContext = sslContext;
_timeout = TIMEOUT;
- InetSocketAddress address = config.getAddress();
+ String bindingAddress = port.getBindingAddress();
+ if (WILDCARD_ADDRESS.equals(bindingAddress))
+ {
+ bindingAddress = null;
+ }
+ int portNumber = port.getPort();
+
+ if ( bindingAddress == null )
+ {
+ _address = new InetSocketAddress(portNumber);
+ }
+ else
+ {
+ _address = new InetSocketAddress(bindingAddress, portNumber);
+ }
_serverSocket = ServerSocketChannel.open();
_serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
- _serverSocket.bind(address);
+ _serverSocket.bind(_address);
_serverSocket.configureBlocking(false);
_encryptionSet = encryptionSet;
_scheduler = scheduler;
+ _port = port;
}
catch (IOException e)
{
- throw new TransportException("Failed to start AMQP on port : " + config, e);
+ throw new TransportException("Failed to start AMQP on port : " + port, e);
}
}
@@ -108,7 +122,7 @@ public class NonBlockingNetworkTransport
}
catch (IOException e)
{
- LOGGER.warn("Error closing the server socket for : " + _config.getAddress().toString(), e);
+ LOGGER.warn("Error closing the server socket for : " + _address.toString(), e);
}
}
@@ -117,11 +131,6 @@ public class NonBlockingNetworkTransport
return _serverSocket.socket().getLocalPort();
}
- public NetworkTransportConfiguration getConfig()
- {
- return _config;
- }
-
void acceptSocketChannel(final ServerSocketChannel serverSocketChannel)
{
SocketChannel socketChannel = null;
@@ -136,11 +145,11 @@ public class NonBlockingNetworkTransport
if(engine != null)
{
- socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+ socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _port.isTcpNoDelay());
socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
- final int sendBufferSize = _config.getSendBufferSize();
- final int receiveBufferSize = _config.getReceiveBufferSize();
+ final int sendBufferSize = _port.getSendBufferSize();
+ final int receiveBufferSize = _port.getReceiveBufferSize();
socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
@@ -157,12 +166,7 @@ public class NonBlockingNetworkTransport
engine,
receiveBufferSize,
_encryptionSet,
- _sslContext,
- _config.wantClientAuth(),
- _config.needClientAuth(),
- _config.getEnabledCipherSuites(),
- _config.getDisabledCipherSuites(),
- new Runnable()
+ new Runnable()
{
@Override
@@ -171,7 +175,8 @@ public class NonBlockingNetworkTransport
engine.encryptedTransport();
}
},
- _scheduler);
+ _scheduler,
+ _port);
engine.setNetworkConnection(connection, connection.getSender());
connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
@@ -208,9 +213,4 @@ public class NonBlockingNetworkTransport
}
}
}
-
- public int getThreadPoolSize()
- {
- return _config.getThreadPoolSize();
- }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Tue Jun 16 09:28:46 2015
@@ -20,42 +20,31 @@
*/
package org.apache.qpid.server.transport;
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
-import java.net.InetSocketAddress;
import java.util.EnumSet;
-import java.util.Collection;
import java.util.Set;
-import javax.net.ssl.SSLContext;
-
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.TransportEncryption;
class TCPandSSLTransport implements AcceptingTransport
{
private NonBlockingNetworkTransport _networkTransport;
private Set<Transport> _transports;
- private SSLContext _sslContext;
- private InetSocketAddress _bindingSocketAddress;
private AmqpPort<?> _port;
private Set<Protocol> _supported;
private Protocol _defaultSupportedProtocolReply;
private NetworkConnectionScheduler _scheduler;
TCPandSSLTransport(final Set<Transport> transports,
- final SSLContext sslContext,
final AmqpPort<?> port,
final Set<Protocol> supported,
final Protocol defaultSupportedProtocolReply)
{
_transports = transports;
- _sslContext = sslContext;
_port = port;
_supported = supported;
_defaultSupportedProtocolReply = defaultSupportedProtocolReply;
@@ -64,24 +53,6 @@ class TCPandSSLTransport implements Acce
@Override
public void start()
{
- String bindingAddress = _port.getBindingAddress();
- if (WILDCARD_ADDRESS.equals(bindingAddress))
- {
- bindingAddress = null;
- }
- int port = _port.getPort();
- if ( bindingAddress == null )
- {
- _bindingSocketAddress = new InetSocketAddress(port);
- }
- else
- {
- _bindingSocketAddress = new InetSocketAddress(bindingAddress, port);
- }
-
- final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
-
-
final MultiVersionProtocolEngineFactory protocolEngineFactory =
new MultiVersionProtocolEngineFactory(
_port.getParent(Broker.class),
@@ -100,9 +71,9 @@ class TCPandSSLTransport implements Acce
encryptionSet.add(TransportEncryption.TLS);
}
- _scheduler = new NetworkConnectionScheduler("Port-"+_port.getName(),settings.getThreadPoolSize());
- _networkTransport = new NonBlockingNetworkTransport(settings, protocolEngineFactory,
- _sslContext, encryptionSet, _scheduler);
+ _scheduler = new NetworkConnectionScheduler("Port-"+_port.getName(), _port.getThreadPoolSize());
+ _networkTransport = new NonBlockingNetworkTransport(protocolEngineFactory,
+ encryptionSet, _scheduler, _port);
_networkTransport.start();
}
@@ -123,71 +94,4 @@ class TCPandSSLTransport implements Acce
_scheduler.close();
}
}
-
- class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration
- {
- public ServerNetworkTransportConfiguration()
- {
- }
-
- @Override
- public boolean wantClientAuth()
- {
- return _port.getWantClientAuth();
- }
-
- @Override
- public Collection<String> getEnabledCipherSuites()
- {
- return _port.getEnabledCipherSuites();
- }
-
- @Override
- public Collection<String> getDisabledCipherSuites()
- {
- return _port.getDisabledCipherSuites();
- }
-
- @Override
- public boolean needClientAuth()
- {
- return _port.getNeedClientAuth();
- }
-
- @Override
- public boolean getTcpNoDelay()
- {
- return _port.isTcpNoDelay();
- }
-
- @Override
- public int getSendBufferSize()
- {
- return _port.getSendBufferSize();
- }
-
- @Override
- public int getThreadPoolSize()
- {
- return Runtime.getRuntime().availableProcessors();
- }
-
- @Override
- public int getReceiveBufferSize()
- {
- return _port.getReceiveBufferSize();
- }
-
- @Override
- public InetSocketAddress getAddress()
- {
- return _bindingSocketAddress;
- }
-
- @Override
- public String toString()
- {
- return _port.toString();
- }
- }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java Tue Jun 16 09:28:46 2015
@@ -37,6 +37,6 @@ class TCPandSSLTransportProvider impleme
final Set<Protocol> supported,
final Protocol defaultSupportedProtocolReply)
{
- return new TCPandSSLTransport(transports, sslContext, port, supported, defaultSupportedProtocolReply);
+ return new TCPandSSLTransport(transports, port, supported, defaultSupportedProtocolReply);
}
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java Tue Jun 16 09:28:46 2015
@@ -19,6 +19,7 @@
package org.apache.qpid.server.transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngine;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -45,63 +46,16 @@ public class NetworkConnectionSchedulerT
public void testFairRead() throws IOException, InterruptedException
{
- NetworkTransportConfiguration config = new NetworkTransportConfiguration()
- {
-
- @Override
- public boolean getTcpNoDelay()
- {
- return true;
- }
-
- @Override
- public int getReceiveBufferSize()
- {
- return 1;
- }
-
- @Override
- public int getSendBufferSize()
- {
- return 1;
- }
-
- @Override
- public int getThreadPoolSize()
- {
- return 1;
- }
-
- @Override
- public InetSocketAddress getAddress()
- {
- return new InetSocketAddress(0);
- }
-
- @Override
- public boolean needClientAuth()
- {
- return false;
- }
-
- @Override
- public boolean wantClientAuth()
- {
- return false;
- }
+ AmqpPort port = mock(AmqpPort.class);
+ when(port.isTcpNoDelay()).thenReturn(true);
+ when(port.getSendBufferSize()).thenReturn(1);
+ when(port.getReceiveBufferSize()).thenReturn(1);
+ when(port.getPort()).thenReturn(0);
+ when(port.getBindingAddress()).thenReturn("*");
+ when(port.getEnabledCipherSuites()).thenReturn(Collections.emptyList());
+ when(port.getDisabledCipherSuites()).thenReturn(Collections.emptyList());
+ when(port.getThreadPoolSize()).thenReturn(1);
- @Override
- public Collection<String> getEnabledCipherSuites()
- {
- return Collections.emptyList();
- }
-
- @Override
- public Collection<String> getDisabledCipherSuites()
- {
- return Collections.emptyList();
- }
- };
MultiVersionProtocolEngineFactory engineFactory = mock(MultiVersionProtocolEngineFactory.class);
MultiVersionProtocolEngine verboseEngine = mock(MultiVersionProtocolEngine.class);
MultiVersionProtocolEngine timidEngine = mock(MultiVersionProtocolEngine.class);
@@ -112,14 +66,14 @@ public class NetworkConnectionSchedulerT
final NetworkConnectionScheduler scheduler = new NetworkConnectionScheduler(getName(), 1);
- NonBlockingNetworkTransport transport = new NonBlockingNetworkTransport(config, engineFactory, null, EnumSet.of(TransportEncryption.NONE),
- scheduler);
+ NonBlockingNetworkTransport transport = new NonBlockingNetworkTransport(engineFactory, EnumSet.of(TransportEncryption.NONE),
+ scheduler, port);
transport.start();
- final int port = transport.getAcceptingPort();
+ final int portNumber = transport.getAcceptingPort();
Socket verboseSocket = new Socket();
- verboseSocket.connect(new InetSocketAddress(port));
+ verboseSocket.connect(new InetSocketAddress(portNumber));
final OutputStream verboseOutputStream = verboseSocket.getOutputStream();
Thread verboseSender = new Thread(new Runnable()
{
@@ -141,7 +95,7 @@ public class NetworkConnectionSchedulerT
});
Socket timidSocket = new Socket();
- timidSocket.connect(new InetSocketAddress(port));
+ timidSocket.connect(new InetSocketAddress(portNumber));
final OutputStream timidOutputStream = timidSocket.getOutputStream();
Thread timidSender = new Thread(new Runnable()
{
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1685745&r1=1685744&r2=1685745&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Tue Jun 16 09:28:46 2015
@@ -109,9 +109,10 @@ public class TCPandSSLTransportTest exte
when(port.getSendBufferSize()).thenReturn(64*1024);
when(port.getReceiveBufferSize()).thenReturn(64*1024);
when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
+ when(port.getThreadPoolSize()).thenReturn(1);
+ when(port.getSSLContext()).thenReturn(sslContext);
TCPandSSLTransport transport = new TCPandSSLTransport(new HashSet<>(Arrays.asList(transports)),
- sslContext,
port,
new HashSet<>(Arrays.asList(Protocol.AMQP_0_8,
Protocol.AMQP_0_9,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org