You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/03/12 16:41:48 UTC

svn commit: r1666224 [6/7] - in /qpid/trunk: ./ qpid/ qpid/cpp/src/ qpid/cpp/src/qpid/broker/ qpid/cpp/src/qpid/management/ qpid/cpp/src/tests/ qpid/java/ qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ qpid/java/bdbstore/src/...

Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java Thu Mar 12 15:41:46 2015
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.AuthenticationProvider;
 import org.apache.qpid.server.model.Broker;
@@ -58,7 +59,9 @@ public class HttpManagementTest extends
         when(_broker.getModel()).thenReturn(objectFactory.getModel());
         when(_broker.getCategoryClass()).thenReturn(Broker.class);
         when(_broker.getEventLogger()).thenReturn(mock(EventLogger.class));
-        when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class));
+        TaskExecutor taskExecutor = new TaskExecutorImpl();
+        taskExecutor.start();
+        when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
 
         Map<String, Object> attributes = new HashMap<String, Object>();
         attributes.put(HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, false);

Modified: qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java Thu Mar 12 15:41:46 2015
@@ -32,6 +32,8 @@ import java.util.Set;
 import javax.management.InstanceAlreadyExistsException;
 import javax.management.JMException;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -105,7 +107,7 @@ public class JMXManagementPluginImpl
     }
 
     @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
-    private void doStart() throws JMException, IOException
+    private ListenableFuture<Void> doStart() throws JMException, IOException
     {
         _allowPortActivation = true;
         Broker<?> broker = getBroker();
@@ -125,7 +127,8 @@ public class JMXManagementPluginImpl
                 registryPort.setPortManager(this);
                 if(port.getState() != State.ACTIVE)
                 {
-                    port.start();
+                    // TODO - RG
+                    port.startAsync();
                 }
 
             }
@@ -135,7 +138,7 @@ public class JMXManagementPluginImpl
                 connectorPort.setPortManager(this);
                 if(port.getState() != State.ACTIVE)
                 {
-                    port.start();
+                    port.startAsync();
                 }
 
             }
@@ -175,6 +178,7 @@ public class JMXManagementPluginImpl
         _objectRegistry.start();
         setState(State.ACTIVE);
         _allowPortActivation = false;
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Thu Mar 12 15:41:46 2015
@@ -53,7 +53,7 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.server.transport.AcceptingTransport;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
@@ -81,9 +81,7 @@ class WebSocketProvider implements Accep
         _supported = supported;
         _defaultSupportedProtocolReply = defaultSupportedProtocolReply;
         _factory = new MultiVersionProtocolEngineFactory(
-                        _port.getParent(Broker.class), null,
-                        _port.getWantClientAuth(),
-                        _port.getNeedClientAuth(),
+                        _port.getParent(Broker.class),
                         _supported,
                         _defaultSupportedProtocolReply,
                         _port,
@@ -242,7 +240,7 @@ class WebSocketProvider implements Accep
         }
     }
 
-    private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer>
+    private class ConnectionWrapper implements NetworkConnection, ByteBufferSender
     {
         private final WebSocket.Connection _connection;
         private final SocketAddress _localAddress;
@@ -261,7 +259,7 @@ class WebSocketProvider implements Accep
         }
 
         @Override
-        public Sender<ByteBuffer> getSender()
+        public ByteBufferSender getSender()
         {
             return this;
         }
@@ -271,12 +269,6 @@ class WebSocketProvider implements Accep
         {
 
         }
-
-        @Override
-        public void setIdleTimeout(final int i)
-        {
-
-        }
 
         @Override
         public void send(final ByteBuffer msg)

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Mar 12 15:41:46 2015
@@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit;
 import javax.jms.JMSException;
 import javax.jms.XASession;
 
-import org.apache.qpid.transport.Receiver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +59,7 @@ import org.apache.qpid.jms.ChannelLimitR
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.properties.ConnectionStartProperties;
+import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.OutgoingNetworkTransport;
@@ -522,12 +522,12 @@ public class AMQConnectionDelegate_8_0 i
     }
 
 
-    private static class ReceiverClosedWaiter implements Receiver<ByteBuffer>
+    private static class ReceiverClosedWaiter implements ByteBufferReceiver
     {
         private final CountDownLatch _closedWatcher;
-        private final Receiver<ByteBuffer> _receiver;
+        private final ByteBufferReceiver _receiver;
 
-        public ReceiverClosedWaiter(Receiver<ByteBuffer> receiver)
+        public ReceiverClosedWaiter(ByteBufferReceiver receiver)
         {
             _receiver = receiver;
             _closedWatcher = new CountDownLatch(1);

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Thu Mar 12 15:41:46 2015
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.client.handler;
 
-import java.nio.ByteBuffer;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +33,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.TransportException;
 
 public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
@@ -95,7 +93,7 @@ public class ConnectionCloseMethodHandle
         }
         finally
         {
-            Sender<ByteBuffer> sender = session.getSender();
+            ByteBufferSender sender = session.getSender();
 
             if (error != null)
             {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java Thu Mar 12 15:41:46 2015
@@ -30,7 +30,7 @@ import org.apache.qpid.client.failover.C
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.ConnectionRedirectBody;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.TransportException;
 
 public class ConnectionRedirectMethodHandler implements StateAwareMethodListener<ConnectionRedirectBody>
@@ -72,7 +72,7 @@ public class ConnectionRedirectMethodHan
 
         session.notifyError(new ConnectionRedirectException(host,port));
 
-        Sender<ByteBuffer> sender = session.getSender();
+        ByteBufferSender sender = session.getSender();
 
         // Close the open TCP connection
         try

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Mar 12 15:41:46 2015
@@ -66,8 +66,8 @@ import org.apache.qpid.protocol.AMQMetho
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.util.BytesDataOutput;
@@ -179,7 +179,7 @@ public class AMQProtocolHandler implemen
 
 
     private NetworkConnection _network;
-    private Sender<ByteBuffer> _sender;
+    private ByteBufferSender _sender;
     private long _lastReadTime = System.currentTimeMillis();
     private long _lastWriteTime = System.currentTimeMillis();
     private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
@@ -316,6 +316,11 @@ public class AMQProtocolHandler implemen
         }
     }
 
+    @Override
+    public void encryptedTransport()
+    {
+    }
+
     public void readerIdle()
     {
         _logger.debug("Protocol Session [" + this + "] idle: reader");
@@ -892,7 +897,7 @@ public class AMQProtocolHandler implemen
         setNetworkConnection(network, network.getSender());
     }
 
-    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+    public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
     {
         _network = network;
         _sender = sender;
@@ -910,7 +915,7 @@ public class AMQProtocolHandler implemen
         return _lastWriteTime;
     }
 
-    protected Sender<ByteBuffer> getSender()
+    protected ByteBufferSender getSender()
     {
         return _sender;
     }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Mar 12 15:41:46 2015
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.client.protocol;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -52,8 +51,8 @@ import org.apache.qpid.framing.ProtocolI
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
 
 /**
@@ -382,7 +381,7 @@ public class AMQProtocolSession implemen
         }
     }
 
-    public Sender<ByteBuffer> getSender()
+    public ByteBufferSender getSender()
     {
         return _protocolHandler.getSender();
     }
@@ -471,7 +470,7 @@ public class AMQProtocolSession implemen
         _protocolHandler.propagateExceptionToAllWaiters(error);
     }
 
-    public void setSender(Sender<java.nio.ByteBuffer> sender)
+    public void setSender(ByteBufferSender sender)
     {
         // No-op, interface munging
     }

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Thu Mar 12 15:41:46 2015
@@ -666,11 +666,11 @@ public class AMQSession_0_10Test extends
         }
     }
 
-    class MockSender implements Sender<ProtocolEvent>
+    class MockSender implements ProtocolEventSender
     {
         private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>();
 
-        public void setIdleTimeout(int i)
+        private void setIdleTimeout(int i)
         {
         }
 

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java Thu Mar 12 15:41:46 2015
@@ -22,16 +22,11 @@ package org.apache.qpid.client.transport
 
 import java.nio.ByteBuffer;
 
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
 
-public class MockSender implements Sender<ByteBuffer>
+public class MockSender implements ByteBufferSender
 {
 
-    public void setIdleTimeout(int i)
-    {
-
-    }
-
     public void send(ByteBuffer msg)
     {
 

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java Thu Mar 12 15:41:46 2015
@@ -20,18 +20,18 @@
  */
 package org.apache.qpid.client.transport;
 
-import java.security.Principal;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
-
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.security.Principal;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.network.NetworkConnection;
 
 /**
  * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
@@ -147,7 +147,7 @@ public class TestNetworkConnection imple
         _remoteAddress = address;
     }
 
-    public Sender<ByteBuffer> getSender()
+    public ByteBufferSender getSender()
     {
         return _sender;
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Thu Mar 12 15:41:46 2015
@@ -20,12 +20,13 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+
 public interface AMQBody
 {
     public  byte getFrameType();
@@ -39,4 +40,6 @@ public interface AMQBody
     public void writePayload(DataOutput buffer) throws IOException;
     
     void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
+
+    long writePayload(ByteBufferSender sender) throws IOException;
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.framing;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.qpid.transport.ByteBufferSender;
+
 
 /**
  * A data block represents something that has a size in bytes and the ability to write itself to a byte
@@ -44,4 +46,6 @@ public abstract class AMQDataBlock imple
      */
     public abstract void writePayload(DataOutput buffer) throws IOException;
 
+    public abstract long writePayload(ByteBufferSender sender) throws IOException;
+
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Thu Mar 12 15:41:46 2015
@@ -22,6 +22,10 @@ package org.apache.qpid.framing;
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
 
 public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
 {
@@ -57,6 +61,25 @@ public class AMQFrame extends AMQDataBlo
         buffer.writeByte(FRAME_END_BYTE);
     }
 
+    private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] { FRAME_END_BYTE };
+
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        byte[] frameHeader = new byte[7];
+        BytesDataOutput buffer = new BytesDataOutput(frameHeader);
+
+        buffer.writeByte(_bodyFrame.getFrameType());
+        EncodingUtils.writeUnsignedShort(buffer, _channel);
+        EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
+        sender.send(ByteBuffer.wrap(frameHeader));
+
+        long size = 8 + _bodyFrame.writePayload(sender);
+
+        sender.send(ByteBuffer.wrap(FRAME_END_BYTE_ARRAY));
+        return size;
+    }
+
     public final int getChannel()
     {
         return _channel;

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Thu Mar 12 15:41:46 2015
@@ -24,12 +24,15 @@ package org.apache.qpid.framing;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
 
 public abstract class AMQMethodBodyImpl implements AMQMethodBody
 {
@@ -105,6 +108,16 @@ public abstract class AMQMethodBodyImpl
         writeMethodPayload(buffer);
     }
 
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        final int size = getSize();
+        byte[] bytes = new byte[size];
+        BytesDataOutput buffer = new BytesDataOutput(bytes);
+        writePayload(buffer);
+        sender.send(ByteBuffer.wrap(bytes));
+        return size;
+    }
 
     protected int getSizeOf(AMQShortString string)
     {

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Thu Mar 12 15:41:46 2015
@@ -23,10 +23,14 @@ package org.apache.qpid.framing;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
+
 public class BasicContentHeaderProperties
 {
     //persistent & non-persistent constants, values as per JMS DeliveryMode
@@ -314,6 +318,26 @@ public class BasicContentHeaderPropertie
         }
     }
 
+
+    public long writePropertyListPayload(final ByteBufferSender sender) throws IOException
+    {
+        if(useEncodedForm())
+        {
+            sender.send(ByteBuffer.wrap(_encodedForm));
+            return _encodedForm.length;
+        }
+        else
+        {
+            int propertyListSize = getPropertyListSize();
+            byte[] data = new byte[propertyListSize];
+            BytesDataOutput out = new BytesDataOutput(data);
+            writePropertyListPayload(out);
+            sender.send(ByteBuffer.wrap(data));
+            return propertyListSize;
+        }
+
+    }
+
     public void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
     {
         _propertyFlags = propertyFlags;

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.framing;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.qpid.transport.ByteBufferSender;
+
 public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
 {
 
@@ -58,6 +60,17 @@ public class CompositeAMQDataBlock exten
         }
     }
 
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        long size = 0l;
+        for (int i = 0; i < _blocks.length; i++)
+        {
+            size += _blocks[i].writePayload(sender);
+        }
+        return size;
+    }
+
     public String toString()
     {
         if (_blocks == null)

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Thu Mar 12 15:41:46 2015
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
 
 public class ContentBody implements AMQBody
 {
@@ -72,6 +73,20 @@ public class ContentBody implements AMQB
         session.contentBodyReceived(channelId, this);
     }
 
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        if(_payload != null)
+        {
+            sender.send(ByteBuffer.wrap(_payload));
+            return _payload.length;
+        }
+        else
+        {
+            return 0l;
+        }
+    }
+
     public byte[] getPayload()
     {
         return _payload;
@@ -133,6 +148,23 @@ public class ContentBody implements AMQB
             }
         }
 
+        @Override
+        public long writePayload(final ByteBufferSender sender) throws IOException
+        {
+            if(_buf.hasArray())
+            {
+                sender.send(ByteBuffer.wrap(_buf.array(), _buf.arrayOffset() + _offset, _length));
+            }
+            else
+            {
+                ByteBuffer buf = _buf.duplicate();
+
+                buf.position(_offset);
+                buf.limit(_offset+_length);
+                sender.send(buf);
+            }
+            return _length;
+        }
 
         public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
         {

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Thu Mar 12 15:41:46 2015
@@ -24,10 +24,13 @@ import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
 
 public class ContentHeaderBody implements AMQBody
 {
@@ -98,6 +101,19 @@ public class ContentHeaderBody implement
         _properties.writePropertyListPayload(buffer);
     }
 
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        final byte[] fixedPart = new byte[14]; // holds the two shorts, the long body size and the flags short
+        final BytesDataOutput out = new BytesDataOutput(fixedPart);
+        EncodingUtils.writeUnsignedShort(out, CLASS_ID);
+        EncodingUtils.writeUnsignedShort(out, 0);
+        out.writeLong(_bodySize);
+        EncodingUtils.writeUnsignedShort(out, _properties.getPropertyFlags());
+        sender.send(ByteBuffer.wrap(fixedPart));
+        return fixedPart.length + _properties.writePropertyListPayload(sender);
+    }
+
     public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
             throws AMQException
     {

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Thu Mar 12 15:41:46 2015
@@ -27,6 +27,7 @@ import java.io.IOException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
 
 public class HeartbeatBody implements AMQBody
 {
@@ -61,6 +62,12 @@ public class HeartbeatBody implements AM
     {
     }
 
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        return 0L; // nothing is handed to the sender here, so zero bytes are reported
+    }
+
     public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
             throws AMQException
     {

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Thu Mar 12 15:41:46 2015
@@ -23,11 +23,14 @@ package org.apache.qpid.framing;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
 
 public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
 {
@@ -88,6 +91,16 @@ public class ProtocolInitiation extends
         buffer.write(_protocolMinor);
     }
 
+    @Override
+    public long writePayload(final ByteBufferSender sender) throws IOException
+    {
+        final byte[] data = new byte[8]; // filled by the DataOutput overload of writePayload below
+        BytesDataOutput out = new BytesDataOutput(data);
+        writePayload(out);
+        sender.send(ByteBuffer.wrap(data));
+        return data.length; // derived from the buffer rather than a second hard-coded 8
+    }
+
     public boolean equals(Object o)
     {
         if (!(o instanceof ProtocolInitiation))

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java Thu Mar 12 15:41:46 2015
@@ -26,9 +26,7 @@ import org.apache.qpid.framing.ContentBo
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.HeartbeatBody;
 import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.transport.Sender;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.transport.ByteBufferSender;
 
 
 /**
@@ -56,6 +54,6 @@ public interface AMQVersionAwareProtocol
     public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
 
 
-    public void setSender(Sender<ByteBuffer> sender);
+    public void setSender(ByteBufferSender sender);
 
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Thu Mar 12 15:41:46 2015
@@ -20,19 +20,18 @@
  */
 package org.apache.qpid.protocol;
 
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
+import java.net.SocketAddress;
+
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.TransportActivity;
 
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
 /**
  * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
  * decodes it and then process the result.
  */
-public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity
+public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity
 {
    // Returns the remote address of the NetworkDriver
    SocketAddress getRemoteAddress();
@@ -56,7 +55,8 @@ public interface ProtocolEngine extends
    // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
    void readerIdle();
 
+   void encryptedTransport();
 
-   public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+   public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender);
 
-}
\ No newline at end of file
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java Thu Mar 12 15:41:46 2015
@@ -26,11 +26,11 @@ package org.apache.qpid.transport;
  *
  */
 
-public interface Binding<E,T>
+public interface Binding<E>
 {
 
-    E endpoint(Sender<T> sender);
+    E endpoint(ByteBufferSender sender);
 
-    Receiver<T> receiver(E endpoint);
+    ByteBufferReceiver receiver(E endpoint);
 
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Thu Mar 12 15:41:46 2015
@@ -153,11 +153,9 @@ public class ClientDelegate extends Conn
                               maxFrameSize,
                               actualHeartbeatInterval);
 
-        int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
         conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
         conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
         conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize);
-        conn.setIdleTimeout(idleTimeout);
 
         int channelMax = tune.getChannelMax();
         //0 means no implied limit, except available server resources

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Thu Mar 12 15:41:46 2015
@@ -27,7 +27,6 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.Connection.State.OPENING;
 
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -68,7 +67,7 @@ import org.apache.qpid.util.Strings;
  */
 
 public class Connection extends ConnectionInvoker
-    implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
+    implements ProtocolEventReceiver, ProtocolEventSender
 {
 
     protected static final Logger log = Logger.get(Connection.class);
@@ -120,7 +119,7 @@ public class Connection extends Connecti
     private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY;
 
     private ConnectionDelegate delegate;
-    private Sender<ProtocolEvent> sender;
+    private ProtocolEventSender sender;
 
     final private Map<Binary,Session> sessions = new HashMap<Binary,Session>();
     final private Map<Integer,Session> channels = new ConcurrentHashMap<Integer,Session>();
@@ -163,15 +162,14 @@ public class Connection extends Connecti
         return Collections.unmodifiableList(listeners);
     }
 
-    public Sender<ProtocolEvent> getSender()
+    public ProtocolEventSender getSender()
     {
         return sender;
     }
 
-    public void setSender(Sender<ProtocolEvent> sender)
+    public void setSender(ProtocolEventSender sender)
     {
         this.sender = sender;
-        sender.setIdleTimeout(idleTimeout);
     }
 
     protected void setState(State state)
@@ -248,7 +246,7 @@ public class Connection extends Connecti
             OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
             final InputHandler inputHandler = new InputHandler(new Assembler(this));
             addFrameSizeObserver(inputHandler);
-            Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler);
+            ByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
             if(secureReceiver instanceof ConnectionListener)
             {
                 addConnectionListener((ConnectionListener)secureReceiver);
@@ -260,7 +258,7 @@ public class Connection extends Connecti
             setRemoteAddress(_networkConnection.getRemoteAddress());
             setLocalAddress(_networkConnection.getLocalAddress());
 
-            final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender());
+            final ByteBufferSender secureSender = securityLayer.sender(_networkConnection.getSender());
             if(secureSender instanceof ConnectionListener)
             {
                 addConnectionListener((ConnectionListener)secureSender);
@@ -425,7 +423,7 @@ public class Connection extends Connecti
         {
             log.debug("SEND: [%s] %s", this, event);
         }
-        Sender<ProtocolEvent> s = sender;
+        ProtocolEventSender s = sender;
         if (s == null)
         {
             throw new ConnectionException("connection closed");
@@ -439,7 +437,7 @@ public class Connection extends Connecti
         {
             log.debug("FLUSH: [%s]", this);
         }
-        final Sender<ProtocolEvent> theSender = sender;
+        final ProtocolEventSender theSender = sender;
         if(theSender != null)
         {
             theSender.flush();
@@ -631,6 +629,12 @@ public class Connection extends Connecti
         close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface.");
     }
 
+
+    protected void sendConnectionClose(ConnectionCloseCode replyCode, String replyText, Option ... _options)
+    {
+        connectionClose(replyCode, replyText, _options); // forwards all three arguments unchanged
+    }
+
     public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
     {
         synchronized (lock)
@@ -690,20 +694,6 @@ public class Connection extends Connecti
         }
     }
 
-    public void setIdleTimeout(int i)
-    {
-        idleTimeout = i;
-        if (sender != null)
-        {
-            sender.setIdleTimeout(i);
-        }
-    }
-
-    public int getIdleTimeout()
-    {
-        return idleTimeout;
-    }
-
     public String getUserID()
     {
         return userID;

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Thu Mar 12 15:41:46 2015
@@ -126,8 +126,11 @@ public class ServerDelegate extends Conn
 
     protected void connectionAuthFailed(final Connection conn, Exception e)
     {
-        conn.exception(e);
-        conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+        if (e != null)
+        {
+            conn.exception(e);
+        }
+        conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e == null ? "Authentication failed" : e.getMessage());
     }
 
     protected void connectionAuthContinue(final Connection conn, byte[] challenge)

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java Thu Mar 12 15:41:46 2015
@@ -20,12 +20,6 @@
  */
 package org.apache.qpid.transport.codec;
 
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-import org.apache.qpid.transport.Type;
-
-import org.apache.qpid.transport.Xid;
 import static org.apache.qpid.transport.util.Functions.lsb;
 
 import java.io.UnsupportedEncodingException;
@@ -36,6 +30,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.Type;
+import org.apache.qpid.transport.Xid;
+
 
 /**
  * AbstractEncoder
@@ -43,7 +43,7 @@ import java.util.UUID;
  * @author Rafael H. Schloming
  */
 
-abstract class AbstractEncoder implements Encoder
+public abstract class AbstractEncoder implements Encoder
 {
 
     private static Map<Class<?>,Type> ENCODINGS = new HashMap<Class<?>,Type>();

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java Thu Mar 12 15:41:46 2015
@@ -360,8 +360,4 @@ public final class BBEncoder extends Abs
 		}
 	}
 
-	public void writeMagicNumber()
-	{
-		out.put("AM2".getBytes());
-	}	
-}
\ No newline at end of file
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java Thu Mar 12 15:41:46 2015
@@ -20,13 +20,14 @@
  */
 package org.apache.qpid.transport.codec;
 
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+
 
 /**
  * Encoder interface.
@@ -274,9 +275,10 @@ public interface Encoder
      * @param bytes the bytes array to be encoded.
      */
     void writeBin128(byte [] bytes);
-    
-    /**
-     * Encodes the AMQP magic number.
-     */
-    void writeMagicNumber();
-}
\ No newline at end of file
+
+    int position();
+
+    ByteBuffer underlyingBuffer();
+
+    void init();
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Thu Mar 12 15:41:46 2015
@@ -20,28 +20,29 @@
  */
 package org.apache.qpid.transport.network;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.NetworkEventReceiver;
 import org.apache.qpid.transport.ProtocolError;
 import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventReceiver;
 import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Struct;
 import org.apache.qpid.transport.codec.BBDecoder;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Assembler
  *
  */
-public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
+public class Assembler implements NetworkEventReceiver, NetworkDelegate
 {
     // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge
     // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1.
@@ -49,7 +50,7 @@ public class Assembler implements Receiv
     private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
     private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>();
 
-    private final Receiver<ProtocolEvent> receiver;
+    private final ProtocolEventReceiver receiver;
     private final Map<Integer,List<Frame>> segments;
     private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>()
     {
@@ -59,7 +60,7 @@ public class Assembler implements Receiv
         }
     };
 
-    public Assembler(Receiver<ProtocolEvent> receiver)
+    public Assembler(ProtocolEventReceiver receiver)
     {
         this.receiver = receiver;
         segments = new HashMap<Integer,List<Frame>>();

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java Thu Mar 12 15:41:46 2015
@@ -20,15 +20,13 @@
  */
 package org.apache.qpid.transport.network;
 
-import java.nio.ByteBuffer;
-
 import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.ConnectionListener;
 import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
 import org.apache.qpid.transport.network.security.sasl.SASLSender;
 
@@ -38,10 +36,10 @@ import org.apache.qpid.transport.network
  */
 
 public abstract class ConnectionBinding
-    implements Binding<Connection,ByteBuffer>
+    implements Binding<Connection>
 {
 
-    public static Binding<Connection,ByteBuffer> get(final Connection connection)
+    public static Binding<Connection> get(final Connection connection)
     {
         return new ConnectionBinding()
         {
@@ -52,7 +50,7 @@ public abstract class ConnectionBinding
         };
     }
 
-    public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate)
+    public static Binding<Connection> get(final ConnectionDelegate delegate)
     {
         return new ConnectionBinding()
         {
@@ -69,7 +67,7 @@ public abstract class ConnectionBinding
 
     public abstract Connection connection();
 
-    public Connection endpoint(Sender<ByteBuffer> sender)
+    public Connection endpoint(ByteBufferSender sender)
     {
         Connection conn = connection();
 
@@ -87,7 +85,7 @@ public abstract class ConnectionBinding
         return conn;
     }
 
-    public Receiver<ByteBuffer> receiver(Connection conn)
+    public ByteBufferReceiver receiver(Connection conn)
     {
         final InputHandler inputHandler = new InputHandler(new Assembler(conn));
         conn.addFrameSizeObserver(inputHandler);

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Thu Mar 12 15:41:46 2015
@@ -30,27 +30,29 @@ import static org.apache.qpid.transport.
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.FrameSizeObserver;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolDelegate;
 import org.apache.qpid.transport.ProtocolError;
 import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventSender;
 import org.apache.qpid.transport.ProtocolHeader;
 import org.apache.qpid.transport.SegmentType;
-import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.Struct;
 import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Encoder;
 
 /**
  * Disassembler
  */
-public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver
+public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
 {
-    private final Sender<ByteBuffer> sender;
+    private final ByteBufferSender sender;
     private int maxPayload;
     private final Object sendlock = new Object();
-    private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
+    private final static ThreadLocal<Encoder> _encoder = new ThreadLocal<Encoder>()
     {
         public BBEncoder initialValue()
         {
@@ -58,7 +60,7 @@ public final class Disassembler implemen
         }
     };
 
-    public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
+    public Disassembler(ByteBufferSender sender, int maxFrame)
     {
         this.sender = sender;
         if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
@@ -174,7 +176,7 @@ public final class Disassembler implemen
 
     private void method(Method method, SegmentType type)
     {
-        BBEncoder enc = _encoder.get();
+        Encoder enc = _encoder.get();
         enc.init();
         enc.writeUint16(method.getEncodedType());
         if (type == SegmentType.COMMAND)
@@ -251,11 +253,6 @@ public final class Disassembler implemen
         throw new IllegalArgumentException(String.valueOf(error));
     }
 
-    public void setIdleTimeout(int i)
-    {
-        sender.setIdleTimeout(i);
-    }
-
     @Override
     public void setMaxFrameSize(final int maxFrame)
     {

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java Thu Mar 12 15:41:46 2015
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.transport.network;
 
+import java.util.Set;
+
 import javax.net.ssl.SSLContext;
 
 import org.apache.qpid.protocol.ProtocolEngineFactory;
@@ -29,7 +31,8 @@ public interface IncomingNetworkTranspor
 {
     public void accept(NetworkTransportConfiguration config,
                        ProtocolEngineFactory factory,
-                       SSLContext sslContext);
+                       SSLContext sslContext,
+                       final Set<TransportEncryption> encryptionSet);
 
     public int getAcceptingPort();
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java Thu Mar 12 15:41:46 2015
@@ -29,11 +29,12 @@ import static org.apache.qpid.transport.
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.Constant;
 import org.apache.qpid.transport.FrameSizeObserver;
+import org.apache.qpid.transport.NetworkEventReceiver;
 import org.apache.qpid.transport.ProtocolError;
 import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.SegmentType;
 
 
@@ -43,7 +44,7 @@ import org.apache.qpid.transport.Segment
  * @author Rafael H. Schloming
  */
 
-public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
+public class InputHandler implements ByteBufferReceiver, FrameSizeObserver
 {
 
     private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
@@ -56,7 +57,7 @@ public class InputHandler implements Rec
         ERROR
     }
 
-    private final Receiver<NetworkEvent> receiver;
+    private final NetworkEventReceiver receiver;
     private State state;
     private ByteBuffer input = null;
     private int needed;
@@ -66,7 +67,7 @@ public class InputHandler implements Rec
     private byte track;
     private int channel;
 
-    public InputHandler(Receiver<NetworkEvent> receiver, State state)
+    public InputHandler(NetworkEventReceiver receiver, State state)
     {
         this.receiver = receiver;
         this.state = state;
@@ -82,7 +83,7 @@ public class InputHandler implements Rec
         }
     }
 
-    public InputHandler(Receiver<NetworkEvent> receiver)
+    public InputHandler(NetworkEventReceiver receiver)
     {
         this(receiver, PROTO_HDR);
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java Thu Mar 12 15:41:46 2015
@@ -21,13 +21,13 @@
 package org.apache.qpid.transport.network;
 
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.security.Principal;
-import org.apache.qpid.transport.Sender;
+
+import org.apache.qpid.transport.ByteBufferSender;
 
 public interface NetworkConnection
 {
-    Sender<ByteBuffer> getSender();
+    ByteBufferSender getSender();
 
     void start();
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java Thu Mar 12 15:41:46 2015
@@ -20,16 +20,14 @@
  */
 package org.apache.qpid.transport.network;
 
+import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-
-import java.nio.ByteBuffer;
 
 public interface OutgoingNetworkTransport extends NetworkTransport
 {
     public NetworkConnection getConnection();
 
     public NetworkConnection connect(ConnectionSettings settings,
-                                     Receiver<ByteBuffer> delegate,
+                                     ByteBufferReceiver delegate,
                                      TransportActivity transportActivity);
-}
\ No newline at end of file
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java Thu Mar 12 15:41:46 2015
@@ -25,7 +25,7 @@ import org.apache.qpid.transport.network
 import org.apache.qpid.transport.network.Ticker;
 import org.apache.qpid.transport.network.TransportActivity;
 
-class IdleTimeoutTicker implements Ticker
+public class IdleTimeoutTicker implements Ticker
 {
     private final TransportActivity _transport;
     private final int _defaultTimeout;

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java Thu Mar 12 15:41:46 2015
@@ -22,7 +22,6 @@ package org.apache.qpid.transport.networ
 
 import java.net.Socket;
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.security.Principal;
 
 import javax.net.ssl.SSLPeerUnverifiedException;
@@ -31,8 +30,8 @@ import javax.net.ssl.SSLSocket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.Ticker;
 
@@ -49,7 +48,7 @@ public class IoNetworkConnection impleme
     private boolean _principalChecked;
     private final Object _lock = new Object();
 
-    public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
+    public IoNetworkConnection(Socket socket, ByteBufferReceiver delegate,
             int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
     {
         _socket = socket;
@@ -70,7 +69,7 @@ public class IoNetworkConnection impleme
         _ioReceiver.initiate();
     }
 
-    public Sender<ByteBuffer> getSender()
+    public ByteBufferSender getSender()
     {
         return _ioSender;
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Thu Mar 12 15:41:46 2015
@@ -20,313 +20,24 @@
  */
 package org.apache.qpid.transport.network.io;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
+import org.apache.qpid.transport.ByteBufferReceiver;
 
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.TransportActivity;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-
-public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+public class IoNetworkTransport extends AbstractNetworkTransport
 {
-    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
-    private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
-                                                              CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
-    private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
-                                                                  CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
-
-
-    private Socket _socket;
-    private IoNetworkConnection _connection;
-    private AcceptingThread _acceptor;
-
-    public NetworkConnection connect(ConnectionSettings settings,
-                                     Receiver<ByteBuffer> delegate,
-                                     TransportActivity transportActivity)
-    {
-        int sendBufferSize = settings.getWriteBufferSize();
-        int receiveBufferSize = settings.getReadBufferSize();
-
-        try
-        {
-            _socket = new Socket();
-            _socket.setReuseAddress(true);
-            _socket.setTcpNoDelay(settings.isTcpNodelay());
-            _socket.setSendBufferSize(sendBufferSize);
-            _socket.setReceiveBufferSize(receiveBufferSize);
 
-            if(LOGGER.isDebugEnabled())
-            {
-                LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
-                LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
-                LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
-            }
-
-            InetAddress address = InetAddress.getByName(settings.getHost());
-
-            _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
-        }
-        catch (SocketException e)
-        {
-            throw new TransportException("Error connecting to broker", e);
-        }
-        catch (IOException e)
-        {
-            throw new TransportException("Error connecting to broker", e);
-        }
-
-        try
-        {
-            IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
-            _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
-            ticker.setConnection(_connection);
-            _connection.start();
-        }
-        catch(Exception e)
-        {
-            try
-            {
-                _socket.close();
-            }
-            catch(IOException ioe)
-            {
-                //ignored, throw based on original exception
-            }
-
-            throw new TransportException("Error creating network connection", e);
-        }
-
-        return _connection;
-    }
-
-    public void close()
-    {
-        if(_connection != null)
-        {
-            _connection.close();
-        }
-        if(_acceptor != null)
-        {
-            _acceptor.close();
-        }
-    }
-
-    public NetworkConnection getConnection()
-    {
-        return _connection;
-    }
-
-    public void accept(NetworkTransportConfiguration config,
-                       ProtocolEngineFactory factory,
-                       SSLContext sslContext)
-    {
-        try
-        {
-            _acceptor = new AcceptingThread(config, factory, sslContext);
-            _acceptor.setName(String.format("IoNetworkAcceptor - %s", config.getAddress()));
-            _acceptor.setDaemon(false);
-            _acceptor.start();
-        }
-        catch (IOException e)
-        {
-            throw new TransportException("Failed to start AMQP on port : " + config, e);
-        }
-    }
-
-    public int getAcceptingPort()
-    {
-        return _acceptor == null ? -1 : _acceptor.getPort();
-    }
 
-    private class AcceptingThread extends Thread
+    @Override
+    protected IoNetworkConnection createNetworkConnection(final Socket socket,
+                                                       final ByteBufferReceiver engine,
+                                                       final Integer sendBufferSize,
+                                                       final Integer receiveBufferSize,
+                                                       final int timeout,
+                                                       final IdleTimeoutTicker ticker)
     {
-        private volatile boolean _closed = false;
-        private NetworkTransportConfiguration _config;
-        private ProtocolEngineFactory _factory;
-        private SSLContext _sslContext;
-        private ServerSocket _serverSocket;
-        private int _timeout;
-
-        private AcceptingThread(NetworkTransportConfiguration config,
-                                ProtocolEngineFactory factory,
-                                SSLContext sslContext) throws IOException
-        {
-            _config = config;
-            _factory = factory;
-            _sslContext = sslContext;
-            _timeout = TIMEOUT;
-
-            InetSocketAddress address = config.getAddress();
-
-            if(sslContext == null)
-            {
-                _serverSocket = new ServerSocket();
-            }
-            else
-            {
-                SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
-                _serverSocket = socketFactory.createServerSocket();
-
-                SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket;
-
-                SSLUtil.removeSSLv3Support(sslServerSocket);
-                SSLUtil.updateEnabledCipherSuites(sslServerSocket, config.getEnabledCipherSuites(), config.getDisabledCipherSuites());
-
-                if(config.needClientAuth())
-                {
-                    sslServerSocket.setNeedClientAuth(true);
-                }
-                else if(config.wantClientAuth())
-                {
-                    sslServerSocket.setWantClientAuth(true);
-                }
-
-            }
-
-            _serverSocket.setReuseAddress(true);
-            _serverSocket.bind(address);
-        }
-
-
-        /**
-            Close the underlying ServerSocket if it has not already been closed.
-         */
-        public void close()
-        {
-            LOGGER.debug("Shutting down the Acceptor");
-            _closed = true;
-
-            if (!_serverSocket.isClosed())
-            {
-                try
-                {
-                    _serverSocket.close();
-                }
-                catch (IOException e)
-                {
-                    throw new TransportException(e);
-                }
-            }
-        }
-
-        private int getPort()
-        {
-            return _serverSocket.getLocalPort();
-        }
-
-        @Override
-        public void run()
-        {
-            try
-            {
-                while (!_closed)
-                {
-                    Socket socket = null;
-                    try
-                    {
-                        socket = _serverSocket.accept();
-
-                        ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
-
-                        if(engine != null)
-                        {
-                            socket.setTcpNoDelay(_config.getTcpNoDelay());
-                            socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
-
-                            final Integer sendBufferSize = _config.getSendBufferSize();
-                            final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
-                            socket.setSendBufferSize(sendBufferSize);
-                            socket.setReceiveBufferSize(receiveBufferSize);
-
-
-                            final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
-                            NetworkConnection connection =
-                                    new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
-                                                            ticker);
-
-                            connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
-
-                            ticker.setConnection(connection);
-
-                            engine.setNetworkConnection(connection, connection.getSender());
-
-                            connection.start();
-                        }
-                        else
-                        {
-                            socket.close();
-                        }
-                    }
-                    catch(RuntimeException e)
-                    {
-                        LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
-                        closeSocketIfNecessary(socket);
-                    }
-                    catch(IOException e)
-                    {
-                        if(!_closed)
-                        {
-                            LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
-                            closeSocketIfNecessary(socket);
-                            try
-                            {
-                                //Delay to avoid tight spinning the loop during issues such as too many open files
-                                Thread.sleep(1000);
-                            }
-                            catch (InterruptedException ie)
-                            {
-                                LOGGER.debug("Stopping acceptor due to interrupt request");
-                                _closed = true;
-                            }
-                        }
-                    }
-                }
-            }
-            finally
-            {
-                if(LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress());
-                }
-            }
-        }
-
-        private void closeSocketIfNecessary(final Socket socket)
-        {
-            if(socket != null)
-            {
-                try
-                {
-                    socket.close();
-                }
-                catch (IOException e)
-                {
-                    LOGGER.debug("Exception while closing socket", e);
-                }
-            }
-        }
-
+        return new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout,
+                                ticker);
     }
 
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Thu Mar 12 15:41:46 2015
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.Atomi
 import javax.net.ssl.SSLSocket;
 
 import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.Ticker;
 import org.apache.qpid.transport.util.Logger;
@@ -47,7 +47,7 @@ final class IoReceiver implements Runnab
 
     private static final Logger log = Logger.get(IoReceiver.class);
 
-    private final Receiver<ByteBuffer> receiver;
+    private final ByteBufferReceiver receiver;
     private final int bufferSize;
     private final Socket socket;
     private final long timeout;
@@ -61,7 +61,7 @@ final class IoReceiver implements Runnab
         shutdownBroken = SystemUtils.isWindows();
     }
 
-    public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
+    public IoReceiver(Socket socket, ByteBufferReceiver receiver, int bufferSize, long timeout)
     {
         this.receiver = receiver;
         this.bufferSize = bufferSize;
@@ -78,7 +78,7 @@ final class IoReceiver implements Runnab
             throw new RuntimeException("Error creating IOReceiver thread",e);
         }
         receiverThread.setDaemon(true);
-        receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+        receiverThread.setName(String.format("IoReceiver-%s", socket.getRemoteSocketAddress()));
     }
 
     public void initiate()

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Thu Mar 12 15:41:46 2015
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.Atomi
 import javax.net.ssl.SSLSocket;
 
 import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.SenderClosedException;
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.TransportException;
@@ -37,7 +37,7 @@ import org.apache.qpid.transport.util.Lo
 import org.apache.qpid.util.SystemUtils;
 
 
-public final class IoSender implements Runnable, Sender<ByteBuffer>
+public final class IoSender implements Runnable, ByteBufferSender
 {
 
     private static final Logger log = Logger.get(IoSender.class);
@@ -97,7 +97,7 @@ public final class IoSender implements R
         }
 
         senderThread.setDaemon(true);
-        senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress));
+        senderThread.setName(String.format("IoSender-%s", _remoteSocketAddress));
     }
 
     public void initiate()
@@ -337,18 +337,6 @@ public final class IoSender implements R
         }
     }
 
-    public void setIdleTimeout(int i)
-    {
-        try
-        {
-            socket.setSoTimeout(i);
-        }
-        catch (Exception e)
-        {
-            throw new SenderException(e);
-        }
-    }
-
     public void setReceiver(IoReceiver receiver)
     {
         _receiver = receiver;

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java Thu Mar 12 15:41:46 2015
@@ -20,16 +20,14 @@
  */
 package org.apache.qpid.transport.network.security;
 
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
 
 public interface SecurityLayer
 {
 
-    public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate);
-    public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate);
+    public ByteBufferSender sender(ByteBufferSender delegate);
+    public ByteBufferReceiver receiver(ByteBufferReceiver delegate);
     public String getUserID();
 
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java Thu Mar 12 15:41:46 2015
@@ -20,15 +20,13 @@
  */
 package org.apache.qpid.transport.network.security;
 
-import java.nio.ByteBuffer;
-
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
 import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
 import org.apache.qpid.transport.network.security.sasl.SASLSender;
@@ -110,14 +108,14 @@ public class SecurityLayerFactory
 
         }
 
-        public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+        public ByteBufferSender sender(ByteBufferSender delegate)
         {
             SSLSender sender = new SSLSender(_engine, _layer.sender(delegate), _sslStatus);
             sender.setHostname(_hostname);
             return sender;
         }
 
-        public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+        public ByteBufferReceiver receiver(ByteBufferReceiver delegate)
         {
             SSLReceiver receiver = new SSLReceiver(_engine, _layer.receiver(delegate), _sslStatus);
             receiver.setHostname(_hostname);
@@ -141,13 +139,13 @@ public class SecurityLayerFactory
             _layer = layer;
         }
 
-        public SASLSender sender(Sender<ByteBuffer> delegate)
+        public SASLSender sender(ByteBufferSender delegate)
         {
             SASLSender sender = new SASLSender(_layer.sender(delegate));
             return sender;
         }
 
-        public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
+        public SASLReceiver receiver(ByteBufferReceiver delegate)
         {
             SASLReceiver receiver = new SASLReceiver(_layer.receiver(delegate));
             return receiver;
@@ -169,12 +167,12 @@ public class SecurityLayerFactory
         {
         }
 
-        public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+        public ByteBufferSender sender(ByteBufferSender delegate)
         {
             return delegate;
         }
 
-        public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+        public ByteBufferReceiver receiver(ByteBufferReceiver delegate)
         {
             return delegate;
         }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java Thu Mar 12 15:41:46 2015
@@ -21,20 +21,21 @@
 package org.apache.qpid.transport.network.security.sasl;
 
 
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.util.Logger;
+import java.nio.ByteBuffer;
 
 import javax.security.sasl.SaslException;
-import java.nio.ByteBuffer;
 
-public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> {
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SASLReceiver extends SASLEncryptor implements ByteBufferReceiver {
 
-    private Receiver<ByteBuffer> delegate;
+    private ByteBufferReceiver delegate;
     private byte[] netData;
     private static final Logger log = Logger.get(SASLReceiver.class);
     
-    public SASLReceiver(Receiver<ByteBuffer> delegate)
+    public SASLReceiver(ByteBufferReceiver delegate)
     {
         this.delegate = delegate;
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org