You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/12/10 15:51:16 UTC

svn commit: r1644437 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java: broker-core/src/main/java/org/apache/qpid/server/transport/ common/src/main/java/org/apache/qpid/transport/network/ common/src/main/java/org/apache/qpid/transport/network/io/ c...

Author: rgodfrey
Date: Wed Dec 10 14:51:15 2014
New Revision: 1644437

URL: http://svn.apache.org/r1644437
Log:
Initial SSL work

Added:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java   (with props)
Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Wed Dec 10 14:51:15 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport
 import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
 
 import java.net.InetSocketAddress;
+import java.util.EnumSet;
 import java.util.Set;
 
 import javax.net.ssl.SSLContext;
@@ -34,6 +35,7 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.TransportEncryption;
 import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport;
 
 class TCPandSSLTransport implements AcceptingTransport
@@ -88,7 +90,17 @@ class TCPandSSLTransport implements Acce
                 _port,
                 _transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
 
-        _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext);
+        EnumSet<TransportEncryption> encryptionSet = EnumSet.noneOf(TransportEncryption.class);
+        if(_transports.contains(Transport.TCP))
+        {
+            encryptionSet.add(TransportEncryption.NONE);
+        }
+        if(_transports.contains(Transport.SSL))
+        {
+            encryptionSet.add(TransportEncryption.TLS);
+        }
+        _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext,
+                                 encryptionSet);
     }
 
     public int getAcceptingPort()

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java Wed Dec 10 14:51:15 2014
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.transport.network;
 
+import java.util.Set;
+
 import javax.net.ssl.SSLContext;
 
 import org.apache.qpid.protocol.ProtocolEngineFactory;
@@ -29,7 +31,8 @@ public interface IncomingNetworkTranspor
 {
     public void accept(NetworkTransportConfiguration config,
                        ProtocolEngineFactory factory,
-                       SSLContext sslContext);
+                       SSLContext sslContext,
+                       final Set<TransportEncryption> encryptionSet);
 
     public int getAcceptingPort();
 }

Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java?rev=1644437&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java Wed Dec 10 14:51:15 2014
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+public enum TransportEncryption
+{
+    NONE, TLS
+}

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java Wed Dec 10 14:51:15 2014
@@ -27,6 +27,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
+import java.util.Set;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLServerSocket;
@@ -45,6 +46,7 @@ import org.apache.qpid.transport.network
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.OutgoingNetworkTransport;
 import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.TransportEncryption;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
 public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
@@ -136,7 +138,7 @@ public abstract class AbstractNetworkTra
 
     public void accept(NetworkTransportConfiguration config,
                        ProtocolEngineFactory factory,
-                       SSLContext sslContext)
+                       SSLContext sslContext, final Set<TransportEncryption> encryptionSet)
     {
         try
         {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java Wed Dec 10 14:51:15 2014
@@ -24,7 +24,9 @@ import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.security.Principal;
+import java.util.Set;
 
+import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSocket;
 
@@ -35,6 +37,7 @@ import org.apache.qpid.transport.Receive
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportEncryption;
 
 public class NonBlockingConnection implements NetworkConnection
 {
@@ -48,20 +51,20 @@ public class NonBlockingConnection imple
     private boolean _principalChecked;
     private final Object _lock = new Object();
 
-    public NonBlockingConnection(SocketChannel socket, Receiver<ByteBuffer> delegate,
-                                 int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
+    public NonBlockingConnection(SocketChannel socket,
+                                 Receiver<ByteBuffer> delegate,
+                                 int sendBufferSize,
+                                 int receiveBufferSize,
+                                 long timeout,
+                                 Ticker ticker,
+                                 final Set<TransportEncryption> encryptionSet,
+                                 final SSLContext sslContext,
+                                 final boolean wantClientAuth, final boolean needClientAuth)
     {
         _socket = socket;
         _timeout = timeout;
 
-//        _ioReceiver = new NonBlockingReceiver(_socket, delegate, receiveBufferSize,_timeout);
-//        _nonBlockingSenderReceiver.setTicker(ticker);
-
-//        _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout);
-
-//        _ioSender.setReceiver(_nonBlockingSenderReceiver);
-
-        _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker);
+        _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth);
 
     }
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java Wed Dec 10 14:51:15 2014
@@ -21,13 +21,13 @@
 package org.apache.qpid.transport.network.io;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.util.Set;
 
 import javax.net.ssl.SSLContext;
 
@@ -36,13 +36,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.configuration.CommonProperties;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.IncomingNetworkTransport;
 import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.TransportEncryption;
 
 public class NonBlockingNetworkTransport implements IncomingNetworkTransport
 {
@@ -54,6 +53,8 @@ public class NonBlockingNetworkTransport
                                                                    CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
     private AcceptingThread _acceptor;
 
+
+/*
     private SocketChannel _socketChannel;
     private NonBlockingConnection _connection;
 
@@ -93,7 +94,7 @@ public class NonBlockingNetworkTransport
         {
             IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
             _connection = createNetworkConnection(_socketChannel, delegate, sendBufferSize, receiveBufferSize,
-                                                  TIMEOUT, ticker);
+                                                  TIMEOUT, ticker, _encryptionSet, _sslContext);
             ticker.setConnection(_connection);
             _connection.start();
         }
@@ -114,41 +115,51 @@ public class NonBlockingNetworkTransport
         return _connection;
     }
 
+*/
 
     protected NonBlockingConnection createNetworkConnection(final SocketChannel socket,
-                                                          final Receiver<ByteBuffer> engine,
-                                                          final Integer sendBufferSize,
-                                                          final Integer receiveBufferSize,
-                                                          final int timeout,
-                                                          final IdleTimeoutTicker ticker)
+                                                            final Receiver<ByteBuffer> engine,
+                                                            final Integer sendBufferSize,
+                                                            final Integer receiveBufferSize,
+                                                            final int timeout,
+                                                            final IdleTimeoutTicker ticker,
+                                                            final Set<TransportEncryption> encryptionSet,
+                                                            final SSLContext sslContext,
+                                                            final boolean wantClientAuth,
+                                                            final boolean needClientAuth)
     {
-        return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker);
+        return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth);
     }
 
     public void close()
     {
+/*
         if(_connection != null)
         {
             _connection.close();
         }
+*/
         if(_acceptor != null)
         {
             _acceptor.close();
         }
     }
+/*
 
     public NetworkConnection getConnection()
     {
         return _connection;
     }
+*/
 
     public void accept(NetworkTransportConfiguration config,
                        ProtocolEngineFactory factory,
-                       SSLContext sslContext)
+                       SSLContext sslContext,
+                       final Set<TransportEncryption> encryptionSet)
     {
         try
         {
-            _acceptor = new AcceptingThread(config, factory, sslContext);
+            _acceptor = new AcceptingThread(config, factory, sslContext, encryptionSet);
             _acceptor.setDaemon(false);
             _acceptor.start();
         }
@@ -165,6 +176,7 @@ public class NonBlockingNetworkTransport
 
     private class AcceptingThread extends Thread
     {
+        private final Set<TransportEncryption> _encryptionSet;
         private volatile boolean _closed = false;
         private NetworkTransportConfiguration _config;
         private ProtocolEngineFactory _factory;
@@ -174,7 +186,8 @@ public class NonBlockingNetworkTransport
 
         private AcceptingThread(NetworkTransportConfiguration config,
                                 ProtocolEngineFactory factory,
-                                SSLContext sslContext) throws IOException
+                                SSLContext sslContext,
+                                final Set<TransportEncryption> encryptionSet) throws IOException
         {
             _config = config;
             _factory = factory;
@@ -187,6 +200,7 @@ public class NonBlockingNetworkTransport
 
             _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
             _serverSocket.bind(address);
+            _encryptionSet = encryptionSet;
         }
 
 
@@ -250,7 +264,11 @@ public class NonBlockingNetworkTransport
                                                             sendBufferSize,
                                                             receiveBufferSize,
                                                             _timeout,
-                                                            ticker);
+                                                            ticker,
+                                                            _encryptionSet,
+                                                            _sslContext,
+                                                            _config.wantClientAuth(),
+                                                            _config.needClientAuth());
 
                             connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Wed Dec 10 14:51:15 2014
@@ -24,11 +24,18 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +45,8 @@ import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.SenderClosedException;
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
 public class NonBlockingSenderReceiver  implements Runnable, Sender<ByteBuffer>
 {
@@ -47,6 +56,7 @@ public class NonBlockingSenderReceiver
     private final Selector _selector;
 
     private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+    private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
 
     private final Thread _ioThread;
     private final String _remoteSocketAddress;
@@ -54,16 +64,55 @@ public class NonBlockingSenderReceiver
     private final Receiver<ByteBuffer> _receiver;
     private final int _receiveBufSize;
     private final Ticker _ticker;
+    private final Set<TransportEncryption> _encryptionSet;
+    private final SSLContext _sslContext;
+    private ByteBuffer _netInputBuffer;
+    private SSLEngine _sslEngine;
 
     private ByteBuffer _currentBuffer;
 
+    private TransportEncryption _transportEncryption;
+    private SSLEngineResult _status;
+
 
-    public NonBlockingSenderReceiver(final SocketChannel socketChannel, Receiver<ByteBuffer> receiver, int receiveBufSize, Ticker ticker)
+    public NonBlockingSenderReceiver(final SocketChannel socketChannel,
+                                     Receiver<ByteBuffer> receiver,
+                                     int receiveBufSize,
+                                     Ticker ticker,
+                                     final Set<TransportEncryption> encryptionSet,
+                                     final SSLContext sslContext,
+                                     final boolean wantClientAuth,
+                                     final boolean needClientAuth)
     {
         _socketChannel = socketChannel;
         _receiver = receiver;
         _receiveBufSize = receiveBufSize;
         _ticker = ticker;
+        _encryptionSet = encryptionSet;
+        _sslContext = sslContext;
+
+
+        if(encryptionSet.size() == 1)
+        {
+            _transportEncryption = _encryptionSet.iterator().next();
+        }
+
+        if(encryptionSet.contains(TransportEncryption.TLS))
+        {
+            _sslEngine = _sslContext.createSSLEngine();
+            _sslEngine.setUseClientMode(false);
+            SSLUtil.removeSSLv3Support(_sslEngine);
+            if(needClientAuth)
+            {
+                _sslEngine.setNeedClientAuth(true);
+            }
+            else if(wantClientAuth)
+            {
+                _sslEngine.setWantClientAuth(true);
+            }
+            _netInputBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+
+        }
 
         try
         {
@@ -197,7 +246,6 @@ public class NonBlockingSenderReceiver
 
     private boolean doWrite() throws IOException
     {
-        int byteBuffersWritten = 0;
 
         ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
         Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
@@ -206,45 +254,153 @@ public class NonBlockingSenderReceiver
             bufArray[i] = bufferIterator.next();
         }
 
-        _socketChannel.write(bufArray);
+        int byteBuffersWritten = 0;
 
-        for (ByteBuffer buf : bufArray)
+        if(_transportEncryption == TransportEncryption.NONE)
         {
-            if (buf.remaining() == 0)
+
+
+            _socketChannel.write(bufArray);
+
+            for (ByteBuffer buf : bufArray)
             {
-                byteBuffersWritten++;
-                _buffers.poll();
+                if (buf.remaining() == 0)
+                {
+                    byteBuffersWritten++;
+                    _buffers.poll();
+                }
             }
+
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Written " + byteBuffersWritten + " byte buffer(s) completely");
+            }
+
+            return bufArray.length == byteBuffersWritten;
         }
+        else if(_transportEncryption == TransportEncryption.TLS)
+        {
+            int remaining = 0;
+
+            do
+            {
+                LOGGER.debug("Handshake status: " + _sslEngine.getHandshakeStatus());
+                if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
+                {
+                    final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+                    _status = _sslEngine.wrap(bufArray, netBuffer);
+                    LOGGER.debug("Status: " + _status.getStatus() + " HandshakeStatus " + _status.getHandshakeStatus());
+                    runSSLEngineTasks(_status);
+
+                    netBuffer.flip();
+                    LOGGER.debug("Encrypted " + netBuffer.remaining() + " bytes for output");
+                    remaining = netBuffer.remaining();
+                    if (remaining != 0)
+                    {
+                        _encryptedOutput.add(netBuffer);
+                    }
+                    for (ByteBuffer buf : bufArray)
+                    {
+                        if (buf.remaining() == 0)
+                        {
+                            byteBuffersWritten++;
+                            _buffers.poll();
+                        }
+                    }
+                }
+
+            }
+            while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
+
+            ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
+            long written  = _socketChannel.write(encryptedBuffers);
+            LOGGER.debug("Written " + written + " encrypted bytes");
 
-        if (LOGGER.isDebugEnabled())
+            ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
+            while(iter.hasNext())
+            {
+                ByteBuffer buf = iter.next();
+                if(buf.remaining() == 0)
+                {
+                    iter.remove();
+                }
+                else
+                {
+                    break;
+                }
+            }
+
+            return bufArray.length == byteBuffersWritten;
+
+        }
+        else
         {
-            LOGGER.debug("Written " + byteBuffersWritten + " byte buffer(s) completely");
+            // TODO - actually implement
+            return true;
         }
-
-        return bufArray.length == byteBuffersWritten;
     }
 
     private void doRead() throws IOException
     {
 
-        int remaining = 0;
-        while (remaining == 0 && !_closed.get())
+        if(_transportEncryption == TransportEncryption.NONE)
+        {
+            int remaining = 0;
+            while (remaining == 0 && !_closed.get())
+            {
+                if (_currentBuffer == null || _currentBuffer.remaining() == 0)
+                {
+                    _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
+                }
+                _socketChannel.read(_currentBuffer);
+                remaining = _currentBuffer.remaining();
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Read " + _currentBuffer.position() + " byte(s)");
+                }
+                ByteBuffer dup = _currentBuffer.duplicate();
+                dup.flip();
+                _currentBuffer = _currentBuffer.slice();
+                _receiver.received(dup);
+            }
+        }
+        else if(_transportEncryption == TransportEncryption.TLS)
         {
-            if(_currentBuffer == null || _currentBuffer.remaining() == 0)
+            int read = 1;
+            int unwrapped = 0;
+            while(!_closed.get() && (read > 0 || unwrapped > 0) && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
             {
-                _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
+                read = _socketChannel.read(_netInputBuffer);
+                LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
+                _netInputBuffer.flip();
+                ByteBuffer appInputBuffer =
+                        ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
+
+                _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
+                LOGGER.debug("Status: " +_status.getStatus() + " HandshakeStatus " + _status.getHandshakeStatus());
+                _netInputBuffer.compact();
+
+                appInputBuffer.flip();
+                unwrapped = appInputBuffer.remaining();
+                LOGGER.debug("Unwrapped to " + unwrapped + " bytes");
+
+                _receiver.received(appInputBuffer);
+
+                runSSLEngineTasks(_status);
             }
-            _socketChannel.read(_currentBuffer);
-            remaining = _currentBuffer.remaining();
-            if (LOGGER.isDebugEnabled())
+        }
+    }
+
+    private void runSSLEngineTasks(final SSLEngineResult status)
+    {
+        if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
+        {
+            Runnable task;
+            while((task = _sslEngine.getDelegatedTask()) != null)
             {
-                LOGGER.debug("Read " + _currentBuffer.position() + " byte(s)");
+                LOGGER.debug("Running task");
+                task.run();
             }
-            ByteBuffer dup = _currentBuffer.duplicate();
-            dup.flip();
-            _currentBuffer = _currentBuffer.slice();
-            _receiver.received(dup);
         }
     }
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java Wed Dec 10 14:51:15 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.transport.networ
 
 
 import java.nio.ByteBuffer;
+import java.util.Set;
 
 import javax.net.ssl.SSLContext;
 
@@ -150,7 +151,9 @@ public class TransportTest extends QpidT
         }
 
         public void accept(NetworkTransportConfiguration config,
-                           ProtocolEngineFactory factory, SSLContext sslContext)
+                           ProtocolEngineFactory factory,
+                           SSLContext sslContext,
+                           final Set<TransportEncryption> encryptionSet)
         {
             throw new UnsupportedOperationException();
         }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes?rev=1644437&r1=1644436&r2=1644437&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes Wed Dec 10 14:51:15 2014
@@ -33,7 +33,6 @@ org.apache.qpid.test.client.message.Sele
 
 
 // QPID-6262: Temporary exclusion whilst NIO refactoring is in flight
-org.apache.qpid.client.ssl.SSLTest#*
 org.apache.qpid.server.transport.TCPandSSLTransportTest#*
 org.apache.qpid.server.security.auth.manager.ExternalAuthenticationTest#*
 org.apache.qpid.server.logging.BrokerLoggingTest#testBrokerStartupListeningTCPSSL



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org