You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2008/11/04 23:40:06 UTC

svn commit: r711455 - in /incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid: ssl/ transport/network/io/ transport/network/ssl/

Author: rajith
Date: Tue Nov  4 14:40:06 2008
New Revision: 711455

URL: http://svn.apache.org/viewvc?rev=711455&view=rev
Log:
This check-in is related to QPID-1296.
Since SSLSocket doesn't support the shutdownInput method, I added an SSL layer on top of the TCP transport.
The SSL layer uses the SSLEngine class introduced in Java 1.5.
This layer can also be used with the nio transport with minimal code changes.

Added:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
Modified:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java?rev=711455&r1=711454&r2=711455&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java Tue Nov  4 14:40:06 2008
@@ -41,38 +41,74 @@
 	/**
 	 * Path to the Java keystore file
 	 */
-	private String _keystorePath;
+	private String _keyStorePath;
 	
 	/**
 	 * Password for the keystore
 	 */
-	private String _keystorePassword;
+	private String _keyStorePassword;
 	
 	/**
-	 * Cert type to use
+	 * Cert type to use in keystore
 	 */
-	private String _certType;
+	private String _keyStoreCertType;
 	
 	/**
+     * Path to the Java truststore file
+     */
+    private String _trustStorePath;
+    
+    /**
+     * Password for the truststore
+     */
+    private String _trustStorePassword;
+    
+    /**
+     * Cert type to use in truststore
+     */
+    private String _trustStoreCertType;
+    
+	
+    
+    public SSLContextFactory(String trustStorePath, String trustStorePassword,
+            String trustStoreCertType) 
+    {
+        this(trustStorePath,trustStorePassword,trustStoreCertType,
+                          trustStorePath,trustStorePassword,trustStoreCertType);
+    }
+
+    /**
 	 * Create a factory instance
 	 * @param keystorePath path to the Java keystore file
 	 * @param keystorePassword password for the Java keystore
 	 * @param certType certificate type
 	 */
-	public SSLContextFactory(String keystorePath, String keystorePassword,
-			String certType) 
+	public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType,
+            String keyStorePath, String keyStorePassword, String keyStoreCertType) 
 	{
-		_keystorePath = keystorePath;
-		_keystorePassword = keystorePassword;
-		if (_keystorePassword.equals("none"))
+
+	    _trustStorePath = trustStorePath;
+        _trustStorePassword = trustStorePassword;
+                
+        if (_trustStorePassword.equals("none"))
+        {
+            _trustStorePassword = null;
+        }
+        _trustStoreCertType = trustStoreCertType;
+        
+	    _keyStorePath = keyStorePath;
+		_keyStorePassword = keyStorePassword;
+				
+		if (_keyStorePassword.equals("none"))
 		{
-			_keystorePassword = null;
+			_keyStorePassword = null;
 		}
-		_certType = certType;
-		if (keystorePath == null) {
-			throw new IllegalArgumentException("Keystore path must be specified");
+		_keyStoreCertType = keyStoreCertType;
+		
+		if (_trustStorePath == null) {
+			throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified");
 		}
-		if (certType == null) {
+		if (_trustStoreCertType == null) {
 			throw new IllegalArgumentException("Cert type must be specified");
 		}
 	}
@@ -86,16 +122,18 @@
 	public SSLContext buildServerContext() throws GeneralSecurityException, IOException
 	{
         // Create keystore
-		KeyStore ks = getInitializedKeyStore();
+		KeyStore ks = getInitializedKeyStore(_keyStorePath,_keyStorePassword);
 
         // Set up key manager factory to use our key store
-        KeyManagerFactory kmf = KeyManagerFactory.getInstance(_certType);
-        kmf.init(ks, _keystorePassword.toCharArray());
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance(_keyStoreCertType);
+        kmf.init(ks, _keyStorePassword.toCharArray());
 
+        KeyStore ts = getInitializedKeyStore(_trustStorePath,_trustStorePassword);
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
+        tmf.init(ts);
+        
         // Initialize the SSLContext to work with our key managers.
-        SSLContext sslContext = SSLContext.getInstance("TLS");
-        TrustManagerFactory tmf = TrustManagerFactory.getInstance(_certType);
-        tmf.init(ks);
+        SSLContext sslContext = SSLContext.getInstance("TLS");        
         sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
 
         return sslContext;		
@@ -109,34 +147,34 @@
 	 */
 	public SSLContext buildClientContext() throws GeneralSecurityException, IOException
 	{
-		KeyStore ks = getInitializedKeyStore();
-        TrustManagerFactory tmf = TrustManagerFactory.getInstance(_certType);
+		KeyStore ks = getInitializedKeyStore(_trustStorePath,_trustStorePassword);
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
         tmf.init(ks);
         SSLContext context = SSLContext.getInstance("TLS");
         context.init(null, tmf.getTrustManagers(), null);
         return context;		
 	}
 	
-	private KeyStore getInitializedKeyStore() throws GeneralSecurityException, IOException
+	private KeyStore getInitializedKeyStore(String storePath, String storePassword) throws GeneralSecurityException, IOException
 	{
         KeyStore ks = KeyStore.getInstance("JKS");
         InputStream in = null;
         try
         {
-        	File f = new File(_keystorePath);
+        	File f = new File(storePath);
         	if (f.exists())
         	{
         		in = new FileInputStream(f);
         	}
         	else 
         	{
-        		in = Thread.currentThread().getContextClassLoader().getResourceAsStream(_keystorePath);
+        		in = Thread.currentThread().getContextClassLoader().getResourceAsStream(storePath);
         	}
             if (in == null)
             {
-                throw new IOException("Unable to load keystore resource: " + _keystorePath);
+                throw new IOException("Unable to load keystore resource: " + storePath);
             }
-            ks.load(in, _keystorePassword.toCharArray());
+            ks.load(in, storePassword.toCharArray());
         }
         finally
         {

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java?rev=711455&r1=711454&r2=711455&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java Tue Nov  4 14:40:06 2008
@@ -69,7 +69,7 @@
             try
             {
                 Socket sock = socket.accept();
-                IoTransport<E> transport = new IoTransport<E>(sock, binding);
+                IoTransport<E> transport = new IoTransport<E>(sock, binding,false);
             }
             catch (IOException e)
             {

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=711455&r1=711454&r2=711455&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Tue Nov  4 14:40:06 2008
@@ -26,19 +26,20 @@
 import java.net.SocketException;
 import java.nio.ByteBuffer;
 
-import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.ssl.SSLContextFactory;
 import org.apache.qpid.transport.Binding;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.ConnectionBinding;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.ssl.SSLReceiver;
+import org.apache.qpid.transport.network.ssl.SSLSender;
 import org.apache.qpid.transport.util.Logger;
 
 /**
@@ -70,21 +71,53 @@
         ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
 
     private Socket socket;
-    private IoSender sender;
+    private Sender<ByteBuffer> sender;
     private E endpoint;
     private IoReceiver receiver;
     private long timeout = 60000;
 
-    IoTransport(Socket socket, Binding<E,ByteBuffer> binding)
+    IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl)
     {
         this.socket = socket;
-        this.sender = new IoSender(this, 2*writeBufferSize, timeout);
-        this.endpoint = binding.endpoint(sender);
-        this.receiver = new IoReceiver(this, binding.receiver(endpoint),
-                                       2*readBufferSize, timeout);
+        
+        if (ssl)
+        {
+            SSLEngine engine = null;
+            SSLContext sslCtx;
+            try
+            {
+                sslCtx = createSSLContext();
+            }
+            catch (Exception e)
+            {
+                throw new TransportException("Error creating SSL Context", e);
+            }
+            
+            try
+            {
+                engine = sslCtx.createSSLEngine();
+                engine.setUseClientMode(true);
+            }
+            catch(Exception e)
+            {
+                throw new TransportException("Error creating SSL Engine", e);
+            }
+            
+            this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout));
+            this.endpoint = binding.endpoint(sender);
+            this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
+                                           2*readBufferSize, timeout);
+        }
+        else
+        {
+            this.sender = new IoSender(this, 2*writeBufferSize, timeout);
+            this.endpoint = binding.endpoint(sender);
+            this.receiver = new IoReceiver(this, binding.receiver(endpoint),
+                                           2*readBufferSize, timeout);
+        }
     }
 
-    IoSender getSender()
+    Sender<ByteBuffer> getSender()
     {
         return sender;
     }
@@ -103,8 +136,8 @@
                                       Binding<E,ByteBuffer> binding,
                                       boolean ssl)
     {
-        Socket socket = createSocket(host, port,ssl);
-        IoTransport<E> transport = new IoTransport<E>(socket, binding);
+        Socket socket = createSocket(host, port);
+        IoTransport<E> transport = new IoTransport<E>(socket, binding,ssl);
         return transport.endpoint;
     }
 
@@ -144,21 +177,12 @@
 
     }
 
-    private static Socket createSocket(String host, int port, boolean ssl)
+    private static Socket createSocket(String host, int port)
     {
         try
         {
             InetAddress address = InetAddress.getByName(host);
-            Socket socket;
-            if (ssl)
-            {
-                SSLSocketFactory sslSocketfactory = (SSLSocketFactory) SSLSocketFactory.getDefault();
-                socket = sslSocketfactory.createSocket();
-            }
-            else
-            {
-                socket = new Socket();
-            }
+            Socket socket = new Socket();
             socket.setReuseAddress(true);
             socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
 
@@ -183,5 +207,23 @@
             throw new TransportException("Error connecting to broker", e);
         }
     }
+    
+    private SSLContext createSSLContext() throws Exception
+    {
+        String trustStorePath = System.getProperty("javax.net.ssl.trustStore");
+        String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");
+        String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509");
+                
+        String keyStorePath = System.getProperty("javax.net.ssl.keyStore",trustStorePath);
+        String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword",trustStorePassword);
+        String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509");
+        
+        SSLContextFactory sslContextFactory = new SSLContextFactory(trustStorePath,trustStorePassword,
+                                                                    trustStoreCertType,keyStorePath,
+                                                                    keyStorePassword,keyStoreCertType);
+        
+        return sslContextFactory.buildServerContext();
+        
+    }
 
 }

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java?rev=711455&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java Tue Nov  4 14:40:06 2008
@@ -0,0 +1,200 @@
+package org.apache.qpid.transport.network.ssl;
+
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * Receiver that decrypts incoming network data with an {@link SSLEngine}
+ * and forwards the resulting application data to a delegate receiver.
+ *
+ * Changes in handshake state and engine closure are signalled to the paired
+ * {@link SSLSender} by calling notifyAll() on the sender's notification token.
+ */
+public class SSLReceiver implements Receiver<ByteBuffer>
+{
+    private final Receiver<ByteBuffer> delegate;
+    private final SSLEngine engine;
+    private final SSLSender sender;
+    private final int sslBufSize;
+    private ByteBuffer appData;
+    private ByteBuffer localBuffer;
+    private boolean dataCached = false;
+    private final Object notificationToken;
+
+    private static final Logger log = Logger.get(SSLReceiver.class);
+
+    /**
+     * @param engine   engine used to unwrap incoming data
+     * @param delegate receiver that is handed the unwrapped data
+     * @param sender   paired sender; supplies the notification token and runs delegated tasks
+     */
+    public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender)
+    {
+        this.engine = engine;
+        this.delegate = delegate;
+        this.sender = sender;
+        this.sslBufSize = engine.getSession().getApplicationBufferSize();
+        appData = ByteBuffer.allocate(sslBufSize);
+        localBuffer = ByteBuffer.allocate(sslBufSize);
+        notificationToken = sender.getNotificationToken();
+    }
+
+    public void closed()
+    {
+        delegate.closed();
+    }
+
+    public void exception(Throwable t)
+    {
+        delegate.exception(t);
+    }
+
+    /**
+     * Prepends any bytes cached by a previous BUFFER_UNDERFLOW to the newly received buffer.
+     */
+    private ByteBuffer addPreviouslyUnreadData(ByteBuffer buf)
+    {
+        if (dataCached)
+        {
+            ByteBuffer b = ByteBuffer.allocate(localBuffer.remaining() + buf.remaining());
+            b.put(localBuffer);
+            b.put(buf);
+            b.flip();
+            dataCached = false;
+            return b;
+        }
+        else
+        {
+            return buf;
+        }
+    }
+
+    /**
+     * Saves the bytes left in netData so they can be retried with the next received buffer.
+     */
+    private void cacheUnreadData(ByteBuffer netData)
+    {
+        // localBuffer is initially sized from the application buffer size, but what
+        // is left over here is network data; grow the cache instead of letting put()
+        // throw BufferOverflowException when the remainder does not fit.
+        if (netData.remaining() > localBuffer.capacity())
+        {
+            localBuffer = ByteBuffer.allocate(netData.remaining());
+        }
+        localBuffer.clear();
+        localBuffer.put(netData);
+        localBuffer.flip();
+        dataCached = true;
+    }
+
+    /**
+     * Wakes any thread waiting on the sender's notification token.
+     */
+    private void notifySender()
+    {
+        synchronized(notificationToken)
+        {
+            notificationToken.notifyAll();
+        }
+    }
+
+    /**
+     * Unwraps the received network data and passes any application data produced to the delegate.
+     *
+     * @param buf network data as read from the transport
+     * @throws TransportException if the engine fails to unwrap the data
+     */
+    public void received(ByteBuffer buf)
+    {
+        ByteBuffer netData = addPreviouslyUnreadData(buf);
+
+        HandshakeStatus handshakeStatus;
+        Status status;
+
+        while (netData.hasRemaining())
+        {
+            try
+            {
+                SSLEngineResult result = engine.unwrap(netData, appData);
+                int read = result.bytesProduced();
+                status = result.getStatus();
+                handshakeStatus = result.getHandshakeStatus();
+
+                if (read > 0)
+                {
+                    // Slice out only the bytes just produced, then restore appData's
+                    // position and limit so the next unwrap appends after them.
+                    int limit = appData.limit();
+                    appData.limit(appData.position());
+                    appData.position(appData.position() - read);
+
+                    ByteBuffer data = appData.slice();
+
+                    appData.limit(limit);
+                    appData.position(appData.position() + read);
+
+                    delegate.received(data);
+                }
+
+                switch(status)
+                {
+                    case CLOSED:
+                        notifySender();
+                        return;
+
+                    case BUFFER_OVERFLOW:
+                        // allocate rather than clear: slices of the old buffer
+                        // have already been handed to the delegate
+                        appData = ByteBuffer.allocate(sslBufSize);
+                        continue;
+
+                    case BUFFER_UNDERFLOW:
+                        cacheUnreadData(netData);
+                        break;
+
+                    case OK:
+                        break; // do nothing
+
+                    default:
+                        throw new IllegalStateException("SSLReceiver: Invalid State " + status);
+                }
+
+                switch (handshakeStatus)
+                {
+                    case NEED_UNWRAP:
+                        if (netData.hasRemaining())
+                        {
+                            continue;
+                        }
+                        break;
+
+                    case NEED_TASK:
+                        sender.doTasks();
+                        // deliberate fall through: wake the sender once the tasks have run
+
+                    case NEED_WRAP:
+                    case FINISHED:
+                    case NOT_HANDSHAKING:
+                        notifySender();
+                        break;
+
+                    default:
+                        throw new IllegalStateException("SSLReceiver: Invalid State " + handshakeStatus);
+                }
+            }
+            catch(SSLException e)
+            {
+                throw new TransportException("Error in SSLReceiver",e);
+            }
+        }
+    }
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java?rev=711455&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java Tue Nov  4 14:40:06 2008
@@ -0,0 +1,254 @@
+package org.apache.qpid.transport.network.ssl;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * Sender that encrypts outgoing application data with an {@link SSLEngine}
+ * and passes the resulting network data to a delegate sender.
+ *
+ * When the engine reports NEED_UNWRAP, {@link #send(ByteBuffer)} blocks on the
+ * notification token until another thread calls notifyAll() on it.
+ */
+public class SSLSender implements Sender<ByteBuffer>
+{
+    private final Sender<ByteBuffer> delegate;
+    private final SSLEngine engine;
+    private final int sslBufSize;
+    private final ByteBuffer netData;
+
+    private final Object engineState = new Object();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private static final Logger log = Logger.get(SSLSender.class);
+
+    /**
+     * @param engine   engine used to wrap outgoing data
+     * @param delegate sender that is handed the wrapped data
+     */
+    public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate)
+    {
+        this.engine = engine;
+        this.delegate = delegate;
+        sslBufSize = engine.getSession().getPacketBufferSize();
+        netData = ByteBuffer.allocate(sslBufSize);
+    }
+
+    /**
+     * Closes the outbound side of the engine, sends the closing data the
+     * engine produces and then closes the delegate. Only the first call has
+     * any effect.
+     */
+    public void close()
+    {
+        if (!closed.getAndSet(true))
+        {
+            if (engine.isOutboundDone())
+            {
+                // NOTE(review): the delegate is not closed on this path - confirm that is intended
+                return;
+            }
+            log.debug("Closing SSL connection");
+            engine.closeOutbound();
+            try
+            {
+                // send() cannot be used here: the closed flag is already set, so it
+                // would throw, and its loop never wraps an empty buffer anyway.
+                sendClosingData();
+            }
+            finally
+            {
+                delegate.close();
+            }
+        }
+    }
+
+    /**
+     * Wraps an empty buffer until the engine reports its outbound side done,
+     * forwarding whatever each wrap produces, then flushes the delegate.
+     */
+    private void sendClosingData()
+    {
+        ByteBuffer empty = ByteBuffer.allocate(0);
+        while (!engine.isOutboundDone())
+        {
+            SSLEngineResult result;
+            try
+            {
+                result = engine.wrap(empty, netData);
+            }
+            catch(SSLException e)
+            {
+                throw new SenderException("SSL, Error occurred while closing the connection",e);
+            }
+
+            int produced = result.bytesProduced();
+            sendProduced(produced);
+
+            if (result.getStatus() == Status.BUFFER_OVERFLOW)
+            {
+                netData.clear();
+            }
+            else if (produced == 0)
+            {
+                break; // nothing more was produced; do not spin
+            }
+        }
+        flush();
+    }
+
+    /**
+     * Passes the last {@code produced} bytes written to netData to the delegate,
+     * leaving netData's position and limit as they were.
+     */
+    private void sendProduced(int produced)
+    {
+        if (produced > 0)
+        {
+            int limit = netData.limit();
+            netData.limit(netData.position());
+            netData.position(netData.position() - produced);
+
+            ByteBuffer data = netData.slice();
+
+            netData.limit(limit);
+            netData.position(netData.position() + produced);
+
+            delegate.send(data);
+        }
+    }
+
+    public void flush()
+    {
+        delegate.flush();
+    }
+
+    /**
+     * Wraps the given application data and sends the result to the delegate.
+     *
+     * @param appData application data to encrypt
+     * @throws SenderException if this sender or the engine is closed, or wrapping fails
+     */
+    public void send(ByteBuffer appData)
+    {
+        if (closed.get())
+        {
+            throw new SenderException("SSL Sender is closed");
+        }
+
+        HandshakeStatus handshakeStatus;
+        Status status;
+        // An interrupt received while waiting is remembered and re-asserted on
+        // exit instead of being lost.
+        boolean interrupted = false;
+
+        try
+        {
+            while(appData.hasRemaining())
+            {
+                int read = 0;
+                try
+                {
+                    SSLEngineResult result = engine.wrap(appData, netData);
+                    read   = result.bytesProduced();
+                    status = result.getStatus();
+                    handshakeStatus = result.getHandshakeStatus();
+                }
+                catch(SSLException e)
+                {
+                    throw new SenderException("SSL, Error occurred while encrypting data",e);
+                }
+
+                sendProduced(read);
+
+                switch(status)
+                {
+                    case CLOSED:
+                        throw new SenderException("SSLEngine is closed");
+
+                    case BUFFER_OVERFLOW:
+                        netData.clear();
+                        continue;
+
+                    case OK:
+                        break; // do nothing
+
+                    default:
+                        throw new IllegalStateException("SSLSender: Invalid State " + status);
+                }
+
+                switch (handshakeStatus)
+                {
+                    case NEED_WRAP:
+                        if (netData.hasRemaining())
+                        {
+                            continue;
+                        }
+                        // falls through to NEED_TASK when netData has no room left
+
+                    case NEED_TASK:
+                        doTasks();
+                        break;
+
+                    case NEED_UNWRAP:
+                        flush();
+                        synchronized(engineState)
+                        {
+                            try
+                            {
+                                engineState.wait();
+                            }
+                            catch(InterruptedException e)
+                            {
+                                interrupted = true;
+                            }
+                        }
+                        break;
+
+                    case FINISHED:
+                    case NOT_HANDSHAKING:
+                        break; //do  nothing
+
+                    default:
+                        throw new IllegalStateException("SSLSender: Invalid State " + handshakeStatus);
+                }
+            }
+        }
+        finally
+        {
+            if (interrupted)
+            {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Runs, on the calling thread, every delegated task the engine currently has.
+     */
+    public void doTasks()
+    {
+        Runnable runnable;
+        while ((runnable = engine.getDelegatedTask()) != null) {
+            runnable.run();
+        }
+    }
+
+    /**
+     * @return the object that send() waits on; notifying it wakes a blocked sender
+     */
+    public Object getNotificationToken()
+    {
+        return engineState;
+    }
+}