You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/12/10 15:51:16 UTC
svn commit: r1644437 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/transport/
common/src/main/java/org/apache/qpid/transport/network/
common/src/main/java/org/apache/qpid/transport/network/io/ c...
Author: rgodfrey
Date: Wed Dec 10 14:51:15 2014
New Revision: 1644437
URL: http://svn.apache.org/r1644437
Log:
Initial SSL work
Added:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java (with props)
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Wed Dec 10 14:51:15 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport
import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
import java.net.InetSocketAddress;
+import java.util.EnumSet;
import java.util.Set;
import javax.net.ssl.SSLContext;
@@ -34,6 +35,7 @@ import org.apache.qpid.server.model.port
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport;
class TCPandSSLTransport implements AcceptingTransport
@@ -88,7 +90,17 @@ class TCPandSSLTransport implements Acce
_port,
_transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
- _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext);
+ EnumSet<TransportEncryption> encryptionSet = EnumSet.noneOf(TransportEncryption.class);
+ if(_transports.contains(Transport.TCP))
+ {
+ encryptionSet.add(TransportEncryption.NONE);
+ }
+ if(_transports.contains(Transport.SSL))
+ {
+ encryptionSet.add(TransportEncryption.TLS);
+ }
+ _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext,
+ encryptionSet);
}
public int getAcceptingPort()
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java Wed Dec 10 14:51:15 2014
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.transport.network;
+import java.util.Set;
+
import javax.net.ssl.SSLContext;
import org.apache.qpid.protocol.ProtocolEngineFactory;
@@ -29,7 +31,8 @@ public interface IncomingNetworkTranspor
{
public void accept(NetworkTransportConfiguration config,
ProtocolEngineFactory factory,
- SSLContext sslContext);
+ SSLContext sslContext,
+ final Set<TransportEncryption> encryptionSet);
public int getAcceptingPort();
}
Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java?rev=1644437&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java Wed Dec 10 14:51:15 2014
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+public enum TransportEncryption
+{
+ NONE, TLS
+}
Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java Wed Dec 10 14:51:15 2014
@@ -27,6 +27,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
+import java.util.Set;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocket;
@@ -45,6 +46,7 @@ import org.apache.qpid.transport.network
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
@@ -136,7 +138,7 @@ public abstract class AbstractNetworkTra
public void accept(NetworkTransportConfiguration config,
ProtocolEngineFactory factory,
- SSLContext sslContext)
+ SSLContext sslContext, final Set<TransportEncryption> encryptionSet)
{
try
{
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java Wed Dec 10 14:51:15 2014
@@ -24,7 +24,9 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.security.Principal;
+import java.util.Set;
+import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSocket;
@@ -35,6 +37,7 @@ import org.apache.qpid.transport.Receive
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportEncryption;
public class NonBlockingConnection implements NetworkConnection
{
@@ -48,20 +51,20 @@ public class NonBlockingConnection imple
private boolean _principalChecked;
private final Object _lock = new Object();
- public NonBlockingConnection(SocketChannel socket, Receiver<ByteBuffer> delegate,
- int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
+ public NonBlockingConnection(SocketChannel socket,
+ Receiver<ByteBuffer> delegate,
+ int sendBufferSize,
+ int receiveBufferSize,
+ long timeout,
+ Ticker ticker,
+ final Set<TransportEncryption> encryptionSet,
+ final SSLContext sslContext,
+ final boolean wantClientAuth, final boolean needClientAuth)
{
_socket = socket;
_timeout = timeout;
-// _ioReceiver = new NonBlockingReceiver(_socket, delegate, receiveBufferSize,_timeout);
-// _nonBlockingSenderReceiver.setTicker(ticker);
-
-// _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout);
-
-// _ioSender.setReceiver(_nonBlockingSenderReceiver);
-
- _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker);
+ _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth);
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java Wed Dec 10 14:51:15 2014
@@ -21,13 +21,13 @@
package org.apache.qpid.transport.network.io;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.Set;
import javax.net.ssl.SSLContext;
@@ -36,13 +36,12 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.TransportEncryption;
public class NonBlockingNetworkTransport implements IncomingNetworkTransport
{
@@ -54,6 +53,8 @@ public class NonBlockingNetworkTransport
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
private AcceptingThread _acceptor;
+
+/*
private SocketChannel _socketChannel;
private NonBlockingConnection _connection;
@@ -93,7 +94,7 @@ public class NonBlockingNetworkTransport
{
IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
_connection = createNetworkConnection(_socketChannel, delegate, sendBufferSize, receiveBufferSize,
- TIMEOUT, ticker);
+ TIMEOUT, ticker, _encryptionSet, _sslContext);
ticker.setConnection(_connection);
_connection.start();
}
@@ -114,41 +115,51 @@ public class NonBlockingNetworkTransport
return _connection;
}
+*/
protected NonBlockingConnection createNetworkConnection(final SocketChannel socket,
- final Receiver<ByteBuffer> engine,
- final Integer sendBufferSize,
- final Integer receiveBufferSize,
- final int timeout,
- final IdleTimeoutTicker ticker)
+ final Receiver<ByteBuffer> engine,
+ final Integer sendBufferSize,
+ final Integer receiveBufferSize,
+ final int timeout,
+ final IdleTimeoutTicker ticker,
+ final Set<TransportEncryption> encryptionSet,
+ final SSLContext sslContext,
+ final boolean wantClientAuth,
+ final boolean needClientAuth)
{
- return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker);
+ return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth);
}
public void close()
{
+/*
if(_connection != null)
{
_connection.close();
}
+*/
if(_acceptor != null)
{
_acceptor.close();
}
}
+/*
public NetworkConnection getConnection()
{
return _connection;
}
+*/
public void accept(NetworkTransportConfiguration config,
ProtocolEngineFactory factory,
- SSLContext sslContext)
+ SSLContext sslContext,
+ final Set<TransportEncryption> encryptionSet)
{
try
{
- _acceptor = new AcceptingThread(config, factory, sslContext);
+ _acceptor = new AcceptingThread(config, factory, sslContext, encryptionSet);
_acceptor.setDaemon(false);
_acceptor.start();
}
@@ -165,6 +176,7 @@ public class NonBlockingNetworkTransport
private class AcceptingThread extends Thread
{
+ private final Set<TransportEncryption> _encryptionSet;
private volatile boolean _closed = false;
private NetworkTransportConfiguration _config;
private ProtocolEngineFactory _factory;
@@ -174,7 +186,8 @@ public class NonBlockingNetworkTransport
private AcceptingThread(NetworkTransportConfiguration config,
ProtocolEngineFactory factory,
- SSLContext sslContext) throws IOException
+ SSLContext sslContext,
+ final Set<TransportEncryption> encryptionSet) throws IOException
{
_config = config;
_factory = factory;
@@ -187,6 +200,7 @@ public class NonBlockingNetworkTransport
_serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
_serverSocket.bind(address);
+ _encryptionSet = encryptionSet;
}
@@ -250,7 +264,11 @@ public class NonBlockingNetworkTransport
sendBufferSize,
receiveBufferSize,
_timeout,
- ticker);
+ ticker,
+ _encryptionSet,
+ _sslContext,
+ _config.wantClientAuth(),
+ _config.needClientAuth());
connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Wed Dec 10 14:51:15 2014
@@ -24,11 +24,18 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +45,8 @@ import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
{
@@ -47,6 +56,7 @@ public class NonBlockingSenderReceiver
private final Selector _selector;
private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+ private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
private final Thread _ioThread;
private final String _remoteSocketAddress;
@@ -54,16 +64,55 @@ public class NonBlockingSenderReceiver
private final Receiver<ByteBuffer> _receiver;
private final int _receiveBufSize;
private final Ticker _ticker;
+ private final Set<TransportEncryption> _encryptionSet;
+ private final SSLContext _sslContext;
+ private ByteBuffer _netInputBuffer;
+ private SSLEngine _sslEngine;
private ByteBuffer _currentBuffer;
+ private TransportEncryption _transportEncryption;
+ private SSLEngineResult _status;
+
- public NonBlockingSenderReceiver(final SocketChannel socketChannel, Receiver<ByteBuffer> receiver, int receiveBufSize, Ticker ticker)
+ public NonBlockingSenderReceiver(final SocketChannel socketChannel,
+ Receiver<ByteBuffer> receiver,
+ int receiveBufSize,
+ Ticker ticker,
+ final Set<TransportEncryption> encryptionSet,
+ final SSLContext sslContext,
+ final boolean wantClientAuth,
+ final boolean needClientAuth)
{
_socketChannel = socketChannel;
_receiver = receiver;
_receiveBufSize = receiveBufSize;
_ticker = ticker;
+ _encryptionSet = encryptionSet;
+ _sslContext = sslContext;
+
+
+ if(encryptionSet.size() == 1)
+ {
+ _transportEncryption = _encryptionSet.iterator().next();
+ }
+
+ if(encryptionSet.contains(TransportEncryption.TLS))
+ {
+ _sslEngine = _sslContext.createSSLEngine();
+ _sslEngine.setUseClientMode(false);
+ SSLUtil.removeSSLv3Support(_sslEngine);
+ if(needClientAuth)
+ {
+ _sslEngine.setNeedClientAuth(true);
+ }
+ else if(wantClientAuth)
+ {
+ _sslEngine.setWantClientAuth(true);
+ }
+ _netInputBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+
+ }
try
{
@@ -197,7 +246,6 @@ public class NonBlockingSenderReceiver
private boolean doWrite() throws IOException
{
- int byteBuffersWritten = 0;
ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
@@ -206,45 +254,153 @@ public class NonBlockingSenderReceiver
bufArray[i] = bufferIterator.next();
}
- _socketChannel.write(bufArray);
+ int byteBuffersWritten = 0;
- for (ByteBuffer buf : bufArray)
+ if(_transportEncryption == TransportEncryption.NONE)
{
- if (buf.remaining() == 0)
+
+
+ _socketChannel.write(bufArray);
+
+ for (ByteBuffer buf : bufArray)
{
- byteBuffersWritten++;
- _buffers.poll();
+ if (buf.remaining() == 0)
+ {
+ byteBuffersWritten++;
+ _buffers.poll();
+ }
}
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Written " + byteBuffersWritten + " byte buffer(s) completely");
+ }
+
+ return bufArray.length == byteBuffersWritten;
}
+ else if(_transportEncryption == TransportEncryption.TLS)
+ {
+ int remaining = 0;
+
+ do
+ {
+ LOGGER.debug("Handshake status: " + _sslEngine.getHandshakeStatus());
+ if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
+ {
+ final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+ _status = _sslEngine.wrap(bufArray, netBuffer);
+ LOGGER.debug("Status: " + _status.getStatus() + " HandshakeStatus " + _status.getHandshakeStatus());
+ runSSLEngineTasks(_status);
+
+ netBuffer.flip();
+ LOGGER.debug("Encrypted " + netBuffer.remaining() + " bytes for output");
+ remaining = netBuffer.remaining();
+ if (remaining != 0)
+ {
+ _encryptedOutput.add(netBuffer);
+ }
+ for (ByteBuffer buf : bufArray)
+ {
+ if (buf.remaining() == 0)
+ {
+ byteBuffersWritten++;
+ _buffers.poll();
+ }
+ }
+ }
+
+ }
+ while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
+
+ ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
+ long written = _socketChannel.write(encryptedBuffers);
+ LOGGER.debug("Written " + written + " encrypted bytes");
- if (LOGGER.isDebugEnabled())
+ ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
+ while(iter.hasNext())
+ {
+ ByteBuffer buf = iter.next();
+ if(buf.remaining() == 0)
+ {
+ iter.remove();
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ return bufArray.length == byteBuffersWritten;
+
+ }
+ else
{
- LOGGER.debug("Written " + byteBuffersWritten + " byte buffer(s) completely");
+ // TODO - actually implement
+ return true;
}
-
- return bufArray.length == byteBuffersWritten;
}
private void doRead() throws IOException
{
- int remaining = 0;
- while (remaining == 0 && !_closed.get())
+ if(_transportEncryption == TransportEncryption.NONE)
+ {
+ int remaining = 0;
+ while (remaining == 0 && !_closed.get())
+ {
+ if (_currentBuffer == null || _currentBuffer.remaining() == 0)
+ {
+ _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
+ }
+ _socketChannel.read(_currentBuffer);
+ remaining = _currentBuffer.remaining();
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + _currentBuffer.position() + " byte(s)");
+ }
+ ByteBuffer dup = _currentBuffer.duplicate();
+ dup.flip();
+ _currentBuffer = _currentBuffer.slice();
+ _receiver.received(dup);
+ }
+ }
+ else if(_transportEncryption == TransportEncryption.TLS)
{
- if(_currentBuffer == null || _currentBuffer.remaining() == 0)
+ int read = 1;
+ int unwrapped = 0;
+ while(!_closed.get() && (read > 0 || unwrapped > 0) && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
{
- _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
+ read = _socketChannel.read(_netInputBuffer);
+ LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
+ _netInputBuffer.flip();
+ ByteBuffer appInputBuffer =
+ ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
+
+ _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
+ LOGGER.debug("Status: " +_status.getStatus() + " HandshakeStatus " + _status.getHandshakeStatus());
+ _netInputBuffer.compact();
+
+ appInputBuffer.flip();
+ unwrapped = appInputBuffer.remaining();
+ LOGGER.debug("Unwrapped to " + unwrapped + " bytes");
+
+ _receiver.received(appInputBuffer);
+
+ runSSLEngineTasks(_status);
}
- _socketChannel.read(_currentBuffer);
- remaining = _currentBuffer.remaining();
- if (LOGGER.isDebugEnabled())
+ }
+ }
+
+ private void runSSLEngineTasks(final SSLEngineResult status)
+ {
+ if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
+ {
+ Runnable task;
+ while((task = _sslEngine.getDelegatedTask()) != null)
{
- LOGGER.debug("Read " + _currentBuffer.position() + " byte(s)");
+ LOGGER.debug("Running task");
+ task.run();
}
- ByteBuffer dup = _currentBuffer.duplicate();
- dup.flip();
- _currentBuffer = _currentBuffer.slice();
- _receiver.received(dup);
}
}
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java Wed Dec 10 14:51:15 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.transport.networ
import java.nio.ByteBuffer;
+import java.util.Set;
import javax.net.ssl.SSLContext;
@@ -150,7 +151,9 @@ public class TransportTest extends QpidT
}
public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory, SSLContext sslContext)
+ ProtocolEngineFactory factory,
+ SSLContext sslContext,
+ final Set<TransportEncryption> encryptionSet)
{
throw new UnsupportedOperationException();
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes Wed Dec 10 14:51:15 2014
@@ -33,7 +33,6 @@ org.apache.qpid.test.client.message.Sele
// QPID-6262: Temporary exclusion whilst NIO refactoring is in flight
-org.apache.qpid.client.ssl.SSLTest#*
org.apache.qpid.server.transport.TCPandSSLTransportTest#*
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationTest#*
org.apache.qpid.server.logging.BrokerLoggingTest#testBrokerStartupListeningTCPSSL
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org