You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC

svn commit: r1187375 [35/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java Fri Oct 21 14:42:12 2011
@@ -20,11 +20,11 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
 
 public class HeartbeatBodyFactory implements BodyFactory
 {
-    public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+    public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException
     {
         return new HeartbeatBody();
     }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Fri Oct 21 14:42:12 2011
@@ -22,6 +22,10 @@ package org.apache.qpid.framing;
 
 import org.apache.qpid.AMQException;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -62,35 +66,30 @@ public class ProtocolInitiation extends 
              pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion());
     }
 
-    public ProtocolInitiation(ByteBuffer in)
+    public ProtocolInitiation(DataInputStream in) throws IOException
     {
         _protocolHeader = new byte[4];
-        in.get(_protocolHeader);
+        in.readFully(_protocolHeader); // read(byte[]) may return fewer than 4 bytes; readFully fills the array or throws EOFException
 
-        _protocolClass = in.get();
-        _protocolInstance = in.get();
-        _protocolMajor = in.get();
-        _protocolMinor = in.get();
+        _protocolClass = in.readByte();
+        _protocolInstance = in.readByte();
+        _protocolMajor = in.readByte();
+        _protocolMinor = in.readByte();
     }
 
-    public void writePayload(org.apache.mina.common.ByteBuffer buffer)
-    {
-        writePayload(buffer.buf());
-    }
-    
     public long getSize()
     {
         return 4 + 1 + 1 + 1 + 1;
     }
 
-    public void writePayload(ByteBuffer buffer)
+    public void writePayload(DataOutputStream buffer) throws IOException
     {
 
-        buffer.put(_protocolHeader);
-        buffer.put(_protocolClass);
-        buffer.put(_protocolInstance);
-        buffer.put(_protocolMajor);
-        buffer.put(_protocolMinor);
+        buffer.write(_protocolHeader);
+        buffer.write(_protocolClass);
+        buffer.write(_protocolInstance);
+        buffer.write(_protocolMajor);
+        buffer.write(_protocolMinor);
     }
 
     public boolean equals(Object o)
@@ -144,9 +143,9 @@ public class ProtocolInitiation extends 
          * @return true if we have enough data to decode the PI frame fully, false if more
          * data is required
          */
-        public boolean decodable(ByteBuffer in)
+        public boolean decodable(DataInputStream in) throws IOException
         {
-            return (in.remaining() >= 8);
+            return (in.available() >= 8); // NOTE(review): InputStream.available() is documented as an estimate; presumably 'in' wraps an in-memory buffer here - confirm against callers
         }
 
     }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java Fri Oct 21 14:42:12 2011
@@ -21,7 +21,8 @@
 
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
 
 public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
 {
@@ -68,7 +69,7 @@ public class SmallCompositeAMQDataBlock 
         return frameSize;
     }
 
-    public void writePayload(ByteBuffer buffer)
+    public void writePayload(DataOutputStream buffer) throws IOException
     {
         if (_firstFrame != null)
         {

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,8 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.IOException;
 
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 
@@ -144,7 +145,7 @@ public class VersionSpecificRegistry
 
     }
 
-    public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size) throws AMQFrameDecodingException
+    public AMQMethodBody get(short classID, short methodID, DataInputStream in, long size) throws AMQFrameDecodingException, IOException
     {
         AMQMethodBodyInstanceFactory bodyFactory;
         try

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java Fri Oct 21 14:42:12 2011
@@ -21,12 +21,10 @@
 
 package org.apache.qpid.framing.abstraction;
 
-import org.apache.mina.common.ByteBuffer;
-
 public interface ContentChunk
 {
     int getSize();
-    ByteBuffer getData();
+    byte[] getData();
 
     void reduceToFit();
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java Fri Oct 21 14:42:12 2011
@@ -23,8 +23,6 @@ package org.apache.qpid.framing.abstract
 
 import org.apache.qpid.framing.AMQBody;
 
-import java.nio.ByteBuffer;
-
 public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
 {
     AMQBody convertToBody(ContentChunk contentBody);
@@ -32,5 +30,5 @@ public interface ProtocolVersionMethodCo
 
     void configure();
 
-    AMQBody convertToBody(ByteBuffer buf);
+    AMQBody convertToBody(byte[] input);
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Fri Oct 21 14:42:12 2011
@@ -21,16 +21,13 @@
 
 package org.apache.qpid.framing.amqp_0_9;
 
-import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.*;
-import org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl;
+
 
 public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
 {
@@ -72,9 +69,9 @@ public class MethodConverter_0_9 extends
 
     }
 
-    public AMQBody convertToBody(java.nio.ByteBuffer buf)
+    public AMQBody convertToBody(byte[] data)
     {
-        return new ContentBody(ByteBuffer.wrap(buf));
+        return new ContentBody(data);
     }
 
     public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
@@ -116,9 +113,9 @@ public class MethodConverter_0_9 extends
             return _contentBodyChunk.getSize();
         }
 
-        public ByteBuffer getData()
+        public byte[] getData()
         {
-            return _contentBodyChunk.payload;
+            return _contentBodyChunk._payload;
         }
 
         public void reduceToFit()

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java Fri Oct 21 14:42:12 2011
@@ -21,8 +21,6 @@
 
 package org.apache.qpid.framing.amqp_0_91;
 
-import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -70,9 +68,9 @@ public class MethodConverter_0_91 extend
 
     }
 
-    public AMQBody convertToBody(java.nio.ByteBuffer buf)
+    public AMQBody convertToBody(byte[] data)
     {
-        return new ContentBody(ByteBuffer.wrap(buf));
+        return new ContentBody(data);
     }
 
     public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
@@ -114,9 +112,9 @@ public class MethodConverter_0_91 extend
             return _contentBodyChunk.getSize();
         }
 
-        public ByteBuffer getData()
+        public byte[] getData()
         {
-            return _contentBodyChunk.payload;
+            return _contentBodyChunk._payload;
         }
 
         public void reduceToFit()

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java Fri Oct 21 14:42:12 2011
@@ -26,11 +26,8 @@ import org.apache.qpid.framing.abstracti
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
 import org.apache.qpid.framing.*;
 
-import org.apache.mina.common.ByteBuffer;
-
 public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
 {
     private int _basicPublishClassId;
@@ -60,9 +57,9 @@ public class MethodConverter_8_0 extends
                 return contentBodyChunk.getSize();
             }
 
-            public ByteBuffer getData()
+            public byte[] getData()
             {
-                return contentBodyChunk.payload;
+                return contentBodyChunk._payload;
             }
 
             public void reduceToFit()
@@ -81,9 +78,9 @@ public class MethodConverter_8_0 extends
                 
     }
    
-    public AMQBody convertToBody(java.nio.ByteBuffer buf)
+    public AMQBody convertToBody(byte[] data)
     {
-        return new ContentBody(ByteBuffer.wrap(buf));
+        return new ContentBody(data);
     }
 
     public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Fri Oct 21 14:42:12 2011
@@ -80,7 +80,7 @@ public final class AMQConstant
     /**
      * An operator intervened to close the connection for some reason. The client may retry at some later date.
      */
-    public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
+    public static final AMQConstant CONNECTION_FORCED = new AMQConstant(320, "connection forced", true);
 
     /** The client tried to work with an unknown virtual host or cluster. */
     public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
@@ -104,7 +104,7 @@ public final class AMQConstant
 
     public static final AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "Request Timeout", true);
 
-    public static final AMQConstant INVALID_ARGUMENT = new AMQConstant(409, "argument invalid", true);
+    public static final AMQConstant ARGUMENT_INVALID = new AMQConstant(409, "argument invalid", true);
 
     /**
      * The client sent a malformed frame that the server could not decode. This strongly implies a programming error
@@ -153,10 +153,7 @@ public final class AMQConstant
 
     public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true);
 
-    /**
-     * The server does not support the protocol version
-     */
-    public static final AMQConstant UNSUPPORTED_BROKER_PROTOCOL_ERROR = new AMQConstant(542, "broker unsupported protocol", true);
+    public static final AMQConstant INVALID_ARGUMENT = new AMQConstant(542, "invalid argument", true);
     /**
      * The client imp does not support the protocol version
      */

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Fri Oct 21 14:42:12 2011
@@ -21,10 +21,11 @@
 package org.apache.qpid.protocol;
 
 import java.net.SocketAddress;
+import java.nio.ByteBuffer;
 
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
 
 /**
  * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
@@ -32,9 +33,6 @@ import org.apache.qpid.transport.Receive
  */
 public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
 {
-   // Sets the network driver providing data for this ProtocolEngine
-   void setNetworkDriver (NetworkDriver driver);
-
    // Returns the remote address of the NetworkDriver
    SocketAddress getRemoteAddress();
 
@@ -58,4 +56,6 @@ public interface ProtocolEngine extends 
    void readerIdle();
 
 
+    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+
 }
\ No newline at end of file

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java Fri Oct 21 14:42:12 2011
@@ -20,12 +20,12 @@
  */
 package org.apache.qpid.protocol;
 
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.NetworkConnection;
 
 public interface ProtocolEngineFactory  
 { 
  
   // Returns a new instance of a ProtocolEngine 
-  ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); 
+  ProtocolEngine newProtocolEngine();
    
 } 
\ No newline at end of file

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java Fri Oct 21 14:42:12 2011
@@ -20,18 +20,17 @@
  */
 package org.apache.qpid.ssl;
 
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 
+import org.apache.qpid.transport.network.security.ssl.QpidClientX509KeyManager;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
 /**
@@ -39,157 +38,92 @@ import org.apache.qpid.transport.network
  * before this will work.
  * 
  */
-public class SSLContextFactory {
-	
-	/**
-	 * Path to the Java keystore file
-	 */
-	private String _keyStorePath;
-	
-	/**
-	 * Password for the keystore
-	 */
-	private String _keyStorePassword;
-	
-	/**
-	 * Cert type to use in keystore
-	 */
-	private String _keyStoreCertType;
-	
-	/**
-     * Path to the Java truststore file
-     */
-    private String _trustStorePath;
-    
-    /**
-     * Password for the truststore
-     */
-    private String _trustStorePassword;
-    
-    /**
-     * Cert type to use in truststore
-     */
-    private String _trustStoreCertType;
-    
-	private KeyManager customKeyManager;
-    
-    public SSLContextFactory(String trustStorePath, String trustStorePassword,
-            String trustStoreCertType) 
+public class SSLContextFactory
+{
+    public static final String JAVA_KEY_STORE_CODE = "JKS";
+    public static final String TRANSPORT_LAYER_SECURITY_CODE = "TLS";
+    public static final String KEY_STORE_CERTIFICATE_TYPE = "SunX509";
+
+    private SSLContextFactory()
     {
-        this(trustStorePath,trustStorePassword,trustStoreCertType,
-                          trustStorePath,trustStorePassword,trustStoreCertType);
+        //no instances
     }
 
-    /**
-	 * Create a factory instance
-	 * @param keystorePath path to the Java keystore file
-	 * @param keystorePassword password for the Java keystore
-	 * @param certType certificate type
-	 */
-	public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType,
-            String keyStorePath, String keyStorePassword, String keyStoreCertType) 
-	{
-
-	    _trustStorePath = trustStorePath;
-        _trustStorePassword = trustStorePassword;
-                
-        if (_trustStorePassword != null && _trustStorePassword.equals("none"))
-        {
-            _trustStorePassword = null;
-        }
-        _trustStoreCertType = trustStoreCertType;
-        
-	    _keyStorePath = keyStorePath;
-		_keyStorePassword = keyStorePassword;
-				
-		if (_keyStorePassword != null && _keyStorePassword.equals("none"))
-		{
-			_keyStorePassword = null;
-		}
-		_keyStoreCertType = keyStoreCertType;
-		
-		if (_trustStorePath == null) {
-			throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified");
-		}
-		if (_trustStoreCertType == null) {
-			throw new IllegalArgumentException("Cert type must be specified");
-		}
-	}
-	
-	public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType,
-	                         KeyManager customKeyManager) 
+    public static SSLContext buildServerContext(final String keyStorePath,
+            final String keyStorePassword, final String keyStoreCertType)
+            throws GeneralSecurityException, IOException
     {
+        return buildContext(null, null, null, keyStorePath, keyStorePassword,
+                keyStoreCertType, null);
+    }
 
-        _trustStorePath = trustStorePath;
-        _trustStorePassword = trustStorePassword;
-                
-        if (_trustStorePassword != null && _trustStorePassword.equals("none"))
-        {
-            _trustStorePassword = null;
-        }
-        _trustStoreCertType = trustStoreCertType;
-        
-        if (_trustStorePath == null) {
-            throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified");
-        }
-        if (_trustStoreCertType == null) {
-            throw new IllegalArgumentException("Cert type must be specified");
-        }
-        
-        this.customKeyManager = customKeyManager;
+    public static SSLContext buildClientContext(final String trustStorePath,
+            final String trustStorePassword, final String trustStoreCertType,
+            final String keyStorePath, final String keyStorePassword,
+            final String keyStoreCertType, final String certAlias)
+            throws GeneralSecurityException, IOException
+    {
+        return buildContext(trustStorePath, trustStorePassword,
+                trustStoreCertType, keyStorePath, keyStorePassword,
+                keyStoreCertType, certAlias);
     }
-	
-	
-	/**
-	 * Builds a SSLContext appropriate for use with a server
-	 * @return SSLContext
-	 * @throws GeneralSecurityException
-	 * @throws IOException
-	 */
-
-	public SSLContext buildServerContext() throws GeneralSecurityException, IOException
-	{
-        KeyStore ts = SSLUtil.getInitializedKeyStore(_trustStorePath,_trustStorePassword);
-        TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
-        tmf.init(ts);
-        
+    
+    private static SSLContext buildContext(final String trustStorePath,
+            final String trustStorePassword, final String trustStoreCertType,
+            final String keyStorePath, final String keyStorePassword,
+            final String keyStoreCertType, final String certAlias)
+            throws GeneralSecurityException, IOException
+    {
         // Initialize the SSLContext to work with our key managers.
-        SSLContext sslContext = SSLContext.getInstance("TLS");
-        
-        if (customKeyManager != null)
+        final SSLContext sslContext = SSLContext
+                .getInstance(TRANSPORT_LAYER_SECURITY_CODE);
+
+        final TrustManager[] trustManagers;
+        final KeyManager[] keyManagers;
+
+        if (trustStorePath != null)
         {
-            sslContext.init(new KeyManager[]{customKeyManager},
-                            tmf.getTrustManagers(), null);
-            
+            final KeyStore ts = SSLUtil.getInitializedKeyStore(trustStorePath,
+                    trustStorePassword);
+            final TrustManagerFactory tmf = TrustManagerFactory
+                    .getInstance(trustStoreCertType);
+            tmf.init(ts);
+
+            trustManagers = tmf.getTrustManagers();
         }
         else
         {
-            // Create keystore
-            KeyStore ks = SSLUtil.getInitializedKeyStore(_keyStorePath,_keyStorePassword);
-            // Set up key manager factory to use our key store
-            KeyManagerFactory kmf = KeyManagerFactory.getInstance(_keyStoreCertType);
-            kmf.init(ks, _keyStorePassword.toCharArray());
+            trustManagers = null;
+        }
 
-            sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);    
+        if (keyStorePath != null)
+        {
+            if (certAlias != null)
+            {
+                keyManagers = new KeyManager[] { new QpidClientX509KeyManager(
+                        certAlias, keyStorePath, keyStorePassword,
+                        keyStoreCertType) };
+            }
+            else
+            {
+                final KeyStore ks = SSLUtil.getInitializedKeyStore(
+                        keyStorePath, keyStorePassword);
+
+                char[] keyStoreCharPassword = keyStorePassword == null ? null : keyStorePassword.toCharArray();
+                // Set up key manager factory to use our key store
+                final KeyManagerFactory kmf = KeyManagerFactory
+                        .getInstance(keyStoreCertType);
+                kmf.init(ks, keyStoreCharPassword);
+                keyManagers = kmf.getKeyManagers();
+            }
         }
-        
-        return sslContext;		
-	}
-	
-	/**
-	 * Creates a SSLContext factory appropriate for use with a client
-	 * @return SSLContext
-	 * @throws GeneralSecurityException
-	 * @throws IOException
-	 */
-	public SSLContext buildClientContext() throws GeneralSecurityException, IOException
-	{
-		KeyStore ks = SSLUtil.getInitializedKeyStore(_trustStorePath,_trustStorePassword);
-        TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
-        tmf.init(ks);
-        SSLContext context = SSLContext.getInstance("TLS");
-        context.init(null, tmf.getTrustManagers(), null);
-        return context;		
-	}
-	
+        else
+        {
+            keyManagers = null;
+        }
+
+        sslContext.init(keyManagers, trustManagers, null);
+
+        return sslContext;
+    }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java Fri Oct 21 14:42:12 2011
@@ -23,7 +23,7 @@ package org.apache.qpid.thread;
 
 import org.apache.qpid.thread.Threading;
 
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import java.util.concurrent.Executor;
 
 public class QpidThreadExecutor implements Executor
 {

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Fri Oct 21 14:42:12 2011
@@ -20,28 +20,20 @@
  */
 package org.apache.qpid.transport;
 
-import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.GSSManager;
-import org.ietf.jgss.GSSName;
-import org.ietf.jgss.Oid;
-
-import org.apache.qpid.security.UsernamePasswordCallbackHandler;
 import static org.apache.qpid.transport.Connection.State.OPEN;
 import static org.apache.qpid.transport.Connection.State.RESUMING;
-import org.apache.qpid.transport.util.Logger;
 
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.transport.util.Logger;
+
 
 /**
  * ClientDelegate
@@ -52,31 +44,13 @@ public class ClientDelegate extends Conn
 {
     private static final Logger log = Logger.get(ClientDelegate.class);
 
-    private static final String KRB5_OID_STR = "1.2.840.113554.1.2.2";
-    protected static final Oid KRB5_OID;
 
-    static
-    {
-        Oid oid;
-        try
-        {
-            oid = new Oid(KRB5_OID_STR);
-        }
-        catch (GSSException ignore)
-        {
-            oid = null;
-        }
 
-        KRB5_OID = oid;
-    }
-
-    private List<String> clientMechs;
-    private ConnectionSettings conSettings;
+    protected final ConnectionSettings _conSettings;
 
     public ClientDelegate(ConnectionSettings settings)
     {
-        this.conSettings = settings;
-        this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" "));
+        this._conSettings = settings;
     }
 
     public void init(Connection conn, ProtocolHeader hdr)
@@ -92,9 +66,9 @@ public class ClientDelegate extends Conn
     {
         Map<String,Object> clientProperties = new HashMap<String,Object>();
 
-        if(this.conSettings.getClientProperties() != null)
+        if(this._conSettings.getClientProperties() != null)
         {
-            clientProperties.putAll(this.conSettings.getClientProperties());
+            clientProperties.putAll(_conSettings.getClientProperties());
         }
 
         clientProperties.put("qpid.session_flow", 1);
@@ -109,41 +83,12 @@ public class ClientDelegate extends Conn
                 (clientProperties, null, null, conn.getLocale());
             return;
         }
-
-        List<String> choosenMechs = new ArrayList<String>();
-        for (String mech:clientMechs)
-        {
-            if (brokerMechs.contains(mech))
-            {
-                choosenMechs.add(mech);
-            }
-        }
-
-        if (choosenMechs.size() == 0)
-        {
-            conn.exception(new ConnectionException("The following SASL mechanisms " +
-                    clientMechs.toString()  +
-                    " specified by the client are not supported by the broker"));
-            return;
-        }
-
-        String[] mechs = new String[choosenMechs.size()];
-        choosenMechs.toArray(mechs);
-
         conn.setServerProperties(start.getServerProperties());
 
         try
         {
-            Map<String,Object> saslProps = new HashMap<String,Object>();
-            if (conSettings.isUseSASLEncryption())
-            {
-                saslProps.put(Sasl.QOP, "auth-conf");
-            }
-            UsernamePasswordCallbackHandler handler =
-                new UsernamePasswordCallbackHandler();
-            handler.initialise(conSettings.getUsername(), conSettings.getPassword());
-            SaslClient sc = Sasl.createSaslClient
-                (mechs, null, conSettings.getSaslProtocol(), conSettings.getSaslServerName(), saslProps, handler);
+            final SaslClient sc = createSaslClient(brokerMechs);
+
             conn.setSaslClient(sc);
 
             byte[] response = sc.hasInitialResponse() ?
@@ -152,12 +97,22 @@ public class ClientDelegate extends Conn
                 (clientProperties, sc.getMechanismName(), response,
                  conn.getLocale());
         }
+        catch (ConnectionException ce)
+        {
+            conn.exception(ce);
+        }
         catch (SaslException e)
         {
             conn.exception(e);
         }
     }
 
+
+    protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException, SaslException
+    {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void connectionSecure(Connection conn, ConnectionSecure secure)
     {
@@ -176,7 +131,7 @@ public class ClientDelegate extends Conn
     @Override
     public void connectionTune(Connection conn, ConnectionTune tune)
     {
-        int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(),
+        int hb_interval = calculateHeartbeatInterval(_conSettings.getHeartbeatInterval(),
                                                      tune.getHeartbeatMin(),
                                                      tune.getHeartbeatMax()
                                                      );
@@ -191,32 +146,12 @@ public class ClientDelegate extends Conn
         //(or that forced by protocol limitations [0xFFFF])
         conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
 
-        conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST);
+        conn.connectionOpen(_conSettings.getVhost(), null, Option.INSIST);
     }
 
     @Override
     public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
     {
-        SaslClient sc = conn.getSaslClient();
-        if (sc != null)
-        {
-            if (sc.getMechanismName().equals("GSSAPI"))
-            {
-                String id = getKerberosUser();
-                if (id != null)
-                {
-                    conn.setUserID(id);
-                }
-            }
-            else if (sc.getMechanismName().equals("EXTERNAL"))
-            {
-                if (conn.getSecurityLayer() != null)
-                {
-                    conn.setUserID(conn.getSecurityLayer().getUserID());
-                }
-            }
-        }
-        
         if (conn.isConnectionResuming())
         {
             conn.setState(RESUMING);
@@ -247,7 +182,7 @@ public class ClientDelegate extends Conn
         int i = heartbeat;
         if (i == 0)
         {
-            log.warn("Idle timeout is zero. Heartbeats are disabled");
+            log.info("Idle timeout is 0 sec. Heartbeats are disabled.");
             return 0; // heartbeats are disabled.
         }
         else if (i >= min && i <= max)
@@ -256,8 +191,8 @@ public class ClientDelegate extends Conn
         }
         else
         {
-            log.warn("Ignoring the idle timeout %s set by the connection," +
-            		" using the brokers max value %s", i,max);
+            log.info("The broker does not support the configured connection idle timeout of %s sec," +
+                     " using the brokers max supported value of %s sec instead.", i,max);
             return max;
         }
     }
@@ -286,35 +221,7 @@ public class ClientDelegate extends Conn
 
     }
 
-    private String getKerberosUser()
-    {
-        log.debug("Obtaining userID from kerberos");
-        String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName();
-        GSSManager manager = GSSManager.getInstance();
-
-        try
-        {
-            GSSName acceptorName = manager.createName(service,
-                GSSName.NT_HOSTBASED_SERVICE, KRB5_OID);
-
-            GSSContext secCtx = manager.createContext(acceptorName,
-                                                      KRB5_OID,
-                                                      null,
-                                                      GSSContext.INDEFINITE_LIFETIME);
 
-            secCtx.initSecContext(new byte[0], 0, 1);
 
-            if (secCtx.getSrcName() != null)
-            {
-                return secCtx.getSrcName().toString();
-            }
 
-        }
-        catch (GSSException e)
-        {
-            log.warn("Unable to retrieve userID from Kerberos due to error",e);
-        }
-
-        return null;
-    }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Oct 21 14:42:12 2011
@@ -25,21 +25,29 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.Connection.State.NEW;
 import static org.apache.qpid.transport.Connection.State.OPEN;
 import static org.apache.qpid.transport.Connection.State.OPENING;
-import static org.apache.qpid.transport.Connection.State.RESUMING;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslServer;
 
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
 import org.apache.qpid.transport.network.security.SecurityLayer;
+import org.apache.qpid.transport.network.security.SecurityLayerFactory;
 import org.apache.qpid.transport.util.Logger;
 import org.apache.qpid.transport.util.Waiter;
 import org.apache.qpid.util.Strings;
@@ -65,6 +73,7 @@ public class Connection extends Connecti
     public static final int MAX_CHANNEL_MAX = 0xFFFF;
     public static final int MIN_USABLE_CHANNEL_NUM = 0;
 
+
     public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
 
     static class DefaultConnectionListener implements ConnectionListener
@@ -112,17 +121,14 @@ public class Connection extends Connecti
     private SaslServer saslServer;
     private SaslClient saslClient;
     private int idleTimeout = 0;
-    private String _authorizationID;
     private Map<String,Object> _serverProperties;
     private String userID;
     private ConnectionSettings conSettings;
     private SecurityLayer securityLayer;
     private String _clientId;
-    
-    private static final AtomicLong idGenerator = new AtomicLong(0);
-    private final long _connectionId = idGenerator.incrementAndGet();
+
     private final AtomicBoolean connectionLost = new AtomicBoolean(false);
-    
+
     public Connection() {}
 
     public void setConnectionDelegate(ConnectionDelegate delegate)
@@ -233,14 +239,24 @@ public class Connection extends Connecti
             conSettings = settings;
             state = OPENING;
             userID = settings.getUsername();
-            delegate = new ClientDelegate(settings);
-           
-            TransportBuilder transport = new TransportBuilder();
-            transport.init(this);
-            this.sender = transport.buildSenderPipe();
-            transport.buildReceiverPipe(this);
-            this.securityLayer = transport.getSecurityLayer();
-            
+
+            securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
+
+            OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
+            Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
+            if(secureReceiver instanceof ConnectionListener)
+            {
+                addConnectionListener((ConnectionListener)secureReceiver);
+            }
+
+            NetworkConnection network = transport.connect(settings, secureReceiver, null);
+            final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender());
+            if(secureSender instanceof ConnectionListener)
+            {
+                addConnectionListener((ConnectionListener)secureSender);
+            }
+            sender = new Disassembler(secureSender, settings.getMaxFrameSize());
+
             send(new ProtocolHeader(1, 0, 10));
 
             Waiter w = new Waiter(lock, timeout);
@@ -321,23 +337,31 @@ public class Connection extends Connecti
             Waiter w = new Waiter(lock, timeout);
             while (w.hasTime() && state != OPEN && error == null)
             {
-                w.await();                
+                w.await();
             }
-            
+
             if (state != OPEN)
             {
                 throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
             }
-            
+
             Session ssn = _sessionFactory.newSession(this, name, expiry);
-            sessions.put(name, ssn);
+            registerSession(ssn);
             map(ssn);
             ssn.attach();
             return ssn;
         }
     }
 
-    void removeSession(Session ssn)
+    public void registerSession(Session ssn)
+    {
+        synchronized (lock)
+        {
+            sessions.put(ssn.getName(),ssn);
+        }
+    }
+
+    public void removeSession(Session ssn)
     {
         synchronized (lock)
         {
@@ -352,11 +376,6 @@ public class Connection extends Connecti
         _sessionFactory = sessionFactory;
     }
 
-    public long getConnectionId()
-    {
-        return _connectionId;
-    }
-
     public ConnectionDelegate getConnectionDelegate()
     {
         return delegate;
@@ -405,7 +424,7 @@ public class Connection extends Connecti
         else
         {
             throw new ProtocolViolationException(
-					"Received frames for an already dettached session", null);
+					"Received frames for an already detached session", null);
         }
     }
 
@@ -454,7 +473,7 @@ public class Connection extends Connecti
         }
     }
 
-    protected Session getSession(int channel)
+    public Session getSession(int channel)
     {
         synchronized (lock)
         {
@@ -468,18 +487,10 @@ public class Connection extends Connecti
         {
             for (Session ssn : sessions.values())
             {
-                if (ssn.isTransacted())
-                {                    
-                    removeSession(ssn);
-                    ssn.setState(Session.State.CLOSED);
-                }
-                else
-                {                
-                    map(ssn);
-                    ssn.attach();
-                    ssn.resume();
-                }
+                map(ssn);
+                ssn.resume();
             }
+
             setState(OPEN);
         }
     }
@@ -515,10 +526,6 @@ public class Connection extends Connecti
     {
         synchronized (lock)
         {
-            for (Session ssn : channels.values())
-            {
-                ssn.closeCode(close);
-            }
             ConnectionCloseCode code = close.getReplyCode();
             if (code != ConnectionCloseCode.NORMAL)
             {
@@ -566,12 +573,12 @@ public class Connection extends Connecti
     {
         close(ConnectionCloseCode.NORMAL, null);
     }
-    
+
     public void mgmtClose()
     {
         close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface.");
     }
-    
+
     public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
     {
         synchronized (lock)
@@ -645,16 +652,6 @@ public class Connection extends Connecti
         return idleTimeout;
     }
 
-    public void setAuthorizationID(String authorizationID)
-    {
-        _authorizationID = authorizationID;
-    }
-
-    public String getAuthorizationID()
-    {
-        return _authorizationID;
-    }
-
     public String getUserID()
     {
         return userID;
@@ -684,15 +681,33 @@ public class Connection extends Connecti
     {
         return conSettings;
     }
-    
+
     public SecurityLayer getSecurityLayer()
     {
         return securityLayer;
     }
-    
+
     public boolean isConnectionResuming()
     {
         return connectionLost.get();
     }
 
+    protected Collection<Session> getChannels()
+    {
+        return channels.values();
+    }
+
+    public boolean hasSessionWithName(final String name)
+    {
+        return sessions.containsKey(new Binary(name.getBytes()));
+    }
+
+    public void notifyFailoverRequired()
+    {
+        List<Session> values = new ArrayList<Session>(channels.values());
+        for (Session ssn : values)
+        {
+            ssn.notifyFailoverRequired();
+        }
+    }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Fri Oct 21 14:42:12 2011
@@ -85,7 +85,7 @@ public abstract class ConnectionDelegate
     @Override public void sessionDetach(Connection conn, SessionDetach dtc)
     {
         Session ssn = conn.getSession(dtc.getChannel());
-        ssn.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL);
+        ssn.sessionDetached(dtc.getName(), ssn.getDetachCode() == null? SessionDetachCode.NORMAL: ssn.getDetachCode());
         conn.unmap(ssn);
         ssn.closed();
     }
@@ -95,6 +95,7 @@ public abstract class ConnectionDelegate
         Session ssn = conn.getSession(dtc.getChannel());
         if (ssn != null)
         {
+            ssn.setDetachCode(dtc.getCode());
             conn.unmap(ssn);
             ssn.closed();
         }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java Fri Oct 21 14:42:12 2011
@@ -30,6 +30,8 @@ import java.util.Map;
  */
 public class ConnectionSettings
 {
+    public static final String WILDCARD_ADDRESS = "*";
+
     String protocol = "tcp";
     String host = "localhost";
     String vhost;
@@ -56,7 +58,7 @@ public class ConnectionSettings
     boolean verifyHostname;
     
     // SASL props
-    String saslMechs = System.getProperty("qpid.sasl_mechs", "PLAIN");
+    String saslMechs = System.getProperty("qpid.sasl_mechs", null);
     String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP");
     String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost");
     boolean useSASLEncryption;

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Fri Oct 21 14:42:12 2011
@@ -75,10 +75,7 @@ public class ServerDelegate extends Conn
 
         if (mechanism == null || mechanism.length() == 0)
         {
-            conn.connectionTune
-                (getChannelMax(),
-                 org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
-                 0, getHeartbeatMax());
+            tuneAuthorizedConnection(conn);
             return;
         }
 
@@ -97,8 +94,7 @@ public class ServerDelegate extends Conn
         }
         catch (SaslException e)
         {
-            conn.exception(e);
-            conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+            connectionAuthFailed(conn, e);
         }
     }
 
@@ -109,33 +105,52 @@ public class ServerDelegate extends Conn
         return ss;
     }
 
-    private void secure(Connection conn, byte[] response)
+    protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
     {
-        SaslServer ss = conn.getSaslServer();
         try
         {
             byte[] challenge = ss.evaluateResponse(response);
             if (ss.isComplete())
             {
                 ss.dispose();
-                conn.connectionTune
-                    (getChannelMax(),
-                     org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
-                     0, getHeartbeatMax());
-                conn.setAuthorizationID(ss.getAuthorizationID());
+                tuneAuthorizedConnection(conn);
             }
             else
             {
-                conn.connectionSecure(challenge);
+                connectionAuthContinue(conn, challenge);
             }
         }
         catch (SaslException e)
         {
-            conn.exception(e);
-            conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+            connectionAuthFailed(conn, e);
         }
     }
 
+    protected void connectionAuthFailed(final Connection conn, Exception e)
+    {
+        conn.exception(e);
+        conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+    }
+
+    protected void connectionAuthContinue(final Connection conn, byte[] challenge)
+    {
+        conn.connectionSecure(challenge);
+    }
+
+    protected void tuneAuthorizedConnection(final Connection conn)
+    {
+        conn.connectionTune
+            (getChannelMax(),
+             org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+             0, getHeartbeatMax());
+    }
+    
+    protected void secure(final Connection conn, final byte[] response)
+    {
+        final SaslServer ss = conn.getSaslServer();
+        secure(ss, conn, response);
+    }
+
     protected int getHeartbeatMax()
     {
         return 0xFFFF;
@@ -155,22 +170,7 @@ public class ServerDelegate extends Conn
     @Override
     public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
     {
-        int okChannelMax = ok.getChannelMax();
-        
-        if (okChannelMax > getChannelMax())
-        {
-            _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " +
-                    "client connectionTuneOk returned a channelMax (" + okChannelMax +
-                    ") above the servers offered limit (" + getChannelMax() +")");
 
-            //Due to the error we must forcefully close the connection without negotiation
-            conn.getSender().close();
-            return;
-        }
-
-        //0 means no implied limit, except available server resources
-        //(or that forced by protocol limitations [0xFFFF])
-        conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
     }
 
     @Override
@@ -200,4 +200,11 @@ public class ServerDelegate extends Conn
         ssn.sessionAttached(atc.getName());
         ssn.setState(Session.State.OPEN);
     }
+
+    protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax)
+    {
+        //0 means no implied limit, except available server resources
+        //(or that forced by protocol limitations [0xFFFF])
+        conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
+    }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java Fri Oct 21 14:42:12 2011
@@ -30,6 +30,8 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.Session.State.NEW;
 import static org.apache.qpid.transport.Session.State.OPEN;
 import static org.apache.qpid.transport.Session.State.RESUMING;
+
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.transport.network.Frame;
 import static org.apache.qpid.transport.util.Functions.mod;
 import org.apache.qpid.transport.util.Logger;
@@ -42,10 +44,13 @@ import static org.apache.qpid.util.Seria
 import static org.apache.qpid.util.Strings.toUTF8;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Session
@@ -55,7 +60,6 @@ import java.util.concurrent.TimeUnit;
 
 public class Session extends SessionInvoker
 {
-
     private static final Logger log = Logger.get(Session.class);
 
     public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
@@ -89,7 +93,9 @@ public class Session extends SessionInvo
     private int channel;
     private SessionDelegate delegate;
     private SessionListener listener = new DefaultSessionListener();
-    private long timeout = 60000;
+    private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
+                                        Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
+                                                     ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
     private boolean autoSync = false;
 
     private boolean incomingInit;
@@ -117,7 +123,11 @@ public class Session extends SessionInvo
 
     private Thread resumer = null;
     private boolean transacted = false;
-    
+    private SessionDetachCode detachCode;
+    private final Object stateLock = new Object();
+
+    private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
+
     protected Session(Connection connection, Binary name, long expiry)
     {
         this(connection, new SessionDelegate(), name, expiry);
@@ -250,8 +260,11 @@ public class Session extends SessionInvo
 
     void resume()
     {
+        _failoverRequired.set(false);
         synchronized (commands)
         {
+            attach();
+
             for (int i = maxComplete + 1; lt(i, commandsOut); i++)
             {
                 Method m = commands[mod(i, commands.length)];
@@ -262,16 +275,48 @@ public class Session extends SessionInvo
                 }
                 else if (m instanceof MessageTransfer)
                 {
-                    ((MessageTransfer)m).getHeader().get(DeliveryProperties.class).setRedelivered(true);
+                	MessageTransfer xfr = (MessageTransfer)m;
+                	
+                	if (xfr.getHeader() != null)
+                	{
+                		if (xfr.getHeader().get(DeliveryProperties.class) != null)
+                		{
+                		   xfr.getHeader().get(DeliveryProperties.class).setRedelivered(true);
+                		}
+                		else
+                		{
+                			Struct[] structs = xfr.getHeader().getStructs();
+                			DeliveryProperties deliveryProps = new DeliveryProperties();
+                    		deliveryProps.setRedelivered(true);
+                    		
+                    		List<Struct> list = Arrays.asList(structs);
+                    		list.add(deliveryProps);
+                    		xfr.setHeader(new Header(list));
+                		}
+                		
+                	}
+                	else
+                	{
+                		DeliveryProperties deliveryProps = new DeliveryProperties();
+                		deliveryProps.setRedelivered(true);
+                		xfr.setHeader(new Header(deliveryProps));
+                	}
                 }
                 sessionCommandPoint(m.getId(), 0);
                 send(m);
             }
-           
+
             sessionCommandPoint(commandsOut, 0);
+
             sessionFlush(COMPLETED);
             resumer = Thread.currentThread();
             state = RESUMING;
+
+            if(isTransacted())
+            {
+                txSelect();
+            }
+
             listener.resumed(this);
             resumer = null;
         }
@@ -418,11 +463,14 @@ public class Session extends SessionInvo
 
         synchronized (commands)
         {
-            if (state == DETACHED || state == CLOSING)
+            if (state == DETACHED || state == CLOSING || state == CLOSED)
             {
                 return;
             }
-            sessionCompleted(copy, options);
+            if (copy.size() > 0)
+            {
+	            sessionCompleted(copy, options);
+            }
         }
     }
 
@@ -532,17 +580,6 @@ public class Session extends SessionInvo
     {
         if (m.getEncodedTrack() == Frame.L4)
         {
-            
-            if (state == DETACHED && transacted)
-            {
-                state = CLOSED;
-                delegate.closed(this);
-                connection.removeSession(this);
-                throw new SessionException(
-                        "Session failed over, possibly in the middle of a transaction. " +
-                        "Closing the session. Any Transaction in progress will be rolledback.");
-            }
-            
             if (m.hasPayload())
             {
                 acquireCredit();
@@ -562,11 +599,12 @@ public class Session extends SessionInvo
                 if (state != OPEN && state != CLOSED && state != CLOSING)
                 {
                     Thread current = Thread.currentThread();
-                    if (!current.equals(resumer))
+                    if (!current.equals(resumer) )
                     {
                         Waiter w = new Waiter(commands, timeout);
                         while (w.hasTime() && (state != OPEN && state != CLOSED))
                         {
+                            checkFailoverRequired("Command was interrupted because of failover, before being sent");
                             w.await();
                         }
                     }
@@ -635,6 +673,7 @@ public class Session extends SessionInvo
                                 }
                             }
                         }
+                        checkFailoverRequired("Command was interrupted because of failover, before being sent");
                         w.await();
                     }
                 }
@@ -661,7 +700,12 @@ public class Session extends SessionInvo
                 {
                     sessionCommandPoint(0, 0);
                 }
-                if ((!closing && !transacted && m instanceof MessageTransfer) || m.hasCompletionListener())
+                
+                boolean replayTransfer = !closing && !transacted &&
+                                         m instanceof MessageTransfer &&
+                                         ! m.isUnreliable();
+                
+                if ((replayTransfer) || m.hasCompletionListener())
                 {
                     commands[mod(next, commands.length)] = m;
                     commandBytes += m.getBodySize();
@@ -724,6 +768,14 @@ public class Session extends SessionInvo
         }
     }
 
+    private void checkFailoverRequired(String message)
+    {
+        if (_failoverRequired.get())
+        {
+            throw new SessionException(message);
+        }
+    }
+
     protected boolean shouldIssueFlush(int next)
     {
         return (next % 65536) == 0;
@@ -749,6 +801,7 @@ public class Session extends SessionInvo
             Waiter w = new Waiter(commands, timeout);
             while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
             {
+                checkFailoverRequired("Session sync was interrupted by failover.");
                 log.debug("%s   waiting for[%d]: %d, %s", this, point, maxComplete, commands);
                 w.await();
             }
@@ -809,13 +862,6 @@ public class Session extends SessionInvo
         }
     }
 
-    private ConnectionClose close = null;
-
-    void closeCode(ConnectionClose close)
-    {
-        this.close = close;
-    }
-
     ExecutionException getException()
     {
         synchronized (results)
@@ -866,6 +912,7 @@ public class Session extends SessionInvo
                 Waiter w = new Waiter(this, timeout);
                 while (w.hasTime() && state != CLOSED && !isDone())
                 {
+                    checkFailoverRequired("Operation was interrupted by failover.");
                     log.debug("%s waiting for result: %s", Session.this, this);
                     w.await();
                 }
@@ -877,7 +924,12 @@ public class Session extends SessionInvo
             }
             else if (state == CLOSED)
             {
-                throw new SessionException(getException());
+                ExecutionException ex = getException();
+                if(ex == null)
+                {
+                    throw new SessionClosedException();
+                }
+                throw new SessionException(ex);
             }
             else
             {
@@ -926,16 +978,29 @@ public class Session extends SessionInvo
 
     public void close()
     {
+        if (log.isDebugEnabled())
+        {
+            log.debug("Closing [%s] in state [%s]", this, state);
+        }
         synchronized (commands)
         {
-            state = CLOSING;
-            setClose(true);
-            sessionRequestTimeout(0);
-            sessionDetach(name.getBytes());
-
-            awaitClose();
- 
-
+            switch(state)
+            {
+                case DETACHED:
+                    state = CLOSED;
+                    delegate.closed(this);
+                    connection.removeSession(this);
+                    listener.closed(this);
+                    break;
+                case CLOSED:
+                    break;
+                default:
+                    state = CLOSING;
+                    setClose(true);
+                    sessionRequestTimeout(0);
+                    sessionDetach(name.getBytes());
+                    awaitClose();
+            }
         }
     }
 
@@ -944,6 +1009,7 @@ public class Session extends SessionInvo
         Waiter w = new Waiter(commands, timeout);
         while (w.hasTime() && state != CLOSED)
         {
+            checkFailoverRequired("close() was interrupted by failover.");
             w.await();
         }
 
@@ -995,7 +1061,8 @@ public class Session extends SessionInvo
 
         if(state == CLOSED)
         {
-            connection.removeSession(this);            
+            connection.removeSession(this);   
+            listener.closed(this);
         }
     }
 
@@ -1008,13 +1075,78 @@ public class Session extends SessionInvo
     {
         return String.format("ssn:%s", name);
     }
-    
+
     public void setTransacted(boolean b) {
         this.transacted = b;
     }
-    
+
     public boolean isTransacted(){
         return transacted;
     }
-    
+
+    public void setDetachCode(SessionDetachCode dtc)
+    {
+        this.detachCode = dtc;
+    }
+
+    public SessionDetachCode getDetachCode()
+    {
+        return this.detachCode;
+    }
+
+    public void awaitOpen()
+    {
+        switch (state)
+        {
+        case NEW:
+            synchronized(stateLock)
+            {
+                Waiter w = new Waiter(stateLock, timeout);
+                while (w.hasTime() && state == NEW)
+                {
+                    checkFailoverRequired("Session opening was interrupted by failover.");
+                    w.await();
+                }
+            }
+
+            if (state != OPEN)
+            {
+                throw new SessionException("Timed out waiting for Session to open");
+            }
+            break;
+        case DETACHED:
+        case CLOSING:
+        case CLOSED:
+            throw new SessionException("Session closed");
+        default :
+            break;
+        }
+    }
+
+    public Object getStateLock()
+    {
+        return stateLock;
+    }
+
+    protected void notifyFailoverRequired()
+    {
+        //ensure any operations waiting are aborted to
+        //prevent them waiting for timeout for 60 seconds
+        //and possibly preventing failover proceeding
+        _failoverRequired.set(true);
+        synchronized (commands)
+        {
+            commands.notifyAll();
+        }
+        synchronized (results)
+        {
+            for (ResultFuture<?> result : results.values())
+            {
+                synchronized(result)
+                {
+                    result.notifyAll();
+                }
+            }
+        }
+    }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Fri Oct 21 14:42:12 2011
@@ -76,6 +76,10 @@ public class SessionDelegate
     @Override public void sessionAttached(Session ssn, SessionAttached atc)
     {
         ssn.setState(Session.State.OPEN);
+        synchronized (ssn.getStateLock())
+        {
+            ssn.getStateLock().notifyAll();
+        }
     }
 
     @Override public void sessionTimeout(Session ssn, SessionTimeout t)
@@ -202,11 +206,19 @@ public class SessionDelegate
 
     public void closed(Session session)
     {
-        log.warn("CLOSED: [%s]", session);
+        log.debug("CLOSED: [%s]", session);
+        synchronized (session.getStateLock())
+        {
+            session.getStateLock().notifyAll();
+        }
     }
 
     public void detached(Session session)
     {
-        log.warn("DETACHED: [%s]", session);
+        log.debug("DETACHED: [%s]", session);
+        synchronized (session.getStateLock())
+        {
+            session.getStateLock().notifyAll();
+        }
     }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java Fri Oct 21 14:42:12 2011
@@ -63,6 +63,7 @@ abstract class AbstractEncoder implement
         ENCODINGS.put(Double.class, Type.DOUBLE);
         ENCODINGS.put(Character.class, Type.CHAR);
         ENCODINGS.put(byte[].class, Type.VBIN32);
+        ENCODINGS.put(UUID.class, Type.UUID);
     }
 
     private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>()

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java Fri Oct 21 14:42:12 2011
@@ -20,19 +20,11 @@
  */
 package org.apache.qpid.transport.network;
 
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.ConnectionSettings;
-
+/**
+ * A network transport is responsible for the establishment of network connections.
+ * NetworkTransport implementations are pluggable via the {@link Transport} class.
+ */
 public interface NetworkTransport
 {
-    public void init(ConnectionSettings settings);
-    
-    public Sender<ByteBuffer> sender();
-    
-    public void receiver(Receiver<ByteBuffer> delegate);    
-    
     public void close();
-}
\ No newline at end of file
+}

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Fri Oct 21 14:42:12 2011
@@ -21,57 +21,49 @@
 package org.apache.qpid.transport.network.io;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
+import java.net.*;
 import java.nio.ByteBuffer;
 
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.NetworkTransport;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocketFactory;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
 import org.apache.qpid.transport.util.Logger;
 
-public class IoNetworkTransport implements NetworkTransport, IoContext
+public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
 {
-    static
-    {
-        org.apache.mina.common.ByteBuffer.setAllocator
-            (new org.apache.mina.common.SimpleByteBufferAllocator());
-        org.apache.mina.common.ByteBuffer.setUseDirectBuffers
-            (Boolean.getBoolean("amqj.enableDirectBuffers"));
-    }
 
-    private static final Logger log = Logger.get(IoNetworkTransport.class);
+    private static final Logger LOGGER = Logger.get(IoNetworkTransport.class);
 
-    private Socket socket;
-    private Sender<ByteBuffer> sender;
-    private IoReceiver receiver;
-    private long timeout = 60000; 
-    private ConnectionSettings settings;    
-    
-    public void init(ConnectionSettings settings)
+    private Socket _socket;
+    private IoNetworkConnection _connection;
+    private long _timeout = 60000;
+    private AcceptingThread _acceptor;
+
+    public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext)
     {
+        int sendBufferSize = settings.getWriteBufferSize();
+        int receiveBufferSize = settings.getReadBufferSize();
+
         try
         {
-            this.settings = settings;
-            InetAddress address = InetAddress.getByName(settings.getHost());
-            socket = new Socket();
-            socket.setReuseAddress(true);
-            socket.setTcpNoDelay(settings.isTcpNodelay());
+            _socket = new Socket();
+            _socket.setReuseAddress(true);
+            _socket.setTcpNoDelay(settings.isTcpNodelay());
+            _socket.setSendBufferSize(sendBufferSize);
+            _socket.setReceiveBufferSize(receiveBufferSize);
 
-            log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
-            log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
+            LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize());
+            LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize());
 
-            socket.setSendBufferSize(settings.getWriteBufferSize());
-            socket.setReceiveBufferSize(settings.getReadBufferSize());
-
-            log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
-            log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+            InetAddress address = InetAddress.getByName(settings.getHost());
 
-            socket.connect(new InetSocketAddress(address, settings.getPort()));
+            _socket.connect(new InetSocketAddress(address, settings.getPort()));
         }
         catch (SocketException e)
         {
@@ -81,36 +73,159 @@ public class IoNetworkTransport implemen
         {
             throw new TransportException("Error connecting to broker", e);
         }
-    }
 
-    public void receiver(Receiver<ByteBuffer> delegate)
-    {
-        receiver = new IoReceiver(this, delegate,
-                2*settings.getReadBufferSize() , timeout);
-    }
+        try
+        {
+            _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout);
+            _connection.start();
+        }
+        catch(Exception e)
+        {
+            try
+            {
+                _socket.close();
+            }
+            catch(IOException ioe)
+            {
+                //ignored, throw based on original exception
+            }
 
-    public Sender<ByteBuffer> sender()
-    {
-        return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
+            throw new TransportException("Error creating network connection", e);
+        }
+
+        return _connection;
     }
 
     public void close()
     {
-        
+        if(_connection != null)
+        {
+            _connection.close();
+        }
+        if(_acceptor != null)
+        {
+            _acceptor.close();
+        }
     }
 
-    public Sender<ByteBuffer> getSender()
+    public NetworkConnection getConnection()
     {
-        return sender;
+        return _connection;
     }
 
-    public IoReceiver getReceiver()
+    public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext)
     {
-        return receiver;
+
+        try
+        {
+            _acceptor = new AcceptingThread(config, factory, sslContext);
+
+            _acceptor.start();
+        }
+        catch (IOException e)
+        {
+            throw new TransportException("Unable to start server socket", e);
+        }
+
+
     }
 
-    public Socket getSocket()
+    private class AcceptingThread extends Thread
     {
-        return socket;
+        private NetworkTransportConfiguration _config;
+        private ProtocolEngineFactory _factory;
+        private SSLContext _sslContent;
+        private ServerSocket _serverSocket;
+
+        private AcceptingThread(NetworkTransportConfiguration config,
+                                ProtocolEngineFactory factory,
+                                SSLContext sslContext)
+                throws IOException
+        {
+            _config = config;
+            _factory = factory;
+            _sslContent = sslContext;
+
+            InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort());
+
+            if(sslContext == null)
+            {
+                _serverSocket = new ServerSocket();
+            }
+            else
+            {
+                SSLServerSocketFactory socketFactory = sslContext.getServerSocketFactory();
+                _serverSocket = socketFactory.createServerSocket();
+            }
+
+            _serverSocket.bind(address);
+            _serverSocket.setReuseAddress(true);
+
+
+        }
+
+
+        /**
+            Close the underlying ServerSocket if it has not already been closed.
+         */
+        public void close()
+        {
+            if (!_serverSocket.isClosed())
+            {
+                try
+                {
+                    _serverSocket.close();
+                }
+                catch (IOException e)
+                {
+                    throw new TransportException(e);
+                }
+            }
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                while (true)
+                {
+                    try
+                    {
+                        Socket socket = _serverSocket.accept();
+                        socket.setTcpNoDelay(_config.getTcpNoDelay());
+
+                        final Integer sendBufferSize = _config.getSendBufferSize();
+                        final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+                        socket.setSendBufferSize(sendBufferSize);
+                        socket.setReceiveBufferSize(receiveBufferSize);
+
+                        ProtocolEngine engine = _factory.newProtocolEngine();
+
+                        NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout);
+
+
+                        engine.setNetworkConnection(connection, connection.getSender());
+
+                        connection.start();
+
+
+                    }
+                    catch(RuntimeException e)
+                    {
+                        LOGGER.error(e, "Error in Acceptor thread " + _config.getPort());
+                    }
+                }
+            }
+            catch (IOException e)
+            {
+                LOGGER.debug(e, "SocketException - no new connections will be accepted on port "
+                        + _config.getPort());
+            }
+        }
+
+
     }
+
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Fri Oct 21 14:42:12 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.transport.network.io;
 
+import org.apache.qpid.common.Closeable;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.TransportException;
@@ -37,56 +38,77 @@ import java.util.concurrent.atomic.Atomi
  *
  */
 
-final class IoReceiver implements Runnable
+final class IoReceiver implements Runnable, Closeable
 {
 
     private static final Logger log = Logger.get(IoReceiver.class);
 
-    private final IoContext ioCtx;
     private final Receiver<ByteBuffer> receiver;
     private final int bufferSize;
     private final Socket socket;
     private final long timeout;
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Thread receiverThread;
-    private final boolean shutdownBroken =
-        ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
+    private static final boolean shutdownBroken;
+    static
+    {
+        String osName = System.getProperty("os.name");
+        shutdownBroken = osName == null ? false : osName.matches("(?i).*windows.*");
+    }
 
-    public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver,
-                      int bufferSize, long timeout)
+    public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
     {
-        this.ioCtx = ioCtx;
         this.receiver = receiver;
         this.bufferSize = bufferSize;
-        this.socket = ioCtx.getSocket();
+        this.socket = socket;
         this.timeout = timeout;
 
         try
         {
+            //Create but deliberately don't start the thread.
             receiverThread = Threading.getThreadFactory().createThread(this);
         }
         catch(Exception e)
         {
-            throw new Error("Error creating IOReceiver thread",e);
+            throw new RuntimeException("Error creating IOReceiver thread",e);
         }
         receiverThread.setDaemon(true);
         receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+    }
+
+    public void initiate()
+    {
         receiverThread.start();
     }
 
+    public void close()
+    {
+        close(false);
+    }
+
     void close(boolean block)
     {
         if (!closed.getAndSet(true))
         {
             try
             {
-                if (shutdownBroken)
+                try
                 {
-                   socket.close();
+                    if (shutdownBroken)
+                    {
+                       socket.close();
+                    }
+                    else
+                    {
+                        socket.shutdownInput();
+                    }
                 }
-                else
+                catch(SocketException se)
                 {
-                    socket.shutdownInput();
+                    if(!socket.isClosed() && !socket.isInputShutdown())
+                    {
+                        throw se;
+                    }
                 }
                 if (block && Thread.currentThread() != receiverThread)
                 {
@@ -105,6 +127,7 @@ final class IoReceiver implements Runnab
             {
                 throw new TransportException(e);
             }
+
         }
     }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org