You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/03/19 16:16:47 UTC

svn commit: r925261 - in /qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport: ./ network/ network/io/ network/security/

Author: rajith
Date: Fri Mar 19 15:16:46 2010
New Revision: 925261

URL: http://svn.apache.org/viewvc?rev=925261&view=rev
Log:
Did some general refactoring to support QPID-2444 QPID-2445 QPID-2446 QPID-2447
1. Provided a mechanism to register any transport using qpid.transport jvm arg.
2. Created a TransportBuilder class which abstracts the building of receiver and sender pipes.
3. Created a ConnectionSettings class that contains all connection level parameters. This enhancement is also required to support QPID-2343
4. Added SecurityLayer class that adds the nessacery codecs and cordination to retrive the userID for mechanisms like EXTERNAL

Added:
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
Modified:
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=925261&r1=925260&r2=925261&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Mar 19 15:16:46 2010
@@ -25,14 +25,7 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.Connection.State.NEW;
 import static org.apache.qpid.transport.Connection.State.OPEN;
 import static org.apache.qpid.transport.Connection.State.OPENING;
-import org.apache.qpid.transport.network.ConnectionBinding;
-import org.apache.qpid.transport.network.io.IoTransport;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.transport.util.Waiter;
-import org.apache.qpid.util.Strings;
 
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslServer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -40,6 +33,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
+
+import org.apache.qpid.transport.network.security.SecurityLayer;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
+import org.apache.qpid.util.Strings;
+
 
 /**
  * Connection
@@ -109,7 +110,8 @@ public class Connection extends Connecti
     private Map<String,Object> _serverProperties;
     private String userID;
     private ConnectionSettings conSettings;
-
+    private SecurityLayer securityLayer;
+    
     // want to make this final
     private int _connectionId;
 
@@ -215,10 +217,17 @@ public class Connection extends Connecti
             userID = settings.getUsername();
             delegate = new ClientDelegate(settings);
 
-            IoTransport.connect(settings.getHost(),
+            /*IoTransport.connect(settings.getHost(),
                                 settings.getPort(),
                                 ConnectionBinding.get(this),
-                                settings.isUseSSL());
+                                settings.isUseSSL());*/
+            
+            TransportBuilder transport = new TransportBuilder();
+            transport.init(this);
+            this.sender = transport.buildSenderPipe();
+            transport.buildReceiverPipe(this);
+            this.securityLayer = transport.getSecurityLayer();
+            
             send(new ProtocolHeader(1, 0, 10));
 
             Waiter w = new Waiter(lock, timeout);
@@ -633,5 +642,10 @@ public class Connection extends Connecti
     {
         return conSettings;
     }
+    
+    public SecurityLayer getSecurityLayer()
+    {
+        return securityLayer;
+    }
 
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java?rev=925261&r1=925260&r2=925261&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java Fri Mar 19 15:16:46 2010
@@ -36,47 +36,32 @@ public class ConnectionSettings
     String username = "guest";
     String password = "guest";
     int port = 5672;
-    boolean tcpNodelay;
+    boolean tcpNodelay = Boolean.getBoolean("amqj.tcp_nodelay");
     int maxChannelCount = 32767;
     int maxFrameSize = 65535;
     int heartbeatInterval;
+    int readBufferSize = 65535;
+    int writeBufferSize = 65535;
+    long transportTimeout = 60000;
     
     // SSL props
     boolean useSSL;
-    String keyStorePath;
-    String keyStorePassword;
-    String keyStoreCertType;
-    String trustStoreCertType;
-    String trustStorePath;
-    String trustStorePassword;
+    String keyStorePath = System.getProperty("javax.net.ssl.keyStore");
+    String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword");
+    String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509");;
+    String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509");;
+    String trustStorePath = System.getProperty("javax.net.ssl.trustStore");;
+    String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");;
     String certAlias;
     boolean verifyHostname;
     
     // SASL props
-    String saslMechs = "PLAIN";
-    String saslProtocol = "AMQP";
-    String saslServerName = "localhost";
+    String saslMechs = System.getProperty("qpid.sasl_mechs", "PLAIN");
+    String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP");
+    String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost");
     boolean useSASLEncryption;
-    
-    private Connection owner;
-    
+   
     private Map<String, Object> _clientProperties;
-
-    public Connection getConnection()
-    {
-        return owner;
-    }
-
-    public void setConnection(Connection owner)
-    {
-        if (this.owner != null)
-        {
-            throw new IllegalStateException(
-                    "A ConnectionSettings instance can be associated" +
-                    " with one and only one Connection instance");
-        }
-        this.owner = owner;
-    }
     
     public boolean isTcpNodelay()
     {
@@ -318,4 +303,34 @@ public class ConnectionSettings
         this.trustStoreCertType = trustStoreCertType;
     }
 
+    public int getReadBufferSize()
+    {
+        return readBufferSize;
+    }
+
+    public void setReadBufferSize(int readBufferSize)
+    {
+        this.readBufferSize = readBufferSize;
+    }
+
+    public int getWriteBufferSize()
+    {
+        return writeBufferSize;
+    }
+
+    public void setWriteBufferSize(int writeBufferSize)
+    {
+        this.writeBufferSize = writeBufferSize;
+    }
+    
+    public long getTransportTimeout()
+    {
+        return transportTimeout;
+    }
+
+    public void setTransportTimeout(long transportTimeout)
+    {
+        this.transportTimeout = transportTimeout;
+    }
+
 }

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java?rev=925261&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java Fri Mar 19 15:16:46 2010
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.security.SecurityLayer;
+
+public class TransportBuilder
+{
+    private Connection con;
+    private ConnectionSettings settings;
+    private NetworkTransport transport;
+    private SecurityLayer securityLayer = new SecurityLayer();
+    
+    public void init(Connection con) throws TransportException
+    {
+        this.con = con;
+        this.settings = con.getConnectionSettings();
+        transport = Transport.getTransport();    
+        transport.init(settings);        
+        securityLayer.init(con);
+    }
+
+    public Sender<ProtocolEvent> buildSenderPipe()
+    {
+        ConnectionSettings settings = con.getConnectionSettings();
+        
+        // Io layer
+        Sender<ByteBuffer> sender = transport.sender();
+        
+        // Security layer 
+        sender = securityLayer.sender(sender);
+        
+        Disassembler dis = new Disassembler(sender, settings.getMaxFrameSize());
+        return dis;
+    }
+    
+    public void buildReceiverPipe(Receiver<ProtocolEvent> delegate)
+    {
+        ConnectionSettings settings = con.getConnectionSettings();
+        
+        Receiver<ByteBuffer> receiver = new InputHandler(new Assembler(delegate));
+        
+        // Security layer 
+        receiver = securityLayer.receiver(receiver);
+        
+        //Io layer
+        transport.receiver(receiver);        
+    } 
+    
+    public SecurityLayer getSecurityLayer()
+    {
+        return securityLayer;
+    }
+
+}
\ No newline at end of file

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java?rev=925261&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java Fri Mar 19 15:16:46 2010
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ConnectionSettings;
+
+public interface NetworkTransport
+{
+    public void init(ConnectionSettings settings);
+    
+    public Sender<ByteBuffer> sender();
+    
+    public void receiver(Receiver<ByteBuffer> delegate);    
+    
+    public void close();
+}
\ No newline at end of file

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java?rev=925261&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java Fri Mar 19 15:16:46 2010
@@ -0,0 +1,56 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+package org.apache.qpid.transport.network;
+
+import org.apache.qpid.transport.TransportException;
+
+public class Transport
+{    
+    private final static Class<?> transportClass;
+    
+    static 
+    {
+        try
+        {
+            transportClass = 
+                Class.forName(System.getProperty("qpid.transport", 
+                                                 "org.apache.qpid.transport.network.io.IoNetworkTransport"));
+            
+        }
+        catch(Exception e)
+        {
+            throw new Error("Error occured while loading Qpid Transport",e);
+        }
+    }
+    
+    public static NetworkTransport getTransport() throws TransportException
+    {
+        try
+        {
+            return (NetworkTransport)transportClass.newInstance();
+        }
+        catch (Exception e)
+        {
+            throw new TransportException("Error while creating a new transport instance",e);
+        }
+    }
+}
\ No newline at end of file

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java?rev=925261&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java Fri Mar 19 15:16:46 2010
@@ -0,0 +1,15 @@
+package org.apache.qpid.transport.network.io;
+
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Sender;
+
+public interface IoContext
+{
+    Sender<ByteBuffer> getSender();
+    
+    IoReceiver getReceiver();
+
+    Socket getSocket();
+}

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=925261&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Fri Mar 19 15:16:46 2010
@@ -0,0 +1,120 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.NetworkTransport;
+import org.apache.qpid.transport.util.Logger;
+
+public class IoNetworkTransport implements NetworkTransport, IoContext
+{
+    static
+    {
+        org.apache.mina.common.ByteBuffer.setAllocator
+            (new org.apache.mina.common.SimpleByteBufferAllocator());
+        org.apache.mina.common.ByteBuffer.setUseDirectBuffers
+            (Boolean.getBoolean("amqj.enableDirectBuffers"));
+    }
+
+    private static final Logger log = Logger.get(IoNetworkTransport.class);
+
+    private Socket socket;
+    private Sender<ByteBuffer> sender;
+    private IoReceiver receiver;
+    private long timeout = 60000; 
+    private ConnectionSettings settings;    
+    
+    @Override
+    public void init(ConnectionSettings settings)
+    {
+        try
+        {
+            this.settings = settings;
+            InetAddress address = InetAddress.getByName(settings.getHost());
+            socket = new Socket();
+            socket.setReuseAddress(true);
+            socket.setTcpNoDelay(settings.isTcpNodelay());
+
+            log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+            log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+            socket.setSendBufferSize(settings.getWriteBufferSize());
+            socket.setReceiveBufferSize(settings.getReadBufferSize());
+
+            log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+            log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+            socket.connect(new InetSocketAddress(address, settings.getPort()));
+        }
+        catch (SocketException e)
+        {
+            throw new TransportException("Error connecting to broker", e);
+        }
+        catch (IOException e)
+        {
+            throw new TransportException("Error connecting to broker", e);
+        }
+    }
+
+    @Override
+    public void receiver(Receiver<ByteBuffer> delegate)
+    {
+        receiver = new IoReceiver(this, delegate,
+                2*settings.getReadBufferSize() , timeout);
+    }
+
+    @Override
+    public Sender<ByteBuffer> sender()
+    {
+        return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
+    }
+    
+    @Override
+    public void close()
+    {
+        
+    }
+
+    public Sender<ByteBuffer> getSender()
+    {
+        return sender;
+    }
+
+    public IoReceiver getReceiver()
+    {
+        return receiver;
+    }
+
+    public Socket getSocket()
+    {
+        return socket;
+    }
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=925261&r1=925260&r2=925261&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Fri Mar 19 15:16:46 2010
@@ -42,7 +42,7 @@ final class IoReceiver implements Runnab
 
     private static final Logger log = Logger.get(IoReceiver.class);
 
-    private final IoTransport transport;
+    private final IoContext ioCtx;
     private final Receiver<ByteBuffer> receiver;
     private final int bufferSize;
     private final Socket socket;
@@ -52,13 +52,13 @@ final class IoReceiver implements Runnab
     private final boolean shutdownBroken =
         ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
 
-    public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver,
+    public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver,
                       int bufferSize, long timeout)
     {
-        this.transport = transport;
+        this.ioCtx = ioCtx;
         this.receiver = receiver;
         this.bufferSize = bufferSize;
-        this.socket = transport.getSocket();
+        this.socket = ioCtx.getSocket();
         this.timeout = timeout;
 
         try

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=925261&r1=925260&r2=925261&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Fri Mar 19 15:16:46 2010
@@ -43,7 +43,7 @@ public final class IoSender implements R
     // we can test other cases as well
     private final static int START = Integer.MAX_VALUE - 10;
 
-    private final IoTransport transport;
+    private final IoContext ioCtx;
     private final long timeout;
     private final Socket socket;
     private final OutputStream out;
@@ -60,10 +60,10 @@ public final class IoSender implements R
     private volatile Throwable exception = null;
 
 
-    public IoSender(IoTransport transport, int bufferSize, long timeout)
+    public IoSender(IoContext ioCtx, int bufferSize, long timeout)
     {
-        this.transport = transport;
-        this.socket = transport.getSocket();
+        this.ioCtx = ioCtx;
+        this.socket = ioCtx.getSocket();
         this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
         this.timeout = timeout;
 
@@ -207,7 +207,7 @@ public final class IoSender implements R
                         throw new SenderException("join timed out");
                     }
                 }
-                transport.getReceiver().close(false);
+                ioCtx.getReceiver().close(false);
             }
             catch (InterruptedException e)
             {

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=925261&r1=925260&r2=925261&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Fri Mar 19 15:16:46 2010
@@ -51,7 +51,7 @@ import org.apache.qpid.transport.util.Lo
  * SO_RCVBUF    - amqj.receiveBufferSize
  * SO_SNDBUF    - amqj.sendBufferSize
  */
-public final class IoTransport<E>
+public final class IoTransport<E> implements IoContext
 {
 
     static
@@ -119,17 +119,17 @@ public final class IoTransport<E>
         }
     }
 
-    Sender<ByteBuffer> getSender()
+    public Sender<ByteBuffer> getSender()
     {
         return sender;
     }
 
-    IoReceiver getReceiver()
+    public IoReceiver getReceiver()
     {
         return receiver;
     }
 
-    Socket getSocket()
+    public Socket getSocket()
     {
         return socket;
     }

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java?rev=925261&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java Fri Mar 19 15:16:46 2010
@@ -0,0 +1,185 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.security;
+
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionListener;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
+import org.apache.qpid.transport.network.security.sasl.SASLSender;
+import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
+import org.apache.qpid.transport.network.security.ssl.SSLSender;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+public class SecurityLayer
+{
+    ConnectionSettings settings;
+    Connection con;
+    SSLSecurityLayer sslLayer;
+    SASLSecurityLayer saslLayer;
+    
+    public void init(Connection con) throws TransportException
+    {
+        this.con = con;
+        this.settings = con.getConnectionSettings();
+        if (settings.isUseSSL())
+        {
+            sslLayer = new SSLSecurityLayer();
+        }
+        if (settings.isUseSASLEncryption())
+        {
+            saslLayer = new SASLSecurityLayer();
+        }        
+        
+    }
+    
+    public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+    {
+        Sender<ByteBuffer> sender = delegate;
+        
+        if (settings.isUseSSL())
+        {
+            sender = sslLayer.sender(sender);
+        }     
+        
+        if (settings.isUseSASLEncryption())
+        {
+            sender = saslLayer.sender(sender);
+        }
+        
+        return sender;
+    }
+    
+    public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+    {
+        Receiver<ByteBuffer> receiver = delegate;
+        
+        if (settings.isUseSSL())
+        {
+            receiver = sslLayer.receiver(receiver);
+        }        
+        
+        if (settings.isUseSASLEncryption())
+        {
+            receiver = saslLayer.receiver(receiver);
+        }
+        
+        return receiver;
+    }
+    
+    public String getUserID()
+    {
+        if (settings.isUseSSL())
+        {
+            return sslLayer.getUserID();
+        }
+        else
+        {
+            return null;
+        }
+    }
+    
+    class SSLSecurityLayer
+    {
+        SSLEngine engine;
+        SSLSender sender;
+                
+        public SSLSecurityLayer() 
+        {
+            SSLContext sslCtx;
+            try
+            {
+                sslCtx = SSLUtil.createSSLContext(settings);
+            }
+            catch (Exception e)
+            {
+                throw new TransportException("Error creating SSL Context", e);
+            }
+            
+            try
+            {
+                engine = sslCtx.createSSLEngine();
+                engine.setUseClientMode(true);
+            }
+            catch(Exception e)
+            {
+                throw new TransportException("Error creating SSL Engine", e);
+            }
+        }
+        
+        public SSLSender sender(Sender<ByteBuffer> delegate)
+        {
+            sender = new SSLSender(engine,delegate);
+            sender.setConnectionSettings(settings);
+            return sender;
+        }
+        
+        public SSLReceiver receiver(Receiver<ByteBuffer> delegate)
+        {
+            if (sender == null)
+            {
+                throw new  
+                IllegalStateException("SecurityLayer.sender method should be " +
+                		"invoked before SecurityLayer.receiver");
+            }
+            
+            SSLReceiver receiver = new SSLReceiver(engine,delegate,sender);
+            receiver.setConnectionSettings(settings);
+            return receiver;
+        }
+        
+        public String getUserID()
+        {
+            return null;
+        }
+        
+    }
+    
+    class SASLSecurityLayer
+    {
+        public SASLSecurityLayer() 
+        {
+        }
+        
+        public SASLSender sender(Sender<ByteBuffer> delegate)
+        {
+            SASLSender sender = new SASLSender(delegate);
+            con.addConnectionListener((ConnectionListener)sender);
+            return sender;
+        }
+        
+        public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
+        {
+            SASLReceiver receiver = new SASLReceiver(delegate);
+            con.addConnectionListener((ConnectionListener)receiver);
+            return receiver;
+        }
+        
+    }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org