You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [35/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java Fri Oct 21 01:19:00 2011
@@ -26,8 +26,11 @@ import org.apache.qpid.framing.abstracti
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
 import org.apache.qpid.framing.*;
 
+import org.apache.mina.common.ByteBuffer;
+
 public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
 {
     private int _basicPublishClassId;
@@ -57,9 +60,9 @@ public class MethodConverter_8_0 extends
                 return contentBodyChunk.getSize();
             }
 
-            public byte[] getData()
+            public ByteBuffer getData()
             {
-                return contentBodyChunk._payload;
+                return contentBodyChunk.payload;
             }
 
             public void reduceToFit()
@@ -78,9 +81,9 @@ public class MethodConverter_8_0 extends
                 
     }
    
-    public AMQBody convertToBody(byte[] data)
+    public AMQBody convertToBody(java.nio.ByteBuffer buf)
     {
-        return new ContentBody(data);
+        return new ContentBody(ByteBuffer.wrap(buf));
     }
 
     public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Fri Oct 21 01:19:00 2011
@@ -80,7 +80,7 @@ public final class AMQConstant
     /**
      * An operator intervened to close the connection for some reason. The client may retry at some later date.
      */
-    public static final AMQConstant CONNECTION_FORCED = new AMQConstant(320, "connection forced", true);
+    public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
 
     /** The client tried to work with an unknown virtual host or cluster. */
     public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
@@ -104,7 +104,7 @@ public final class AMQConstant
 
     public static final AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "Request Timeout", true);
 
-    public static final AMQConstant ARGUMENT_INVALID = new AMQConstant(409, "argument invalid", true);
+    public static final AMQConstant INVALID_ARGUMENT = new AMQConstant(409, "argument invalid", true);
 
     /**
      * The client sent a malformed frame that the server could not decode. This strongly implies a programming error
@@ -153,7 +153,10 @@ public final class AMQConstant
 
     public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true);
 
-    public static final AMQConstant INVALID_ARGUMENT = new AMQConstant(542, "invalid argument", true);
+    /**
+     * The server does not support the protocol version
+     */
+    public static final AMQConstant UNSUPPORTED_BROKER_PROTOCOL_ERROR = new AMQConstant(542, "broker unsupported protocol", true);
     /**
      * The client imp does not support the protocol version
      */

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Fri Oct 21 01:19:00 2011
@@ -21,11 +21,10 @@
 package org.apache.qpid.protocol;
 
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
 
 /**
  * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
@@ -33,6 +32,9 @@ import org.apache.qpid.transport.network
  */
 public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
 {
+   // Sets the network driver providing data for this ProtocolEngine
+   void setNetworkDriver (NetworkDriver driver);
+
    // Returns the remote address of the NetworkDriver
    SocketAddress getRemoteAddress();
 
@@ -56,6 +58,4 @@ public interface ProtocolEngine extends 
    void readerIdle();
 
 
-    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
-
 }
\ No newline at end of file

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java Fri Oct 21 01:19:00 2011
@@ -20,12 +20,12 @@
  */
 package org.apache.qpid.protocol;
 
-import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.NetworkDriver;
 
 public interface ProtocolEngineFactory  
 { 
  
   // Returns a new instance of a ProtocolEngine 
-  ProtocolEngine newProtocolEngine();
+  ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); 
    
 } 
\ No newline at end of file

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java Fri Oct 21 01:19:00 2011
@@ -20,17 +20,18 @@
  */
 package org.apache.qpid.ssl;
 
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 
-import org.apache.qpid.transport.network.security.ssl.QpidClientX509KeyManager;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
 /**
@@ -38,92 +39,157 @@ import org.apache.qpid.transport.network
  * before this will work.
  * 
  */
-public class SSLContextFactory
-{
-    public static final String JAVA_KEY_STORE_CODE = "JKS";
-    public static final String TRANSPORT_LAYER_SECURITY_CODE = "TLS";
-    public static final String KEY_STORE_CERTIFICATE_TYPE = "SunX509";
-
-    private SSLContextFactory()
-    {
-        //no instances
-    }
-
-    public static SSLContext buildServerContext(final String keyStorePath,
-            final String keyStorePassword, final String keyStoreCertType)
-            throws GeneralSecurityException, IOException
+public class SSLContextFactory {
+	
+	/**
+	 * Path to the Java keystore file
+	 */
+	private String _keyStorePath;
+	
+	/**
+	 * Password for the keystore
+	 */
+	private String _keyStorePassword;
+	
+	/**
+	 * Cert type to use in keystore
+	 */
+	private String _keyStoreCertType;
+	
+	/**
+     * Path to the Java truststore file
+     */
+    private String _trustStorePath;
+    
+    /**
+     * Password for the truststore
+     */
+    private String _trustStorePassword;
+    
+    /**
+     * Cert type to use in truststore
+     */
+    private String _trustStoreCertType;
+    
+	private KeyManager customKeyManager;
+    
+    public SSLContextFactory(String trustStorePath, String trustStorePassword,
+            String trustStoreCertType) 
     {
-        return buildContext(null, null, null, keyStorePath, keyStorePassword,
-                keyStoreCertType, null);
+        this(trustStorePath,trustStorePassword,trustStoreCertType,
+                          trustStorePath,trustStorePassword,trustStoreCertType);
     }
 
-    public static SSLContext buildClientContext(final String trustStorePath,
-            final String trustStorePassword, final String trustStoreCertType,
-            final String keyStorePath, final String keyStorePassword,
-            final String keyStoreCertType, final String certAlias)
-            throws GeneralSecurityException, IOException
-    {
-        return buildContext(trustStorePath, trustStorePassword,
-                trustStoreCertType, keyStorePath, keyStorePassword,
-                keyStoreCertType, certAlias);
-    }
-    
-    private static SSLContext buildContext(final String trustStorePath,
-            final String trustStorePassword, final String trustStoreCertType,
-            final String keyStorePath, final String keyStorePassword,
-            final String keyStoreCertType, final String certAlias)
-            throws GeneralSecurityException, IOException
+    /**
+	 * Create a factory instance
+	 * @param keystorePath path to the Java keystore file
+	 * @param keystorePassword password for the Java keystore
+	 * @param certType certificate type
+	 */
+	public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType,
+            String keyStorePath, String keyStorePassword, String keyStoreCertType) 
+	{
+
+	    _trustStorePath = trustStorePath;
+        _trustStorePassword = trustStorePassword;
+                
+        if (_trustStorePassword != null && _trustStorePassword.equals("none"))
+        {
+            _trustStorePassword = null;
+        }
+        _trustStoreCertType = trustStoreCertType;
+        
+	    _keyStorePath = keyStorePath;
+		_keyStorePassword = keyStorePassword;
+				
+		if (_keyStorePassword != null && _keyStorePassword.equals("none"))
+		{
+			_keyStorePassword = null;
+		}
+		_keyStoreCertType = keyStoreCertType;
+		
+		if (_trustStorePath == null) {
+			throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified");
+		}
+		if (_trustStoreCertType == null) {
+			throw new IllegalArgumentException("Cert type must be specified");
+		}
+	}
+	
+	public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType,
+	                         KeyManager customKeyManager) 
     {
-        // Initialize the SSLContext to work with our key managers.
-        final SSLContext sslContext = SSLContext
-                .getInstance(TRANSPORT_LAYER_SECURITY_CODE);
-
-        final TrustManager[] trustManagers;
-        final KeyManager[] keyManagers;
 
-        if (trustStorePath != null)
+        _trustStorePath = trustStorePath;
+        _trustStorePassword = trustStorePassword;
+                
+        if (_trustStorePassword != null && _trustStorePassword.equals("none"))
         {
-            final KeyStore ts = SSLUtil.getInitializedKeyStore(trustStorePath,
-                    trustStorePassword);
-            final TrustManagerFactory tmf = TrustManagerFactory
-                    .getInstance(trustStoreCertType);
-            tmf.init(ts);
-
-            trustManagers = tmf.getTrustManagers();
+            _trustStorePassword = null;
         }
-        else
-        {
-            trustManagers = null;
+        _trustStoreCertType = trustStoreCertType;
+        
+        if (_trustStorePath == null) {
+            throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified");
         }
-
-        if (keyStorePath != null)
+        if (_trustStoreCertType == null) {
+            throw new IllegalArgumentException("Cert type must be specified");
+        }
+        
+        this.customKeyManager = customKeyManager;
+    }
+	
+	
+	/**
+	 * Builds an SSLContext appropriate for use with a server
+	 * @return SSLContext
+	 * @throws GeneralSecurityException
+	 * @throws IOException
+	 */
+
+	public SSLContext buildServerContext() throws GeneralSecurityException, IOException
+	{
+        KeyStore ts = SSLUtil.getInitializedKeyStore(_trustStorePath,_trustStorePassword);
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
+        tmf.init(ts);
+        
+        // Initialize the SSLContext to work with our key managers.
+        SSLContext sslContext = SSLContext.getInstance("TLS");
+        
+        if (customKeyManager != null)
         {
-            if (certAlias != null)
-            {
-                keyManagers = new KeyManager[] { new QpidClientX509KeyManager(
-                        certAlias, keyStorePath, keyStorePassword,
-                        keyStoreCertType) };
-            }
-            else
-            {
-                final KeyStore ks = SSLUtil.getInitializedKeyStore(
-                        keyStorePath, keyStorePassword);
-
-                char[] keyStoreCharPassword = keyStorePassword == null ? null : keyStorePassword.toCharArray();
-                // Set up key manager factory to use our key store
-                final KeyManagerFactory kmf = KeyManagerFactory
-                        .getInstance(keyStoreCertType);
-                kmf.init(ks, keyStoreCharPassword);
-                keyManagers = kmf.getKeyManagers();
-            }
+            sslContext.init(new KeyManager[]{customKeyManager},
+                            tmf.getTrustManagers(), null);
+            
         }
         else
         {
-            keyManagers = null;
-        }
+            // Create keystore
+            KeyStore ks = SSLUtil.getInitializedKeyStore(_keyStorePath,_keyStorePassword);
+            // Set up key manager factory to use our key store
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(_keyStoreCertType);
+            kmf.init(ks, _keyStorePassword.toCharArray());
 
-        sslContext.init(keyManagers, trustManagers, null);
-
-        return sslContext;
-    }
+            sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);    
+        }
+        
+        return sslContext;		
+	}
+	
+	/**
+	 * Builds an SSLContext appropriate for use with a client
+	 * @return SSLContext
+	 * @throws GeneralSecurityException
+	 * @throws IOException
+	 */
+	public SSLContext buildClientContext() throws GeneralSecurityException, IOException
+	{
+		KeyStore ks = SSLUtil.getInitializedKeyStore(_trustStorePath,_trustStorePassword);
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
+        tmf.init(ks);
+        SSLContext context = SSLContext.getInstance("TLS");
+        context.init(null, tmf.getTrustManagers(), null);
+        return context;		
+	}
+	
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java Fri Oct 21 01:19:00 2011
@@ -23,7 +23,7 @@ package org.apache.qpid.thread;
 
 import org.apache.qpid.thread.Threading;
 
-import java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
 
 public class QpidThreadExecutor implements Executor
 {

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Fri Oct 21 01:19:00 2011
@@ -20,20 +20,28 @@
  */
 package org.apache.qpid.transport;
 
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+
+import org.apache.qpid.security.UsernamePasswordCallbackHandler;
 import static org.apache.qpid.transport.Connection.State.OPEN;
 import static org.apache.qpid.transport.Connection.State.RESUMING;
+import org.apache.qpid.transport.util.Logger;
 
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import org.apache.qpid.transport.util.Logger;
-
 
 /**
  * ClientDelegate
@@ -44,13 +52,31 @@ public class ClientDelegate extends Conn
 {
     private static final Logger log = Logger.get(ClientDelegate.class);
 
+    private static final String KRB5_OID_STR = "1.2.840.113554.1.2.2";
+    protected static final Oid KRB5_OID;
 
+    static
+    {
+        Oid oid;
+        try
+        {
+            oid = new Oid(KRB5_OID_STR);
+        }
+        catch (GSSException ignore)
+        {
+            oid = null;
+        }
 
-    protected final ConnectionSettings _conSettings;
+        KRB5_OID = oid;
+    }
+
+    private List<String> clientMechs;
+    private ConnectionSettings conSettings;
 
     public ClientDelegate(ConnectionSettings settings)
     {
-        this._conSettings = settings;
+        this.conSettings = settings;
+        this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" "));
     }
 
     public void init(Connection conn, ProtocolHeader hdr)
@@ -66,9 +92,9 @@ public class ClientDelegate extends Conn
     {
         Map<String,Object> clientProperties = new HashMap<String,Object>();
 
-        if(this._conSettings.getClientProperties() != null)
+        if(this.conSettings.getClientProperties() != null)
         {
-            clientProperties.putAll(_conSettings.getClientProperties());
+            clientProperties.putAll(this.conSettings.getClientProperties());
         }
 
         clientProperties.put("qpid.session_flow", 1);
@@ -83,12 +109,41 @@ public class ClientDelegate extends Conn
                 (clientProperties, null, null, conn.getLocale());
             return;
         }
+
+        List<String> choosenMechs = new ArrayList<String>();
+        for (String mech:clientMechs)
+        {
+            if (brokerMechs.contains(mech))
+            {
+                choosenMechs.add(mech);
+            }
+        }
+
+        if (choosenMechs.size() == 0)
+        {
+            conn.exception(new ConnectionException("The following SASL mechanisms " +
+                    clientMechs.toString()  +
+                    " specified by the client are not supported by the broker"));
+            return;
+        }
+
+        String[] mechs = new String[choosenMechs.size()];
+        choosenMechs.toArray(mechs);
+
         conn.setServerProperties(start.getServerProperties());
 
         try
         {
-            final SaslClient sc = createSaslClient(brokerMechs);
-
+            Map<String,Object> saslProps = new HashMap<String,Object>();
+            if (conSettings.isUseSASLEncryption())
+            {
+                saslProps.put(Sasl.QOP, "auth-conf");
+            }
+            UsernamePasswordCallbackHandler handler =
+                new UsernamePasswordCallbackHandler();
+            handler.initialise(conSettings.getUsername(), conSettings.getPassword());
+            SaslClient sc = Sasl.createSaslClient
+                (mechs, null, conSettings.getSaslProtocol(), conSettings.getSaslServerName(), saslProps, handler);
             conn.setSaslClient(sc);
 
             byte[] response = sc.hasInitialResponse() ?
@@ -97,22 +152,12 @@ public class ClientDelegate extends Conn
                 (clientProperties, sc.getMechanismName(), response,
                  conn.getLocale());
         }
-        catch (ConnectionException ce)
-        {
-            conn.exception(ce);
-        }
         catch (SaslException e)
         {
             conn.exception(e);
         }
     }
 
-
-    protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException, SaslException
-    {
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     public void connectionSecure(Connection conn, ConnectionSecure secure)
     {
@@ -131,7 +176,7 @@ public class ClientDelegate extends Conn
     @Override
     public void connectionTune(Connection conn, ConnectionTune tune)
     {
-        int hb_interval = calculateHeartbeatInterval(_conSettings.getHeartbeatInterval(),
+        int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(),
                                                      tune.getHeartbeatMin(),
                                                      tune.getHeartbeatMax()
                                                      );
@@ -146,12 +191,32 @@ public class ClientDelegate extends Conn
         //(or that forced by protocol limitations [0xFFFF])
         conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
 
-        conn.connectionOpen(_conSettings.getVhost(), null, Option.INSIST);
+        conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST);
     }
 
     @Override
     public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
     {
+        SaslClient sc = conn.getSaslClient();
+        if (sc != null)
+        {
+            if (sc.getMechanismName().equals("GSSAPI"))
+            {
+                String id = getKerberosUser();
+                if (id != null)
+                {
+                    conn.setUserID(id);
+                }
+            }
+            else if (sc.getMechanismName().equals("EXTERNAL"))
+            {
+                if (conn.getSecurityLayer() != null)
+                {
+                    conn.setUserID(conn.getSecurityLayer().getUserID());
+                }
+            }
+        }
+        
         if (conn.isConnectionResuming())
         {
             conn.setState(RESUMING);
@@ -182,7 +247,7 @@ public class ClientDelegate extends Conn
         int i = heartbeat;
         if (i == 0)
         {
-            log.info("Idle timeout is 0 sec. Heartbeats are disabled.");
+            log.warn("Idle timeout is zero. Heartbeats are disabled");
             return 0; // heartbeats are disabled.
         }
         else if (i >= min && i <= max)
@@ -191,8 +256,8 @@ public class ClientDelegate extends Conn
         }
         else
         {
-            log.info("The broker does not support the configured connection idle timeout of %s sec," +
-                     " using the brokers max supported value of %s sec instead.", i,max);
+            log.warn("Ignoring the idle timeout %s set by the connection," +
+            		" using the brokers max value %s", i,max);
             return max;
         }
     }
@@ -221,7 +286,35 @@ public class ClientDelegate extends Conn
 
     }
 
+    private String getKerberosUser()
+    {
+        log.debug("Obtaining userID from kerberos");
+        String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName();
+        GSSManager manager = GSSManager.getInstance();
+
+        try
+        {
+            GSSName acceptorName = manager.createName(service,
+                GSSName.NT_HOSTBASED_SERVICE, KRB5_OID);
+
+            GSSContext secCtx = manager.createContext(acceptorName,
+                                                      KRB5_OID,
+                                                      null,
+                                                      GSSContext.INDEFINITE_LIFETIME);
 
+            secCtx.initSecContext(new byte[0], 0, 1);
 
+            if (secCtx.getSrcName() != null)
+            {
+                return secCtx.getSrcName().toString();
+            }
 
+        }
+        catch (GSSException e)
+        {
+            log.warn("Unable to retrieve userID from Kerberos due to error",e);
+        }
+
+        return null;
+    }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Oct 21 01:19:00 2011
@@ -25,29 +25,21 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.Connection.State.NEW;
 import static org.apache.qpid.transport.Connection.State.OPEN;
 import static org.apache.qpid.transport.Connection.State.OPENING;
+import static org.apache.qpid.transport.Connection.State.RESUMING;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslServer;
 
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
 import org.apache.qpid.transport.network.security.SecurityLayer;
-import org.apache.qpid.transport.network.security.SecurityLayerFactory;
 import org.apache.qpid.transport.util.Logger;
 import org.apache.qpid.transport.util.Waiter;
 import org.apache.qpid.util.Strings;
@@ -73,7 +65,6 @@ public class Connection extends Connecti
     public static final int MAX_CHANNEL_MAX = 0xFFFF;
     public static final int MIN_USABLE_CHANNEL_NUM = 0;
 
-
     public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
 
     static class DefaultConnectionListener implements ConnectionListener
@@ -121,14 +112,17 @@ public class Connection extends Connecti
     private SaslServer saslServer;
     private SaslClient saslClient;
     private int idleTimeout = 0;
+    private String _authorizationID;
     private Map<String,Object> _serverProperties;
     private String userID;
     private ConnectionSettings conSettings;
     private SecurityLayer securityLayer;
     private String _clientId;
-
+    
+    private static final AtomicLong idGenerator = new AtomicLong(0);
+    private final long _connectionId = idGenerator.incrementAndGet();
     private final AtomicBoolean connectionLost = new AtomicBoolean(false);
-
+    
     public Connection() {}
 
     public void setConnectionDelegate(ConnectionDelegate delegate)
@@ -239,24 +233,14 @@ public class Connection extends Connecti
             conSettings = settings;
             state = OPENING;
             userID = settings.getUsername();
-
-            securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
-
-            OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
-            Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
-            if(secureReceiver instanceof ConnectionListener)
-            {
-                addConnectionListener((ConnectionListener)secureReceiver);
-            }
-
-            NetworkConnection network = transport.connect(settings, secureReceiver, null);
-            final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender());
-            if(secureSender instanceof ConnectionListener)
-            {
-                addConnectionListener((ConnectionListener)secureSender);
-            }
-            sender = new Disassembler(secureSender, settings.getMaxFrameSize());
-
+            delegate = new ClientDelegate(settings);
+           
+            TransportBuilder transport = new TransportBuilder();
+            transport.init(this);
+            this.sender = transport.buildSenderPipe();
+            transport.buildReceiverPipe(this);
+            this.securityLayer = transport.getSecurityLayer();
+            
             send(new ProtocolHeader(1, 0, 10));
 
             Waiter w = new Waiter(lock, timeout);
@@ -337,31 +321,23 @@ public class Connection extends Connecti
             Waiter w = new Waiter(lock, timeout);
             while (w.hasTime() && state != OPEN && error == null)
             {
-                w.await();
+                w.await();                
             }
-
+            
             if (state != OPEN)
             {
                 throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
             }
-
+            
             Session ssn = _sessionFactory.newSession(this, name, expiry);
-            registerSession(ssn);
+            sessions.put(name, ssn);
             map(ssn);
             ssn.attach();
             return ssn;
         }
     }
 
-    public void registerSession(Session ssn)
-    {
-        synchronized (lock)
-        {
-            sessions.put(ssn.getName(),ssn);
-        }
-    }
-
-    public void removeSession(Session ssn)
+    void removeSession(Session ssn)
     {
         synchronized (lock)
         {
@@ -376,6 +352,11 @@ public class Connection extends Connecti
         _sessionFactory = sessionFactory;
     }
 
+    public long getConnectionId()
+    {
+        return _connectionId;
+    }
+
     public ConnectionDelegate getConnectionDelegate()
     {
         return delegate;
@@ -424,7 +405,7 @@ public class Connection extends Connecti
         else
         {
             throw new ProtocolViolationException(
-					"Received frames for an already detached session", null);
+					"Received frames for an already dettached session", null);
         }
     }
 
@@ -473,7 +454,7 @@ public class Connection extends Connecti
         }
     }
 
-    public Session getSession(int channel)
+    protected Session getSession(int channel)
     {
         synchronized (lock)
         {
@@ -487,10 +468,18 @@ public class Connection extends Connecti
         {
             for (Session ssn : sessions.values())
             {
-                map(ssn);
-                ssn.resume();
+                if (ssn.isTransacted())
+                {                    
+                    removeSession(ssn);
+                    ssn.setState(Session.State.CLOSED);
+                }
+                else
+                {                
+                    map(ssn);
+                    ssn.attach();
+                    ssn.resume();
+                }
             }
-
             setState(OPEN);
         }
     }
@@ -577,12 +566,12 @@ public class Connection extends Connecti
     {
         close(ConnectionCloseCode.NORMAL, null);
     }
-
+    
     public void mgmtClose()
     {
         close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface.");
     }
-
+    
     public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
     {
         synchronized (lock)
@@ -656,6 +645,16 @@ public class Connection extends Connecti
         return idleTimeout;
     }
 
+    public void setAuthorizationID(String authorizationID)
+    {
+        _authorizationID = authorizationID;
+    }
+
+    public String getAuthorizationID()
+    {
+        return _authorizationID;
+    }
+
     public String getUserID()
     {
         return userID;
@@ -685,24 +684,15 @@ public class Connection extends Connecti
     {
         return conSettings;
     }
-
+    
     public SecurityLayer getSecurityLayer()
     {
         return securityLayer;
     }
-
+    
     public boolean isConnectionResuming()
     {
         return connectionLost.get();
     }
 
-    protected Collection<Session> getChannels()
-    {
-        return channels.values();
-    }
-
-    public boolean hasSessionWithName(final String name)
-    {
-        return sessions.containsKey(new Binary(name.getBytes()));
-    }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Fri Oct 21 01:19:00 2011
@@ -85,7 +85,7 @@ public abstract class ConnectionDelegate
     @Override public void sessionDetach(Connection conn, SessionDetach dtc)
     {
         Session ssn = conn.getSession(dtc.getChannel());
-        ssn.sessionDetached(dtc.getName(), ssn.getDetachCode() == null? SessionDetachCode.NORMAL: ssn.getDetachCode());
+        ssn.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL);
         conn.unmap(ssn);
         ssn.closed();
     }
@@ -95,7 +95,6 @@ public abstract class ConnectionDelegate
         Session ssn = conn.getSession(dtc.getChannel());
         if (ssn != null)
         {
-            ssn.setDetachCode(dtc.getCode());
             conn.unmap(ssn);
             ssn.closed();
         }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java Fri Oct 21 01:19:00 2011
@@ -30,8 +30,6 @@ import java.util.Map;
  */
 public class ConnectionSettings
 {
-    public static final String WILDCARD_ADDRESS = "*";
-
     String protocol = "tcp";
     String host = "localhost";
     String vhost;
@@ -58,7 +56,7 @@ public class ConnectionSettings
     boolean verifyHostname;
     
     // SASL props
-    String saslMechs = System.getProperty("qpid.sasl_mechs", null);
+    String saslMechs = System.getProperty("qpid.sasl_mechs", "PLAIN");
     String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP");
     String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost");
     boolean useSASLEncryption;

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Fri Oct 21 01:19:00 2011
@@ -75,7 +75,10 @@ public class ServerDelegate extends Conn
 
         if (mechanism == null || mechanism.length() == 0)
         {
-            tuneAuthorizedConnection(conn);
+            conn.connectionTune
+                (getChannelMax(),
+                 org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+                 0, getHeartbeatMax());
             return;
         }
 
@@ -94,7 +97,8 @@ public class ServerDelegate extends Conn
         }
         catch (SaslException e)
         {
-            connectionAuthFailed(conn, e);
+            conn.exception(e);
+            conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
         }
     }
 
@@ -105,52 +109,33 @@ public class ServerDelegate extends Conn
         return ss;
     }
 
-    protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
+    private void secure(Connection conn, byte[] response)
     {
+        SaslServer ss = conn.getSaslServer();
         try
         {
             byte[] challenge = ss.evaluateResponse(response);
             if (ss.isComplete())
             {
                 ss.dispose();
-                tuneAuthorizedConnection(conn);
+                conn.connectionTune
+                    (getChannelMax(),
+                     org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+                     0, getHeartbeatMax());
+                conn.setAuthorizationID(ss.getAuthorizationID());
             }
             else
             {
-                connectionAuthContinue(conn, challenge);
+                conn.connectionSecure(challenge);
             }
         }
         catch (SaslException e)
         {
-            connectionAuthFailed(conn, e);
+            conn.exception(e);
+            conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
         }
     }
 
-    protected void connectionAuthFailed(final Connection conn, Exception e)
-    {
-        conn.exception(e);
-        conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
-    }
-
-    protected void connectionAuthContinue(final Connection conn, byte[] challenge)
-    {
-        conn.connectionSecure(challenge);
-    }
-
-    protected void tuneAuthorizedConnection(final Connection conn)
-    {
-        conn.connectionTune
-            (getChannelMax(),
-             org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
-             0, getHeartbeatMax());
-    }
-    
-    protected void secure(final Connection conn, final byte[] response)
-    {
-        final SaslServer ss = conn.getSaslServer();
-        secure(ss, conn, response);
-    }
-
     protected int getHeartbeatMax()
     {
         return 0xFFFF;
@@ -170,7 +155,22 @@ public class ServerDelegate extends Conn
     @Override
     public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
     {
+        int okChannelMax = ok.getChannelMax();
+        
+        if (okChannelMax > getChannelMax())
+        {
+            _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " +
+                    "client connectionTuneOk returned a channelMax (" + okChannelMax +
+                    ") above the servers offered limit (" + getChannelMax() +")");
 
+            //Due to the error we must forcefully close the connection without negotiation
+            conn.getSender().close();
+            return;
+        }
+
+        //0 means no implied limit, except available server resources
+        //(or that forced by protocol limitations [0xFFFF])
+        conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
     }
 
     @Override
@@ -200,11 +200,4 @@ public class ServerDelegate extends Conn
         ssn.sessionAttached(atc.getName());
         ssn.setState(Session.State.OPEN);
     }
-
-    protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax)
-    {
-        //0 means no implied limit, except available server resources
-        //(or that forced by protocol limitations [0xFFFF])
-        conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
-    }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java Fri Oct 21 01:19:00 2011
@@ -30,8 +30,6 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.Session.State.NEW;
 import static org.apache.qpid.transport.Session.State.OPEN;
 import static org.apache.qpid.transport.Session.State.RESUMING;
-
-import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.transport.network.Frame;
 import static org.apache.qpid.transport.util.Functions.mod;
 import org.apache.qpid.transport.util.Logger;
@@ -44,9 +42,7 @@ import static org.apache.qpid.util.Seria
 import static org.apache.qpid.util.Strings.toUTF8;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -59,6 +55,7 @@ import java.util.concurrent.TimeUnit;
 
 public class Session extends SessionInvoker
 {
+
     private static final Logger log = Logger.get(Session.class);
 
     public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
@@ -92,9 +89,7 @@ public class Session extends SessionInvo
     private int channel;
     private SessionDelegate delegate;
     private SessionListener listener = new DefaultSessionListener();
-    private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
-                                        Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
-                                                     ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
+    private long timeout = 60000;
     private boolean autoSync = false;
 
     private boolean incomingInit;
@@ -122,9 +117,7 @@ public class Session extends SessionInvo
 
     private Thread resumer = null;
     private boolean transacted = false;
-    private SessionDetachCode detachCode;
-    private final Object stateLock = new Object();
-
+    
     protected Session(Connection connection, Binary name, long expiry)
     {
         this(connection, new SessionDelegate(), name, expiry);
@@ -259,8 +252,6 @@ public class Session extends SessionInvo
     {
         synchronized (commands)
         {
-            attach();
-
             for (int i = maxComplete + 1; lt(i, commandsOut); i++)
             {
                 Method m = commands[mod(i, commands.length)];
@@ -271,48 +262,16 @@ public class Session extends SessionInvo
                 }
                 else if (m instanceof MessageTransfer)
                 {
-                	MessageTransfer xfr = (MessageTransfer)m;
-                	
-                	if (xfr.getHeader() != null)
-                	{
-                		if (xfr.getHeader().get(DeliveryProperties.class) != null)
-                		{
-                		   xfr.getHeader().get(DeliveryProperties.class).setRedelivered(true);
-                		}
-                		else
-                		{
-                			Struct[] structs = xfr.getHeader().getStructs();
-                			DeliveryProperties deliveryProps = new DeliveryProperties();
-                    		deliveryProps.setRedelivered(true);
-                    		
-                    		List<Struct> list = Arrays.asList(structs);
-                    		list.add(deliveryProps);
-                    		xfr.setHeader(new Header(list));
-                		}
-                		
-                	}
-                	else
-                	{
-                		DeliveryProperties deliveryProps = new DeliveryProperties();
-                		deliveryProps.setRedelivered(true);
-                		xfr.setHeader(new Header(deliveryProps));
-                	}
+                    ((MessageTransfer)m).getHeader().get(DeliveryProperties.class).setRedelivered(true);
                 }
                 sessionCommandPoint(m.getId(), 0);
                 send(m);
             }
-
+           
             sessionCommandPoint(commandsOut, 0);
-
             sessionFlush(COMPLETED);
             resumer = Thread.currentThread();
             state = RESUMING;
-
-            if(isTransacted())
-            {
-                txSelect();
-            }
-
             listener.resumed(this);
             resumer = null;
         }
@@ -463,10 +422,7 @@ public class Session extends SessionInvo
             {
                 return;
             }
-            if (copy.size() > 0)
-            {
-	            sessionCompleted(copy, options);
-            }
+            sessionCompleted(copy, options);
         }
     }
 
@@ -576,6 +532,17 @@ public class Session extends SessionInvo
     {
         if (m.getEncodedTrack() == Frame.L4)
         {
+            
+            if (state == DETACHED && transacted)
+            {
+                state = CLOSED;
+                delegate.closed(this);
+                connection.removeSession(this);
+                throw new SessionException(
+                        "Session failed over, possibly in the middle of a transaction. " +
+                        "Closing the session. Any Transaction in progress will be rolledback.");
+            }
+            
             if (m.hasPayload())
             {
                 acquireCredit();
@@ -583,30 +550,24 @@ public class Session extends SessionInvo
             
             synchronized (commands)
             {
-                //allow the txSelect operation to be invoked during resume
-                boolean skipWait = m instanceof TxSelect && state == RESUMING;
-
-                if(!skipWait)
+                if (state == DETACHED && m.isUnreliable())
                 {
-                    if (state == DETACHED && m.isUnreliable())
+                    Thread current = Thread.currentThread();
+                    if (!current.equals(resumer))
                     {
-                        Thread current = Thread.currentThread();
-                        if (!current.equals(resumer))
-                        {
-                            return;
-                        }
+                        return;
                     }
+                }
 
-                    if (state != OPEN && state != CLOSED && state != CLOSING)
+                if (state != OPEN && state != CLOSED && state != CLOSING)
+                {
+                    Thread current = Thread.currentThread();
+                    if (!current.equals(resumer))
                     {
-                        Thread current = Thread.currentThread();
-                        if (!current.equals(resumer))
+                        Waiter w = new Waiter(commands, timeout);
+                        while (w.hasTime() && (state != OPEN && state != CLOSED))
                         {
-                            Waiter w = new Waiter(commands, timeout);
-                            while (w.hasTime() && (state != OPEN && state != CLOSED))
-                            {
-                                w.await();
-                            }
+                            w.await();
                         }
                     }
                 }
@@ -700,12 +661,7 @@ public class Session extends SessionInvo
                 {
                     sessionCommandPoint(0, 0);
                 }
-                
-                boolean replayTransfer = !closing && !transacted &&
-                                         m instanceof MessageTransfer &&
-                                         ! m.isUnreliable();
-                
-                if ((replayTransfer) || m.hasCompletionListener())
+                if ((!closing && !transacted && m instanceof MessageTransfer) || m.hasCompletionListener())
                 {
                     commands[mod(next, commands.length)] = m;
                     commandBytes += m.getBodySize();
@@ -970,29 +926,16 @@ public class Session extends SessionInvo
 
     public void close()
     {
-        if (log.isDebugEnabled())
-        {
-            log.debug("Closing [%s] in state [%s]", this, state);
-        }
         synchronized (commands)
         {
-            switch(state)
-            {
-                case DETACHED:
-                    state = CLOSED;
-                    delegate.closed(this);
-                    connection.removeSession(this);
-                    listener.closed(this);
-                    break;
-                case CLOSED:
-                    break;
-                default:
-                    state = CLOSING;
-                    setClose(true);
-                    sessionRequestTimeout(0);
-                    sessionDetach(name.getBytes());
-                    awaitClose();
-            }
+            state = CLOSING;
+            setClose(true);
+            sessionRequestTimeout(0);
+            sessionDetach(name.getBytes());
+
+            awaitClose();
+ 
+
         }
     }
 
@@ -1052,8 +995,7 @@ public class Session extends SessionInvo
 
         if(state == CLOSED)
         {
-            connection.removeSession(this);   
-            listener.closed(this);
+            connection.removeSession(this);            
         }
     }
 
@@ -1066,55 +1008,13 @@ public class Session extends SessionInvo
     {
         return String.format("ssn:%s", name);
     }
-
+    
     public void setTransacted(boolean b) {
         this.transacted = b;
     }
-
+    
     public boolean isTransacted(){
         return transacted;
     }
-
-    public void setDetachCode(SessionDetachCode dtc)
-    {
-        this.detachCode = dtc;
-    }
-
-    public SessionDetachCode getDetachCode()
-    {
-        return this.detachCode;
-    }
-
-    public void awaitOpen()
-    {
-        switch (state)
-        {
-        case NEW:
-            synchronized(stateLock)
-            {
-                Waiter w = new Waiter(stateLock, timeout);
-                while (w.hasTime() && state == NEW)
-                {
-                    w.await();
-                }
-            }
-
-            if (state != OPEN)
-            {
-                throw new SessionException("Timed out waiting for Session to open");
-            }
-            break;
-        case DETACHED:
-        case CLOSING:
-        case CLOSED:
-            throw new SessionException("Session closed");
-        default :
-            break;
-        }
-    }
-
-    public Object getStateLock()
-    {
-        return stateLock;
-    }
+    
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Fri Oct 21 01:19:00 2011
@@ -76,10 +76,6 @@ public class SessionDelegate
     @Override public void sessionAttached(Session ssn, SessionAttached atc)
     {
         ssn.setState(Session.State.OPEN);
-        synchronized (ssn.getStateLock())
-        {
-            ssn.getStateLock().notifyAll();
-        }
     }
 
     @Override public void sessionTimeout(Session ssn, SessionTimeout t)
@@ -206,19 +202,11 @@ public class SessionDelegate
 
     public void closed(Session session)
     {
-        log.debug("CLOSED: [%s]", session);
-        synchronized (session.getStateLock())
-        {
-            session.getStateLock().notifyAll();
-        }
+        log.warn("CLOSED: [%s]", session);
     }
 
     public void detached(Session session)
     {
-        log.debug("DETACHED: [%s]", session);
-        synchronized (session.getStateLock())
-        {
-            session.getStateLock().notifyAll();
-        }
+        log.warn("DETACHED: [%s]", session);
     }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java Fri Oct 21 01:19:00 2011
@@ -63,7 +63,6 @@ abstract class AbstractEncoder implement
         ENCODINGS.put(Double.class, Type.DOUBLE);
         ENCODINGS.put(Character.class, Type.CHAR);
         ENCODINGS.put(byte[].class, Type.VBIN32);
-        ENCODINGS.put(UUID.class, Type.UUID);
     }
 
     private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>()

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java Fri Oct 21 01:19:00 2011
@@ -20,11 +20,19 @@
  */
 package org.apache.qpid.transport.network;
 
-/**
- * A network transport is responsible for the establishment of network connections.
- * NetworkTransport implementations are pluggable via the {@link Transport} class.
- */
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ConnectionSettings;
+
 public interface NetworkTransport
 {
+    public void init(ConnectionSettings settings);
+    
+    public Sender<ByteBuffer> sender();
+    
+    public void receiver(Receiver<ByteBuffer> delegate);    
+    
     public void close();
-}
+}
\ No newline at end of file

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Fri Oct 21 01:19:00 2011
@@ -21,49 +21,57 @@
 package org.apache.qpid.transport.network.io;
 
 import java.io.IOException;
-import java.net.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
 import java.nio.ByteBuffer;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocketFactory;
-
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.NetworkTransport;
 import org.apache.qpid.transport.util.Logger;
 
-public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+public class IoNetworkTransport implements NetworkTransport, IoContext
 {
+    static
+    {
+        org.apache.mina.common.ByteBuffer.setAllocator
+            (new org.apache.mina.common.SimpleByteBufferAllocator());
+        org.apache.mina.common.ByteBuffer.setUseDirectBuffers
+            (Boolean.getBoolean("amqj.enableDirectBuffers"));
+    }
 
-    private static final Logger LOGGER = Logger.get(IoNetworkTransport.class);
-
-    private Socket _socket;
-    private IoNetworkConnection _connection;
-    private long _timeout = 60000;
-    private AcceptingThread _acceptor;
+    private static final Logger log = Logger.get(IoNetworkTransport.class);
 
-    public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext)
+    private Socket socket;
+    private Sender<ByteBuffer> sender;
+    private IoReceiver receiver;
+    private long timeout = 60000; 
+    private ConnectionSettings settings;    
+    
+    public void init(ConnectionSettings settings)
     {
-        int sendBufferSize = settings.getWriteBufferSize();
-        int receiveBufferSize = settings.getReadBufferSize();
-
         try
         {
-            _socket = new Socket();
-            _socket.setReuseAddress(true);
-            _socket.setTcpNoDelay(settings.isTcpNodelay());
-            _socket.setSendBufferSize(sendBufferSize);
-            _socket.setReceiveBufferSize(receiveBufferSize);
+            this.settings = settings;
+            InetAddress address = InetAddress.getByName(settings.getHost());
+            socket = new Socket();
+            socket.setReuseAddress(true);
+            socket.setTcpNoDelay(settings.isTcpNodelay());
 
-            LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize());
-            LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize());
+            log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+            log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
 
-            InetAddress address = InetAddress.getByName(settings.getHost());
+            socket.setSendBufferSize(settings.getWriteBufferSize());
+            socket.setReceiveBufferSize(settings.getReadBufferSize());
+
+            log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+            log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
 
-            _socket.connect(new InetSocketAddress(address, settings.getPort()));
+            socket.connect(new InetSocketAddress(address, settings.getPort()));
         }
         catch (SocketException e)
         {
@@ -73,159 +81,36 @@ public class IoNetworkTransport implemen
         {
             throw new TransportException("Error connecting to broker", e);
         }
+    }
 
-        try
-        {
-            _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout);
-            _connection.start();
-        }
-        catch(Exception e)
-        {
-            try
-            {
-                _socket.close();
-            }
-            catch(IOException ioe)
-            {
-                //ignored, throw based on original exception
-            }
-
-            throw new TransportException("Error creating network connection", e);
-        }
+    public void receiver(Receiver<ByteBuffer> delegate)
+    {
+        receiver = new IoReceiver(this, delegate,
+                2*settings.getReadBufferSize() , timeout);
+    }
 
-        return _connection;
+    public Sender<ByteBuffer> sender()
+    {
+        return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
     }
 
     public void close()
     {
-        if(_connection != null)
-        {
-            _connection.close();
-        }
-        if(_acceptor != null)
-        {
-            _acceptor.close();
-        }
+        
     }
 
-    public NetworkConnection getConnection()
+    public Sender<ByteBuffer> getSender()
     {
-        return _connection;
+        return sender;
     }
 
-    public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext)
+    public IoReceiver getReceiver()
     {
-
-        try
-        {
-            _acceptor = new AcceptingThread(config, factory, sslContext);
-
-            _acceptor.start();
-        }
-        catch (IOException e)
-        {
-            throw new TransportException("Unable to start server socket", e);
-        }
-
-
+        return receiver;
     }
 
-    private class AcceptingThread extends Thread
+    public Socket getSocket()
     {
-        private NetworkTransportConfiguration _config;
-        private ProtocolEngineFactory _factory;
-        private SSLContext _sslContent;
-        private ServerSocket _serverSocket;
-
-        private AcceptingThread(NetworkTransportConfiguration config,
-                                ProtocolEngineFactory factory,
-                                SSLContext sslContext)
-                throws IOException
-        {
-            _config = config;
-            _factory = factory;
-            _sslContent = sslContext;
-
-            InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort());
-
-            if(sslContext == null)
-            {
-                _serverSocket = new ServerSocket();
-            }
-            else
-            {
-                SSLServerSocketFactory socketFactory = sslContext.getServerSocketFactory();
-                _serverSocket = socketFactory.createServerSocket();
-            }
-
-            _serverSocket.bind(address);
-            _serverSocket.setReuseAddress(true);
-
-
-        }
-
-
-        /**
-            Close the underlying ServerSocket if it has not already been closed.
-         */
-        public void close()
-        {
-            if (!_serverSocket.isClosed())
-            {
-                try
-                {
-                    _serverSocket.close();
-                }
-                catch (IOException e)
-                {
-                    throw new TransportException(e);
-                }
-            }
-        }
-
-        @Override
-        public void run()
-        {
-            try
-            {
-                while (true)
-                {
-                    try
-                    {
-                        Socket socket = _serverSocket.accept();
-                        socket.setTcpNoDelay(_config.getTcpNoDelay());
-
-                        final Integer sendBufferSize = _config.getSendBufferSize();
-                        final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
-                        socket.setSendBufferSize(sendBufferSize);
-                        socket.setReceiveBufferSize(receiveBufferSize);
-
-                        ProtocolEngine engine = _factory.newProtocolEngine();
-
-                        NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout);
-
-
-                        engine.setNetworkConnection(connection, connection.getSender());
-
-                        connection.start();
-
-
-                    }
-                    catch(RuntimeException e)
-                    {
-                        LOGGER.error(e, "Error in Acceptor thread " + _config.getPort());
-                    }
-                }
-            }
-            catch (IOException e)
-            {
-                LOGGER.debug(e, "SocketException - no new connections will be accepted on port "
-                        + _config.getPort());
-            }
-        }
-
-
+        return socket;
     }
-
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Fri Oct 21 01:19:00 2011
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.transport.network.io;
 
-import org.apache.qpid.common.Closeable;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.TransportException;
@@ -38,77 +37,56 @@ import java.util.concurrent.atomic.Atomi
  *
  */
 
-final class IoReceiver implements Runnable, Closeable
+final class IoReceiver implements Runnable
 {
 
     private static final Logger log = Logger.get(IoReceiver.class);
 
+    private final IoContext ioCtx;
     private final Receiver<ByteBuffer> receiver;
     private final int bufferSize;
     private final Socket socket;
     private final long timeout;
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Thread receiverThread;
-    private static final boolean shutdownBroken;
-    static
-    {
-        String osName = System.getProperty("os.name");
-        shutdownBroken = osName == null ? false : osName.matches("(?i).*windows.*");
-    }
+    private final boolean shutdownBroken =
+        ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
 
-    public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
+    public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver,
+                      int bufferSize, long timeout)
     {
+        this.ioCtx = ioCtx;
         this.receiver = receiver;
         this.bufferSize = bufferSize;
-        this.socket = socket;
+        this.socket = ioCtx.getSocket();
         this.timeout = timeout;
 
         try
         {
-            //Create but deliberately don't start the thread.
             receiverThread = Threading.getThreadFactory().createThread(this);
         }
         catch(Exception e)
         {
-            throw new RuntimeException("Error creating IOReceiver thread",e);
+            throw new Error("Error creating IOReceiver thread",e);
         }
         receiverThread.setDaemon(true);
         receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
-    }
-
-    public void initiate()
-    {
         receiverThread.start();
     }
 
-    public void close()
-    {
-        close(false);
-    }
-
     void close(boolean block)
     {
         if (!closed.getAndSet(true))
         {
             try
             {
-                try
+                if (shutdownBroken)
                 {
-                    if (shutdownBroken)
-                    {
-                       socket.close();
-                    }
-                    else
-                    {
-                        socket.shutdownInput();
-                    }
+                   socket.close();
                 }
-                catch(SocketException se)
+                else
                 {
-                    if(!socket.isClosed() && !socket.isInputShutdown())
-                    {
-                        throw se;
-                    }
+                    socket.shutdownInput();
                 }
                 if (block && Thread.currentThread() != receiverThread)
                 {
@@ -127,7 +105,6 @@ final class IoReceiver implements Runnab
             {
                 throw new TransportException(e);
             }
-
         }
     }
 

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Fri Oct 21 01:19:00 2011
@@ -24,14 +24,10 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.qpid.common.Closeable;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderClosedException;
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.util.Logger;
@@ -47,6 +43,7 @@ public final class IoSender implements R
     // we can test other cases as well
     private final static int START = Integer.MAX_VALUE - 10;
 
+    private final IoContext ioCtx;
     private final long timeout;
     private final Socket socket;
     private final OutputStream out;
@@ -59,13 +56,14 @@ public final class IoSender implements R
     private final Object notEmpty = new Object();
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Thread senderThread;
-    private final List<Closeable> _listeners = new ArrayList<Closeable>();
-
+    
     private volatile Throwable exception = null;
 
-    public IoSender(Socket socket, int bufferSize, long timeout)
+
+    public IoSender(IoContext ioCtx, int bufferSize, long timeout)
     {
-        this.socket = socket;
+        this.ioCtx = ioCtx;
+        this.socket = ioCtx.getSocket();
         this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
         this.timeout = timeout;
 
@@ -80,20 +78,15 @@ public final class IoSender implements R
 
         try
         {
-            //Create but deliberately don't start the thread.
-            senderThread = Threading.getThreadFactory().createThread(this);
+            senderThread = Threading.getThreadFactory().createThread(this);                      
         }
         catch(Exception e)
         {
             throw new Error("Error creating IOSender thread",e);
         }
-
+        
         senderThread.setDaemon(true);
         senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
-    }
-
-    public void initiate()
-    {
         senderThread.start();
     }
 
@@ -111,11 +104,7 @@ public final class IoSender implements R
     {
         if (closed.get())
         {
-            throw new SenderClosedException("sender is closed", exception);
-        }
-        if(!senderThread.isAlive())
-        {
-            throw new SenderException("sender thread not alive");
+            throw new SenderException("sender is closed", exception);
         }
 
         final int size = buffer.length;
@@ -148,7 +137,7 @@ public final class IoSender implements R
 
                     if (closed.get())
                     {
-                        throw new SenderClosedException("sender is closed", exception);
+                        throw new SenderException("sender is closed", exception);
                     }
 
                     if (head - tail >= size)
@@ -215,20 +204,16 @@ public final class IoSender implements R
                     senderThread.join(timeout);
                     if (senderThread.isAlive())
                     {
-                        log.error("join timed out");
                         throw new SenderException("join timed out");
                     }
                 }
+                ioCtx.getReceiver().close(false);
             }
             catch (InterruptedException e)
             {
-                log.error("interrupted whilst waiting for sender thread to stop");
                 throw new SenderException(e);
             }
-            finally
-            {
-                closeListeners();
-            }
+
             if (reportException && exception != null)
             {
                 throw new SenderException(exception);
@@ -236,31 +221,9 @@ public final class IoSender implements R
         }
     }
 
-    private void closeListeners()
-    {
-        Exception ex = null;
-        for(Closeable listener : _listeners)
-        {
-            try
-            {
-                listener.close();
-            }
-            catch(Exception e)
-            {
-                log.error("Exception closing listener: " + e.getMessage());
-                ex = e;
-            }
-        }
-
-        if (ex != null)
-        {
-            throw new SenderException(ex.getMessage(), ex);
-        }
-    }
-
     public void run()
     {
-        final int size = buffer.length;
+        final int size = buffer.length;       
         while (true)
         {
             final int hd = head;
@@ -341,9 +304,4 @@ public final class IoSender implements R
             throw new SenderException(e);
         }
     }
-
-    public void registerCloseListener(Closeable listener)
-    {
-        _listeners.add(listener);
-    }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java Fri Oct 21 01:19:00 2011
@@ -25,8 +25,8 @@ import java.nio.ByteBuffer;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
-import org.apache.qpid.ssl.SSLContextFactory;
 import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionListener;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Sender;
@@ -37,12 +37,149 @@ import org.apache.qpid.transport.network
 import org.apache.qpid.transport.network.security.ssl.SSLSender;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
-public interface SecurityLayer
+public class SecurityLayer
 {
-
-    public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate);
-    public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate);
-    public String getUserID();
-
+    ConnectionSettings settings;
+    Connection con;
+    SSLSecurityLayer sslLayer;
+    SASLSecurityLayer saslLayer;
+    
+    public void init(Connection con) throws TransportException
+    {
+        this.con = con;
+        this.settings = con.getConnectionSettings();
+        if (settings.isUseSSL())
+        {
+            sslLayer = new SSLSecurityLayer();
+        }
+        if (settings.isUseSASLEncryption())
+        {
+            saslLayer = new SASLSecurityLayer();
+        }        
+        
+    }
+    
+    public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+    {
+        Sender<ByteBuffer> sender = delegate;
+        
+        if (settings.isUseSSL())
+        {
+            sender = sslLayer.sender(sender);
+        }     
+        
+        if (settings.isUseSASLEncryption())
+        {
+            sender = saslLayer.sender(sender);
+        }
+        
+        return sender;
+    }
+    
+    public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+    {
+        Receiver<ByteBuffer> receiver = delegate;
+        
+        if (settings.isUseSSL())
+        {
+            receiver = sslLayer.receiver(receiver);
+        }        
+        
+        if (settings.isUseSASLEncryption())
+        {
+            receiver = saslLayer.receiver(receiver);
+        }
+        
+        return receiver;
+    }
+    
+    public String getUserID()
+    {
+        if (settings.isUseSSL())
+        {
+            return sslLayer.getUserID();
+        }
+        else
+        {
+            return null;
+        }
+    }
+    
+    class SSLSecurityLayer
+    {
+        SSLEngine engine;
+        SSLSender sender;
+                
+        public SSLSecurityLayer() 
+        {
+            SSLContext sslCtx;
+            try
+            {
+                sslCtx = SSLUtil.createSSLContext(settings);
+            }
+            catch (Exception e)
+            {
+                throw new TransportException("Error creating SSL Context", e);
+            }
+            
+            try
+            {
+                engine = sslCtx.createSSLEngine();
+                engine.setUseClientMode(true);
+            }
+            catch(Exception e)
+            {
+                throw new TransportException("Error creating SSL Engine", e);
+            }
+        }
+        
+        public SSLSender sender(Sender<ByteBuffer> delegate)
+        {
+            sender = new SSLSender(engine,delegate);
+            sender.setConnectionSettings(settings);
+            return sender;
+        }
+        
+        public SSLReceiver receiver(Receiver<ByteBuffer> delegate)
+        {
+            if (sender == null)
+            {
+                throw new  
+                IllegalStateException("SecurityLayer.sender method should be " +
+                		"invoked before SecurityLayer.receiver");
+            }
+            
+            SSLReceiver receiver = new SSLReceiver(engine,delegate,sender);
+            receiver.setConnectionSettings(settings);
+            return receiver;
+        }
+        
+        public String getUserID()
+        {
+            return SSLUtil.retriveIdentity(engine);
+        }
+        
+    }
+    
+    class SASLSecurityLayer
+    {
+        public SASLSecurityLayer() 
+        {
+        }
+        
+        public SASLSender sender(Sender<ByteBuffer> delegate)
+        {
+            SASLSender sender = new SASLSender(delegate);
+            con.addConnectionListener((ConnectionListener)sender);
+            return sender;
+        }
+        
+        public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
+        {
+            SASLReceiver receiver = new SASLReceiver(delegate);
+            con.addConnectionListener((ConnectionListener)receiver);
+            return receiver;
+        }
+        
+    }
 }
-

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java Fri Oct 21 01:19:00 2011
@@ -43,7 +43,8 @@ public class SASLSender extends SASLEncr
         this.delegate = delegate;
         log.debug("SASL Sender enabled");
     }
-
+    
+    @Override
     public void close() 
     {
         
@@ -64,11 +65,13 @@ public class SASLSender extends SASLEncr
         }
     }
 
+    @Override
     public void flush() 
     {
        delegate.flush();
     }
 
+    @Override
     public void send(ByteBuffer buf) 
     {        
         if (closed.get())
@@ -105,6 +108,7 @@ public class SASLSender extends SASLEncr
         }        
     }
 
+    @Override
     public void setIdleTimeout(int i) 
     {
         delegate.setIdleTimeout(i);

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,7 @@
  */
 package org.apache.qpid.transport.network.security.ssl;
 
-import java.io.IOException;
 import java.net.Socket;
-import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.security.Principal;
 import java.security.PrivateKey;
@@ -42,7 +40,7 @@ public class QpidClientX509KeyManager ex
     String alias;
     
     public QpidClientX509KeyManager(String alias, String keyStorePath,
-                           String keyStorePassword,String keyStoreCertType) throws GeneralSecurityException, IOException
+                           String keyStorePassword,String keyStoreCertType) throws Exception
     {
         this.alias = alias;    
         KeyStore ks = SSLUtil.getInitializedKeyStore(keyStorePath,keyStorePassword);
@@ -50,45 +48,51 @@ public class QpidClientX509KeyManager ex
         kmf.init(ks, keyStorePassword.toCharArray());
         this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0];
     }
-
+        
+    @Override
     public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket)
     {
         log.debug("chooseClientAlias:Returning alias " + alias);
         return alias;
     }
 
+    @Override
     public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket)
     {
         return delegate.chooseServerAlias(keyType, issuers, socket);
     }
 
+    @Override
     public X509Certificate[] getCertificateChain(String alias)
     {
         return delegate.getCertificateChain(alias);
     }
 
+    @Override
     public String[] getClientAliases(String keyType, Principal[] issuers)
     {
         log.debug("getClientAliases:Returning alias " + alias);
         return new String[]{alias};
     }
 
+    @Override
     public PrivateKey getPrivateKey(String alias)
     {
         return delegate.getPrivateKey(alias);
     }
 
+    @Override
     public String[] getServerAliases(String keyType, Principal[] issuers)
     {
         return delegate.getServerAliases(keyType, issuers);
     }
-
+    
     public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine)
     {
         log.debug("chooseEngineClientAlias:Returning alias " + alias);
         return alias;
     }
-
+    
     public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) 
     {
         return delegate.chooseEngineServerAlias(keyType, issuers, engine);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org