You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/01/31 21:07:38 UTC

svn commit: r1656248 [1/2] - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java: broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-...

Author: rgodfrey
Date: Sat Jan 31 20:07:36 2015
New Revision: 1656248

URL: http://svn.apache.org/r1656248
Log:
Separate Byte and ProtocolEvent sender/receivers, add server specific 0-10 encoder

Added:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
      - copied, changed from r1655432, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
      - copied, changed from r1655432, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java   (with props)
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java   (with props)
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java   (with props)
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java   (with props)
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java   (with props)
Removed:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Sat Jan 31 20:07:36 2015
@@ -37,7 +37,7 @@ import org.apache.qpid.server.model.Prot
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 public class MultiVersionProtocolEngine implements ServerProtocolEngine
@@ -54,7 +54,7 @@ public class MultiVersionProtocolEngine
     private String _fqdn;
     private final Broker<?> _broker;
     private NetworkConnection _network;
-    private Sender<ByteBuffer> _sender;
+    private ByteBufferSender _sender;
     private final Protocol _defaultSupportedReply;
 
     private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
@@ -171,7 +171,7 @@ public class MultiVersionProtocolEngine
 
     private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
 
-    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+    public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
     {
         _network = network;
         SocketAddress address = _network.getLocalAddress();
@@ -253,7 +253,7 @@ public class MultiVersionProtocolEngine
 
         }
 
-        public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+        public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
         {
 
         }
@@ -494,7 +494,7 @@ public class MultiVersionProtocolEngine
             }
         }
 
-        public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+        public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
         {
 
         }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Sat Jan 31 20:07:36 2015
@@ -32,10 +32,9 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Port;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
 import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.network.NetworkConnection;
 
@@ -68,7 +67,7 @@ public class ProtocolEngine_0_10  extend
         }
     }
 
-    public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+    public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender)
     {
         if(!getSubject().equals(Subject.getSubject(AccessController.getContext())))
         {
@@ -88,7 +87,7 @@ public class ProtocolEngine_0_10  extend
             _network = network;
 
             _connection.setNetworkConnection(network);
-            Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE);
+            ServerDisassembler disassembler = new ServerDisassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE);
             _connection.setSender(disassembler);
             _connection.addFrameSizeObserver(disassembler);
             // FIXME Two log messages to maintain compatibility with earlier protocol versions
@@ -97,19 +96,15 @@ public class ProtocolEngine_0_10  extend
         }
     }
 
-    private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
+    private ByteBufferSender wrapSender(final ByteBufferSender sender)
     {
-        return new Sender<ByteBuffer>()
+        return new ByteBufferSender()
         {
             @Override
             public void send(ByteBuffer msg)
             {
                 _lastWriteTime = System.currentTimeMillis();
-                ByteBuffer copy = ByteBuffer.wrap(new byte[msg.remaining()]);
-                copy.put(msg);
-                copy.flip();
-                sender.send(copy);
-
+                sender.send(msg);
             }
 
             @Override

Copied: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java (from r1655432, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java?p2=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java&p1=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java&r1=1655432&r2=1656248&rev=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java Sat Jan 31 20:07:36 2015
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.transport.network;
+package org.apache.qpid.server.protocol.v0_10;
 
 import static java.lang.Math.min;
 import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
@@ -28,97 +28,83 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.network.Frame.LAST_SEG;
 
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.FrameSizeObserver;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolDelegate;
 import org.apache.qpid.transport.ProtocolError;
 import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventSender;
 import org.apache.qpid.transport.ProtocolHeader;
 import org.apache.qpid.transport.SegmentType;
-import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.Struct;
-import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Encoder;
+import org.apache.qpid.transport.network.Frame;
 
 /**
  * Disassembler
  */
-public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver
+public final class ServerDisassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
 {
-    private final Sender<ByteBuffer> sender;
-    private int maxPayload;
-    private final Object sendlock = new Object();
-    private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
-    {
-        public BBEncoder initialValue()
-        {
-            return new BBEncoder(4*1024);
-        }
-    };
+    private final ByteBufferSender _sender;
+    private int _maxPayload;
+    private final Object _sendLock = new Object();
+    private final Encoder _encoder =  new ServerEncoder();
 
-    public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
+    public ServerDisassembler(ByteBufferSender sender, int maxFrame)
     {
-        this.sender = sender;
-        if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+        _sender = sender;
+        if (maxFrame <= HEADER_SIZE || maxFrame >= 64 * 1024)
         {
             throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
         }
-        this.maxPayload  = maxFrame - HEADER_SIZE;
+        _maxPayload = maxFrame - HEADER_SIZE;
     }
 
     public void send(ProtocolEvent event)
     {
-        event.delegate(null, this);
+        synchronized (_sendLock)
+        {
+            event.delegate(null, this);
+        }
     }
 
     public void flush()
     {
-        synchronized (sendlock)
+        synchronized (_sendLock)
         {
-            sender.flush();
+            _sender.flush();
         }
     }
 
     public void close()
     {
-        synchronized (sendlock)
+        synchronized (_sendLock)
         {
-            sender.close();
+            _sender.close();
         }
     }
 
-    private final ByteBuffer _frameHeader = ByteBuffer.allocate(HEADER_SIZE);
-
-    {
-        _frameHeader.order(ByteOrder.BIG_ENDIAN);
-    }
-
     private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
     {
-        synchronized (sendlock)
-        {
-            ByteBuffer data = _frameHeader;
-            _frameHeader.rewind();
+        ByteBuffer data = ByteBuffer.wrap(new byte[HEADER_SIZE]);
+
+        data.put(0, flags);
+        data.put(1, type);
+        data.putShort(2, (short) (size + HEADER_SIZE));
+        data.put(5, track);
+        data.putShort(6, (short) channel);
+
+
+        ByteBuffer dup = buf.duplicate();
+        dup.limit(dup.position() + size);
+        buf.position(buf.position() + size);
+        _sender.send(data);
+        _sender.send(dup);
 
-            
-            data.put(0, flags);
-            data.put(1, type);
-            data.putShort(2, (short) (size + HEADER_SIZE));
-            data.put(5, track);
-            data.putShort(6, (short) channel);
-
-
-            int limit = buf.limit();
-            buf.limit(buf.position() + size);
-
-            data.rewind();
-            sender.send(data);
-            sender.send(buf);
-            buf.limit(limit);
 
-        }
     }
 
     private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf)
@@ -130,7 +116,7 @@ public final class Disassembler implemen
         boolean first = true;
         while (true)
         {
-            int size = min(maxPayload, remaining);
+            int size = min(_maxPayload, remaining);
             remaining -= size;
 
             byte newflags = flags;
@@ -155,12 +141,9 @@ public final class Disassembler implemen
 
     public void init(Void v, ProtocolHeader header)
     {
-        synchronized (sendlock)
-        {
-            sender.send(header.toByteBuffer());
-            sender.flush();
-        }
-    }
+        _sender.send(header.toByteBuffer());
+        _sender.flush();
+}
 
     public void control(Void v, Method method)
     {
@@ -174,7 +157,7 @@ public final class Disassembler implemen
 
     private void method(Method method, SegmentType type)
     {
-        BBEncoder enc = _encoder.get();
+        Encoder enc = _encoder;
         enc.init();
         enc.writeUint16(method.getEncodedType());
         if (type == SegmentType.COMMAND)
@@ -205,15 +188,15 @@ public final class Disassembler implemen
             final Header hdr = method.getHeader();
             if (hdr != null)
             {
-                if(hdr.getDeliveryProperties() != null)
+                if (hdr.getDeliveryProperties() != null)
                 {
                     enc.writeStruct32(hdr.getDeliveryProperties());
                 }
-                if(hdr.getMessageProperties() != null)
+                if (hdr.getMessageProperties() != null)
                 {
                     enc.writeStruct32(hdr.getMessageProperties());
                 }
-                if(hdr.getNonStandardProperties() != null)
+                if (hdr.getNonStandardProperties() != null)
                 {
                     for (Struct st : hdr.getNonStandardProperties())
                     {
@@ -221,25 +204,26 @@ public final class Disassembler implemen
                     }
                 }
             }
+
             headerLimit = enc.position();
         }
-
-        synchronized (sendlock)
+        synchronized (_sendLock)
         {
             ByteBuffer buf = enc.underlyingBuffer();
             buf.position(0);
             buf.limit(methodLimit);
 
-            fragment(flags, type, method, buf);
+            fragment(flags, type, method, buf.duplicate());
             if (payload)
             {
                 ByteBuffer body = method.getBody();
                 buf.limit(headerLimit);
                 buf.position(methodLimit);
-                fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf);
+
+                fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf.duplicate());
                 if (body != null)
                 {
-                    fragment(LAST_SEG, SegmentType.BODY, method, body);
+                    fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate());
                 }
 
             }
@@ -258,7 +242,7 @@ public final class Disassembler implemen
         {
             throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
         }
-        this.maxPayload  = maxFrame - HEADER_SIZE;
+        this._maxPayload = maxFrame - HEADER_SIZE;
 
     }
 }

Copied: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java (from r1655432, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java?p2=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java&p1=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java&r1=1655432&r2=1656248&rev=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java Sat Jan 31 20:07:36 2015
@@ -18,88 +18,85 @@
  * under the License.
  *
  */
-package org.apache.qpid.transport.codec;
+package org.apache.qpid.server.protocol.v0_10;
 
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.UUID;
 
+import org.apache.qpid.transport.codec.AbstractEncoder;
 
-/**
- * Byte Buffer Encoder.
- * Encoder concrete implementor using a backing byte buffer for encoding data.
- * 
- * @author Rafael H. Schloming
- */
-public final class BBEncoder extends AbstractEncoder
+
+public final class ServerEncoder extends AbstractEncoder
 {
-    private ByteBuffer out;
-    private int segment;
+    public static final int DEFAULT_CAPACITY = 4096;
+    private ByteBuffer _out;
+    private int _segment;
+    private int _initialCapacity;
 
-    public BBEncoder(int capacity) {
-        out = ByteBuffer.allocate(capacity);
-        out.order(ByteOrder.BIG_ENDIAN);
-        segment = 0;
+    public ServerEncoder()
+    {
+        this(DEFAULT_CAPACITY);
     }
 
-    public void init()
+    public ServerEncoder(int capacity)
     {
-        out.clear();
-        segment = 0;
+        _initialCapacity = capacity;
+        _out = ByteBuffer.allocate(capacity);
+        _segment = 0;
     }
 
-    public ByteBuffer segment()
+    public void init()
     {
-        int pos = out.position();
-        out.position(segment);
-        ByteBuffer slice = out.slice();
-        slice.limit(pos - segment);
-        out.position(pos);
-        segment = pos;
-        return slice;
+        _out.position(_out.limit());
+        _out.limit(_out.capacity());
+        _out = _out.slice();
+        if(_out.remaining() < 256)
+        {
+            _out = ByteBuffer.allocate(_initialCapacity);
+        }
+        _segment = 0;
     }
 
     public ByteBuffer buffer()
     {
-        int pos = out.position();
-        out.position(segment);
-        ByteBuffer slice = out.slice();
-        slice.limit(pos - segment);
-        out.position(pos);
+        int pos = _out.position();
+        _out.position(_segment);
+        ByteBuffer slice = _out.slice();
+        slice.limit(pos - _segment);
+        _out.position(pos);
         return slice;
     }
 
     public int position()
     {
-        return out.position();
+        return _out.position();
     }
 
     public ByteBuffer underlyingBuffer()
     {
-        return out;
+        return _out;
     }
 
     private void grow(int size)
     {
-        ByteBuffer old = out;
+        ByteBuffer old = _out;
         int capacity = old.capacity();
-        out = ByteBuffer.allocate(Math.max(capacity + size, 2*capacity));
-        out.order(ByteOrder.BIG_ENDIAN);
+        _out = ByteBuffer.allocate(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity));
         old.flip();
-        out.put(old);
+        _out.put(old);
     }
 
     protected void doPut(byte b)
     {
         try
         {
-            out.put(b);
+            _out.put(b);
         }
         catch (BufferOverflowException e)
         {
             grow(1);
-            out.put(b);
+            _out.put(b);
         }
     }
 
@@ -107,12 +104,12 @@ public final class BBEncoder extends Abs
     {
         try
         {
-            out.put(src);
+            _out.put(src);
         }
         catch (BufferOverflowException e)
         {
             grow(src.remaining());
-            out.put(src);
+            _out.put(src);
         }
     }
 
@@ -120,12 +117,12 @@ public final class BBEncoder extends Abs
     {
         try
         {
-            out.put(bytes);
+            _out.put(bytes);
         }
         catch (BufferOverflowException e)
         {
             grow(bytes.length);
-            out.put(bytes);
+            _out.put(bytes);
         }
     }
 
@@ -135,12 +132,12 @@ public final class BBEncoder extends Abs
 
         try
         {
-            out.put((byte) b);
+            _out.put((byte) b);
         }
         catch (BufferOverflowException e)
         {
             grow(1);
-            out.put((byte) b);
+            _out.put((byte) b);
         }
     }
 
@@ -150,12 +147,12 @@ public final class BBEncoder extends Abs
 
         try
         {
-            out.putShort((short) s);
+            _out.putShort((short) s);
         }
         catch (BufferOverflowException e)
         {
             grow(2);
-            out.putShort((short) s);
+            _out.putShort((short) s);
         }
     }
 
@@ -165,12 +162,12 @@ public final class BBEncoder extends Abs
 
         try
         {
-            out.putInt((int) i);
+            _out.putInt((int) i);
         }
         catch (BufferOverflowException e)
         {
             grow(4);
-            out.putInt((int) i);
+            _out.putInt((int) i);
         }
     }
 
@@ -178,87 +175,90 @@ public final class BBEncoder extends Abs
     {
         try
         {
-            out.putLong(l);
+            _out.putLong(l);
         }
         catch (BufferOverflowException e)
         {
             grow(8);
-            out.putLong(l);
+            _out.putLong(l);
         }
     }
 
     public int beginSize8()
     {
-        int pos = out.position();
+        int pos = _out.position();
         try
         {
-            out.put((byte) 0);
+            _out.put((byte) 0);
         }
         catch (BufferOverflowException e)
         {
             grow(1);
-            out.put((byte) 0);
+            _out.put((byte) 0);
         }
         return pos;
     }
 
     public void endSize8(int pos)
     {
-        int cur = out.position();
-        out.put(pos, (byte) (cur - pos - 1));
+        int cur = _out.position();
+        _out.put(pos, (byte) (cur - pos - 1));
     }
 
     public int beginSize16()
     {
-        int pos = out.position();
+        int pos = _out.position();
         try
         {
-            out.putShort((short) 0);
+            _out.putShort((short) 0);
         }
         catch (BufferOverflowException e)
         {
             grow(2);
-            out.putShort((short) 0);
+            _out.putShort((short) 0);
         }
         return pos;
     }
 
     public void endSize16(int pos)
     {
-        int cur = out.position();
-        out.putShort(pos, (short) (cur - pos - 2));
+        int cur = _out.position();
+        _out.putShort(pos, (short) (cur - pos - 2));
     }
 
     public int beginSize32()
     {
-        int pos = out.position();
+        int pos = _out.position();
         try
         {
-            out.putInt(0);
+            _out.putInt(0);
         }
         catch (BufferOverflowException e)
         {
             grow(4);
-            out.putInt(0);
+            _out.putInt(0);
         }
         return pos;
+
     }
 
     public void endSize32(int pos)
     {
-        int cur = out.position();
-        out.putInt(pos, (cur - pos - 4));
+        int cur = _out.position();
+        _out.putInt(pos, (cur - pos - 4));
+
     }
 
 	public void writeDouble(double aDouble)
 	{
-		try 
+		try
 		{
-			out.putDouble(aDouble);
-		} catch(BufferOverflowException exception)
+			_out.putDouble(aDouble);
+		}
+        catch(BufferOverflowException exception)
 		{
 			grow(8);
-			out.putDouble(aDouble);
+			_out.putDouble(aDouble);
 		}
 	}
 
@@ -266,11 +266,12 @@ public final class BBEncoder extends Abs
 	{
 		try 
 		{
-			out.putShort(aShort);
-		} catch(BufferOverflowException exception)
+			_out.putShort(aShort);
+		}
+        catch(BufferOverflowException exception)
 		{
 			grow(2);
-			out.putShort(aShort);
+			_out.putShort(aShort);
 		}
 	}
 
@@ -278,11 +279,12 @@ public final class BBEncoder extends Abs
 	{
 		try
 		{
-			out.putInt(anInt);
-		} catch(BufferOverflowException exception)
+			_out.putInt(anInt);
+		}
+        catch(BufferOverflowException exception)
 		{
 			grow(4);
-			out.putInt(anInt);
+			_out.putInt(anInt);
 		}
 	}
 
@@ -290,11 +292,12 @@ public final class BBEncoder extends Abs
 	{
 		try
 		{
-			out.putLong(aLong);
-		} catch(BufferOverflowException exception)
+			_out.putLong(aLong);
+		}
+        catch(BufferOverflowException exception)
 		{
 			grow(8);
-			out.putLong(aLong);
+			_out.putLong(aLong);
 		}
 	}
       
@@ -302,11 +305,12 @@ public final class BBEncoder extends Abs
 	{
 		try 
 		{
-			out.put(aByte);	
-		} catch(BufferOverflowException exception)
+			_out.put(aByte);
+		}
+        catch(BufferOverflowException exception)
 		{
 			grow(1);
-			out.put(aByte);
+			_out.put(aByte);
 		}
 	}	
 	
@@ -318,11 +322,12 @@ public final class BBEncoder extends Abs
 		
 		try 
 		{
-			out.put(byteArray);
-		} catch(BufferOverflowException exception)
+			_out.put(byteArray);
+		}
+        catch(BufferOverflowException exception)
 		{
 			grow(16);
-			out.put(byteArray);			
+			_out.put(byteArray);
 		}
 	}
 
@@ -352,16 +357,13 @@ public final class BBEncoder extends Abs
 	{
 		try 
 		{
-			out.putFloat(aFloat);
-		} catch(BufferOverflowException exception)
+			_out.putFloat(aFloat);
+		}
+        catch(BufferOverflowException exception)
 		{
 			grow(4);
-			out.putFloat(aFloat);
+			_out.putFloat(aFloat);
 		}
 	}
 
-	public void writeMagicNumber()
-	{
-		out.put("AM2".getBytes());
-	}	
-}
\ No newline at end of file
+}

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Sat Jan 31 20:07:36 2015
@@ -85,7 +85,7 @@ import org.apache.qpid.server.util.Actio
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.SenderClosedException;
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.TransportException;
@@ -167,7 +167,7 @@ public class AMQProtocolEngine implement
     private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     private NetworkConnection _network;
-    private Sender<ByteBuffer> _sender;
+    private ByteBufferSender _sender;
 
     private volatile boolean _deferFlush;
     private long _lastReceivedTime = System.currentTimeMillis();  // TODO consider if this is what we want?
@@ -272,7 +272,7 @@ public class AMQProtocolEngine implement
         setNetworkConnection(network, network.getSender());
     }
 
-    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+    public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
     {
         _network = network;
         _sender = sender;

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Sat Jan 31 20:07:36 2015
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.Atomi
 import javax.security.auth.Subject;
 
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -50,7 +51,7 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
@@ -282,11 +283,11 @@ public class InternalTestProtocolSession
         private String _remoteHost = "127.0.0.1";
         private String _localHost = "127.0.0.1";
         private int _port = portNumber.incrementAndGet();
-        private final Sender<ByteBuffer> _sender;
+        private final ByteBufferSender _sender;
 
         public TestNetworkConnection()
         {
-            _sender = new Sender<ByteBuffer>()
+            _sender = new ByteBufferSender()
             {
                 public void send(ByteBuffer msg)
                 {
@@ -348,7 +349,7 @@ public class InternalTestProtocolSession
         }
 
         @Override
-        public Sender<ByteBuffer> getSender()
+        public ByteBufferSender getSender()
         {
             return _sender;
         }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java Sat Jan 31 20:07:36 2015
@@ -59,7 +59,7 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.NetworkConnection;
 
@@ -116,7 +116,7 @@ public class ProtocolEngine_1_0_0_SASL i
     private byte _revision;
     private PrintWriter _out;
     private NetworkConnection _network;
-    private Sender<ByteBuffer> _sender;
+    private ByteBufferSender _sender;
     private Connection_1_0 _connection;
     private volatile boolean _transportBlockedForWriting;
 
@@ -185,7 +185,7 @@ public class ProtocolEngine_1_0_0_SASL i
     {
     }
 
-    public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+    public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender)
     {
         _network = network;
         _sender = sender;

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Sat Jan 31 20:07:36 2015
@@ -53,7 +53,7 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.server.transport.AcceptingTransport;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
@@ -240,7 +240,7 @@ class WebSocketProvider implements Accep
         }
     }
 
-    private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer>
+    private class ConnectionWrapper implements NetworkConnection, ByteBufferSender
     {
         private final WebSocket.Connection _connection;
         private final SocketAddress _localAddress;
@@ -259,7 +259,7 @@ class WebSocketProvider implements Accep
         }
 
         @Override
-        public Sender<ByteBuffer> getSender()
+        public ByteBufferSender getSender()
         {
             return this;
         }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Sat Jan 31 20:07:36 2015
@@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit;
 import javax.jms.JMSException;
 import javax.jms.XASession;
 
-import org.apache.qpid.transport.Receiver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +59,7 @@ import org.apache.qpid.jms.ChannelLimitR
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.properties.ConnectionStartProperties;
+import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.OutgoingNetworkTransport;
@@ -527,12 +527,12 @@ public class AMQConnectionDelegate_8_0 i
     }
 
 
-    private static class ReceiverClosedWaiter implements Receiver<ByteBuffer>
+    private static class ReceiverClosedWaiter implements ByteBufferReceiver
     {
         private final CountDownLatch _closedWatcher;
-        private final Receiver<ByteBuffer> _receiver;
+        private final ByteBufferReceiver _receiver;
 
-        public ReceiverClosedWaiter(Receiver<ByteBuffer> receiver)
+        public ReceiverClosedWaiter(ByteBufferReceiver receiver)
         {
             _receiver = receiver;
             _closedWatcher = new CountDownLatch(1);

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Sat Jan 31 20:07:36 2015
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.client.handler;
 
-import java.nio.ByteBuffer;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +33,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.TransportException;
 
 public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
@@ -95,7 +93,7 @@ public class ConnectionCloseMethodHandle
         }
         finally
         {
-            Sender<ByteBuffer> sender = session.getSender();
+            ByteBufferSender sender = session.getSender();
 
             if (error != null)
             {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Sat Jan 31 20:07:36 2015
@@ -66,8 +66,8 @@ import org.apache.qpid.protocol.AMQMetho
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.util.BytesDataOutput;
@@ -179,7 +179,7 @@ public class AMQProtocolHandler implemen
 
 
     private NetworkConnection _network;
-    private Sender<ByteBuffer> _sender;
+    private ByteBufferSender _sender;
     private long _lastReadTime = System.currentTimeMillis();
     private long _lastWriteTime = System.currentTimeMillis();
     private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
@@ -905,7 +905,7 @@ public class AMQProtocolHandler implemen
         setNetworkConnection(network, network.getSender());
     }
 
-    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+    public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
     {
         _network = network;
         _sender = sender;
@@ -923,7 +923,7 @@ public class AMQProtocolHandler implemen
         return _lastWriteTime;
     }
 
-    protected Sender<ByteBuffer> getSender()
+    protected ByteBufferSender getSender()
     {
         return _sender;
     }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Sat Jan 31 20:07:36 2015
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.client.protocol;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -52,8 +51,8 @@ import org.apache.qpid.framing.ProtocolI
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
 
 /**
@@ -382,7 +381,7 @@ public class AMQProtocolSession implemen
         }
     }
 
-    public Sender<ByteBuffer> getSender()
+    public ByteBufferSender getSender()
     {
         return _protocolHandler.getSender();
     }
@@ -476,7 +475,7 @@ public class AMQProtocolSession implemen
         _protocolHandler.propagateExceptionToAllWaiters(error);
     }
 
-    public void setSender(Sender<java.nio.ByteBuffer> sender)
+    public void setSender(ByteBufferSender sender)
     {
         // No-op, interface munging
     }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Sat Jan 31 20:07:36 2015
@@ -666,7 +666,7 @@ public class AMQSession_0_10Test extends
         }
     }
 
-    class MockSender implements Sender<ProtocolEvent>
+    class MockSender implements ProtocolEventSender
     {
         private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>();
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java Sat Jan 31 20:07:36 2015
@@ -22,9 +22,9 @@ package org.apache.qpid.client.transport
 
 import java.nio.ByteBuffer;
 
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
 
-public class MockSender implements Sender<ByteBuffer>
+public class MockSender implements ByteBufferSender
 {
 
     public void send(ByteBuffer msg)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java Sat Jan 31 20:07:36 2015
@@ -20,18 +20,18 @@
  */
 package org.apache.qpid.client.transport;
 
-import java.security.Principal;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
-
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.security.Principal;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.network.NetworkConnection;
 
 /**
  * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
@@ -147,7 +147,7 @@ public class TestNetworkConnection imple
         _remoteAddress = address;
     }
 
-    public Sender<ByteBuffer> getSender()
+    public ByteBufferSender getSender()
     {
         return _sender;
     }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java Sat Jan 31 20:07:36 2015
@@ -26,9 +26,7 @@ import org.apache.qpid.framing.ContentBo
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.HeartbeatBody;
 import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.transport.Sender;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.transport.ByteBufferSender;
 
 
 /**
@@ -56,6 +54,6 @@ public interface AMQVersionAwareProtocol
     public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
 
 
-    public void setSender(Sender<ByteBuffer> sender);
+    public void setSender(ByteBufferSender sender);
 
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Sat Jan 31 20:07:36 2015
@@ -21,10 +21,9 @@
 package org.apache.qpid.protocol;
 
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.TransportActivity;
 
@@ -32,7 +31,7 @@ import org.apache.qpid.transport.network
  * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
  * decodes it and then process the result.
  */
-public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity
+public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity
 {
    // Returns the remote address of the NetworkDriver
    SocketAddress getRemoteAddress();
@@ -58,6 +57,6 @@ public interface ProtocolEngine extends
 
    void encryptedTransport();
 
-   public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+   public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender);
 
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java Sat Jan 31 20:07:36 2015
@@ -26,11 +26,11 @@ package org.apache.qpid.transport;
  *
  */
 
-public interface Binding<E,T>
+public interface Binding<E>
 {
 
-    E endpoint(Sender<T> sender);
+    E endpoint(ByteBufferSender sender);
 
-    Receiver<T> receiver(E endpoint);
+    ByteBufferReceiver receiver(E endpoint);
 
 }

Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java?rev=1656248&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java Sat Jan 31 20:07:36 2015
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.nio.ByteBuffer;
+
+public interface ByteBufferReceiver
+{
+    void received(ByteBuffer msg);
+
+    void exception(Throwable t);
+
+    void closed();
+}

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java?rev=1656248&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java Sat Jan 31 20:07:36 2015
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.nio.ByteBuffer;
+
+public interface ByteBufferSender
+{
+    void send(ByteBuffer msg);
+
+    void flush();
+
+    void close();
+}

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Sat Jan 31 20:07:36 2015
@@ -27,7 +27,6 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.Connection.State.OPENING;
 
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -68,7 +67,7 @@ import org.apache.qpid.util.Strings;
  */
 
 public class Connection extends ConnectionInvoker
-    implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
+    implements ProtocolEventReceiver, ProtocolEventSender
 {
 
     protected static final Logger log = Logger.get(Connection.class);
@@ -113,7 +112,7 @@ public class Connection extends Connecti
     private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY;
 
     private ConnectionDelegate delegate;
-    private Sender<ProtocolEvent> sender;
+    private ProtocolEventSender sender;
 
     final private Map<Binary,Session> sessions = new HashMap<Binary,Session>();
     final private Map<Integer,Session> channels = new ConcurrentHashMap<Integer,Session>();
@@ -151,12 +150,12 @@ public class Connection extends Connecti
         listeners.add(listener);
     }
 
-    public Sender<ProtocolEvent> getSender()
+    public ProtocolEventSender getSender()
     {
         return sender;
     }
 
-    public void setSender(Sender<ProtocolEvent> sender)
+    public void setSender(ProtocolEventSender sender)
     {
         this.sender = sender;
     }
@@ -234,7 +233,7 @@ public class Connection extends Connecti
             OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
             final InputHandler inputHandler = new InputHandler(new Assembler(this));
             addFrameSizeObserver(inputHandler);
-            Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler);
+            ByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
             if(secureReceiver instanceof ConnectionListener)
             {
                 addConnectionListener((ConnectionListener)secureReceiver);
@@ -246,7 +245,7 @@ public class Connection extends Connecti
             setRemoteAddress(_networkConnection.getRemoteAddress());
             setLocalAddress(_networkConnection.getLocalAddress());
 
-            final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender());
+            final ByteBufferSender secureSender = securityLayer.sender(_networkConnection.getSender());
             if(secureSender instanceof ConnectionListener)
             {
                 addConnectionListener((ConnectionListener)secureSender);
@@ -411,7 +410,7 @@ public class Connection extends Connecti
         {
             log.debug("SEND: [%s] %s", this, event);
         }
-        Sender<ProtocolEvent> s = sender;
+        ProtocolEventSender s = sender;
         if (s == null)
         {
             throw new ConnectionException("connection closed");
@@ -425,7 +424,7 @@ public class Connection extends Connecti
         {
             log.debug("FLUSH: [%s]", this);
         }
-        final Sender<ProtocolEvent> theSender = sender;
+        final ProtocolEventSender theSender = sender;
         if(theSender != null)
         {
             theSender.flush();

Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java?rev=1656248&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java Sat Jan 31 20:07:36 2015
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import org.apache.qpid.transport.network.NetworkEvent;
+
+public interface NetworkEventReceiver
+{
+    void received(NetworkEvent msg);
+
+    void exception(Throwable t);
+
+    void closed();
+}

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java?rev=1656248&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java Sat Jan 31 20:07:36 2015
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+public interface ProtocolEventReceiver
+{
+    void received(ProtocolEvent msg);
+
+    void exception(Throwable t);
+
+    void closed();
+}

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java?rev=1656248&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java Sat Jan 31 20:07:36 2015
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+public interface ProtocolEventSender
+{
+    void send(ProtocolEvent msg);
+
+    void flush();
+
+    void close();
+}

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java Sat Jan 31 20:07:36 2015
@@ -20,12 +20,6 @@
  */
 package org.apache.qpid.transport.codec;
 
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-import org.apache.qpid.transport.Type;
-
-import org.apache.qpid.transport.Xid;
 import static org.apache.qpid.transport.util.Functions.lsb;
 
 import java.io.UnsupportedEncodingException;
@@ -36,6 +30,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.Type;
+import org.apache.qpid.transport.Xid;
+
 
 /**
  * AbstractEncoder
@@ -43,7 +43,7 @@ import java.util.UUID;
  * @author Rafael H. Schloming
  */
 
-abstract class AbstractEncoder implements Encoder
+public abstract class AbstractEncoder implements Encoder
 {
 
     private static Map<Class<?>,Type> ENCODINGS = new HashMap<Class<?>,Type>();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java Sat Jan 31 20:07:36 2015
@@ -360,8 +360,4 @@ public final class BBEncoder extends Abs
 		}
 	}
 
-	public void writeMagicNumber()
-	{
-		out.put("AM2".getBytes());
-	}	
-}
\ No newline at end of file
+}

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java Sat Jan 31 20:07:36 2015
@@ -20,13 +20,14 @@
  */
 package org.apache.qpid.transport.codec;
 
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+
 
 /**
  * Encoder interface.
@@ -274,9 +275,10 @@ public interface Encoder
      * @param bytes the bytes array to be encoded.
      */
     void writeBin128(byte [] bytes);
-    
-    /**
-     * Encodes the AMQP magic number.
-     */
-    void writeMagicNumber();
-}
\ No newline at end of file
+
+    int position();
+
+    ByteBuffer underlyingBuffer();
+
+    void init();
+}

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Sat Jan 31 20:07:36 2015
@@ -20,28 +20,29 @@
  */
 package org.apache.qpid.transport.network;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.NetworkEventReceiver;
 import org.apache.qpid.transport.ProtocolError;
 import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventReceiver;
 import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Struct;
 import org.apache.qpid.transport.codec.BBDecoder;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Assembler
  *
  */
-public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
+public class Assembler implements NetworkEventReceiver, NetworkDelegate
 {
     // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge
     // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1.
@@ -49,7 +50,7 @@ public class Assembler implements Receiv
     private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
     private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>();
 
-    private final Receiver<ProtocolEvent> receiver;
+    private final ProtocolEventReceiver receiver;
     private final Map<Integer,List<Frame>> segments;
     private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>()
     {
@@ -59,7 +60,7 @@ public class Assembler implements Receiv
         }
     };
 
-    public Assembler(Receiver<ProtocolEvent> receiver)
+    public Assembler(ProtocolEventReceiver receiver)
     {
         this.receiver = receiver;
         segments = new HashMap<Integer,List<Frame>>();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java Sat Jan 31 20:07:36 2015
@@ -20,15 +20,13 @@
  */
 package org.apache.qpid.transport.network;
 
-import java.nio.ByteBuffer;
-
 import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.ConnectionListener;
 import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
 import org.apache.qpid.transport.network.security.sasl.SASLSender;
 
@@ -38,10 +36,10 @@ import org.apache.qpid.transport.network
  */
 
 public abstract class ConnectionBinding
-    implements Binding<Connection,ByteBuffer>
+    implements Binding<Connection>
 {
 
-    public static Binding<Connection,ByteBuffer> get(final Connection connection)
+    public static Binding<Connection> get(final Connection connection)
     {
         return new ConnectionBinding()
         {
@@ -52,7 +50,7 @@ public abstract class ConnectionBinding
         };
     }
 
-    public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate)
+    public static Binding<Connection> get(final ConnectionDelegate delegate)
     {
         return new ConnectionBinding()
         {
@@ -69,7 +67,7 @@ public abstract class ConnectionBinding
 
     public abstract Connection connection();
 
-    public Connection endpoint(Sender<ByteBuffer> sender)
+    public Connection endpoint(ByteBufferSender sender)
     {
         Connection conn = connection();
 
@@ -87,7 +85,7 @@ public abstract class ConnectionBinding
         return conn;
     }
 
-    public Receiver<ByteBuffer> receiver(Connection conn)
+    public ByteBufferReceiver receiver(Connection conn)
     {
         final InputHandler inputHandler = new InputHandler(new Assembler(conn));
         conn.addFrameSizeObserver(inputHandler);

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Sat Jan 31 20:07:36 2015
@@ -30,27 +30,29 @@ import static org.apache.qpid.transport.
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.FrameSizeObserver;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolDelegate;
 import org.apache.qpid.transport.ProtocolError;
 import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventSender;
 import org.apache.qpid.transport.ProtocolHeader;
 import org.apache.qpid.transport.SegmentType;
-import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.Struct;
 import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Encoder;
 
 /**
  * Disassembler
  */
-public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver
+public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
 {
-    private final Sender<ByteBuffer> sender;
+    private final ByteBufferSender sender;
     private int maxPayload;
     private final Object sendlock = new Object();
-    private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
+    private final static ThreadLocal<Encoder> _encoder = new ThreadLocal<Encoder>()
     {
         public BBEncoder initialValue()
         {
@@ -58,7 +60,7 @@ public final class Disassembler implemen
         }
     };
 
-    public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
+    public Disassembler(ByteBufferSender sender, int maxFrame)
     {
         this.sender = sender;
         if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
@@ -174,7 +176,7 @@ public final class Disassembler implemen
 
     private void method(Method method, SegmentType type)
     {
-        BBEncoder enc = _encoder.get();
+        Encoder enc = _encoder.get();
         enc.init();
         enc.writeUint16(method.getEncodedType());
         if (type == SegmentType.COMMAND)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java Sat Jan 31 20:07:36 2015
@@ -29,11 +29,12 @@ import static org.apache.qpid.transport.
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.Constant;
 import org.apache.qpid.transport.FrameSizeObserver;
+import org.apache.qpid.transport.NetworkEventReceiver;
 import org.apache.qpid.transport.ProtocolError;
 import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.SegmentType;
 
 
@@ -43,7 +44,7 @@ import org.apache.qpid.transport.Segment
  * @author Rafael H. Schloming
  */
 
-public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
+public class InputHandler implements ByteBufferReceiver, FrameSizeObserver
 {
 
     private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
@@ -56,7 +57,7 @@ public class InputHandler implements Rec
         ERROR
     }
 
-    private final Receiver<NetworkEvent> receiver;
+    private final NetworkEventReceiver receiver;
     private State state;
     private ByteBuffer input = null;
     private int needed;
@@ -66,7 +67,7 @@ public class InputHandler implements Rec
     private byte track;
     private int channel;
 
-    public InputHandler(Receiver<NetworkEvent> receiver, State state)
+    public InputHandler(NetworkEventReceiver receiver, State state)
     {
         this.receiver = receiver;
         this.state = state;
@@ -82,7 +83,7 @@ public class InputHandler implements Rec
         }
     }
 
-    public InputHandler(Receiver<NetworkEvent> receiver)
+    public InputHandler(NetworkEventReceiver receiver)
     {
         this(receiver, PROTO_HDR);
     }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java Sat Jan 31 20:07:36 2015
@@ -21,13 +21,13 @@
 package org.apache.qpid.transport.network;
 
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.security.Principal;
-import org.apache.qpid.transport.Sender;
+
+import org.apache.qpid.transport.ByteBufferSender;
 
 public interface NetworkConnection
 {
-    Sender<ByteBuffer> getSender();
+    ByteBufferSender getSender();
 
     void start();
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java Sat Jan 31 20:07:36 2015
@@ -20,16 +20,14 @@
  */
 package org.apache.qpid.transport.network;
 
+import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-
-import java.nio.ByteBuffer;
 
 public interface OutgoingNetworkTransport extends NetworkTransport
 {
     public NetworkConnection getConnection();
 
     public NetworkConnection connect(ConnectionSettings settings,
-                                     Receiver<ByteBuffer> delegate,
+                                     ByteBufferReceiver delegate,
                                      TransportActivity transportActivity);
-}
\ No newline at end of file
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org