You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/01/31 21:07:38 UTC
svn commit: r1656248 [1/2] - in
/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
broker-plugins/amqp-0-8-...
Author: rgodfrey
Date: Sat Jan 31 20:07:36 2015
New Revision: 1656248
URL: http://svn.apache.org/r1656248
Log:
Separate Byte and ProtocolEvent senders/receivers, add server-specific 0-10 encoder
Added:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
- copied, changed from r1655432, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
- copied, changed from r1655432, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java (with props)
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java (with props)
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java (with props)
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java (with props)
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java (with props)
Removed:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Sat Jan 31 20:07:36 2015
@@ -37,7 +37,7 @@ import org.apache.qpid.server.model.Prot
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
public class MultiVersionProtocolEngine implements ServerProtocolEngine
@@ -54,7 +54,7 @@ public class MultiVersionProtocolEngine
private String _fqdn;
private final Broker<?> _broker;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private final Protocol _defaultSupportedReply;
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
@@ -171,7 +171,7 @@ public class MultiVersionProtocolEngine
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
_network = network;
SocketAddress address = _network.getLocalAddress();
@@ -253,7 +253,7 @@ public class MultiVersionProtocolEngine
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
}
@@ -494,7 +494,7 @@ public class MultiVersionProtocolEngine
}
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Sat Jan 31 20:07:36 2015
@@ -32,10 +32,9 @@ import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -68,7 +67,7 @@ public class ProtocolEngine_0_10 extend
}
}
- public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+ public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender)
{
if(!getSubject().equals(Subject.getSubject(AccessController.getContext())))
{
@@ -88,7 +87,7 @@ public class ProtocolEngine_0_10 extend
_network = network;
_connection.setNetworkConnection(network);
- Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE);
+ ServerDisassembler disassembler = new ServerDisassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE);
_connection.setSender(disassembler);
_connection.addFrameSizeObserver(disassembler);
// FIXME Two log messages to maintain compatibility with earlier protocol versions
@@ -97,19 +96,15 @@ public class ProtocolEngine_0_10 extend
}
}
- private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
+ private ByteBufferSender wrapSender(final ByteBufferSender sender)
{
- return new Sender<ByteBuffer>()
+ return new ByteBufferSender()
{
@Override
public void send(ByteBuffer msg)
{
_lastWriteTime = System.currentTimeMillis();
- ByteBuffer copy = ByteBuffer.wrap(new byte[msg.remaining()]);
- copy.put(msg);
- copy.flip();
- sender.send(copy);
-
+ sender.send(msg);
}
@Override
Copied: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java (from r1655432, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java?p2=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java&p1=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java&r1=1655432&r2=1656248&rev=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java Sat Jan 31 20:07:36 2015
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.transport.network;
+package org.apache.qpid.server.protocol.v0_10;
import static java.lang.Math.min;
import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
@@ -28,97 +28,83 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.network.Frame.LAST_SEG;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolDelegate;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventSender;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.SegmentType;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
-import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Encoder;
+import org.apache.qpid.transport.network.Frame;
/**
* Disassembler
*/
-public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver
+public final class ServerDisassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
{
- private final Sender<ByteBuffer> sender;
- private int maxPayload;
- private final Object sendlock = new Object();
- private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
- {
- public BBEncoder initialValue()
- {
- return new BBEncoder(4*1024);
- }
- };
+ private final ByteBufferSender _sender;
+ private int _maxPayload;
+ private final Object _sendLock = new Object();
+ private final Encoder _encoder = new ServerEncoder();
- public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
+ public ServerDisassembler(ByteBufferSender sender, int maxFrame)
{
- this.sender = sender;
- if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+ _sender = sender;
+ if (maxFrame <= HEADER_SIZE || maxFrame >= 64 * 1024)
{
throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
}
- this.maxPayload = maxFrame - HEADER_SIZE;
+ _maxPayload = maxFrame - HEADER_SIZE;
}
public void send(ProtocolEvent event)
{
- event.delegate(null, this);
+ synchronized (_sendLock)
+ {
+ event.delegate(null, this);
+ }
}
public void flush()
{
- synchronized (sendlock)
+ synchronized (_sendLock)
{
- sender.flush();
+ _sender.flush();
}
}
public void close()
{
- synchronized (sendlock)
+ synchronized (_sendLock)
{
- sender.close();
+ _sender.close();
}
}
- private final ByteBuffer _frameHeader = ByteBuffer.allocate(HEADER_SIZE);
-
- {
- _frameHeader.order(ByteOrder.BIG_ENDIAN);
- }
-
private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
{
- synchronized (sendlock)
- {
- ByteBuffer data = _frameHeader;
- _frameHeader.rewind();
+ ByteBuffer data = ByteBuffer.wrap(new byte[HEADER_SIZE]);
+
+ data.put(0, flags);
+ data.put(1, type);
+ data.putShort(2, (short) (size + HEADER_SIZE));
+ data.put(5, track);
+ data.putShort(6, (short) channel);
+
+
+ ByteBuffer dup = buf.duplicate();
+ dup.limit(dup.position() + size);
+ buf.position(buf.position() + size);
+ _sender.send(data);
+ _sender.send(dup);
-
- data.put(0, flags);
- data.put(1, type);
- data.putShort(2, (short) (size + HEADER_SIZE));
- data.put(5, track);
- data.putShort(6, (short) channel);
-
-
- int limit = buf.limit();
- buf.limit(buf.position() + size);
-
- data.rewind();
- sender.send(data);
- sender.send(buf);
- buf.limit(limit);
- }
}
private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf)
@@ -130,7 +116,7 @@ public final class Disassembler implemen
boolean first = true;
while (true)
{
- int size = min(maxPayload, remaining);
+ int size = min(_maxPayload, remaining);
remaining -= size;
byte newflags = flags;
@@ -155,12 +141,9 @@ public final class Disassembler implemen
public void init(Void v, ProtocolHeader header)
{
- synchronized (sendlock)
- {
- sender.send(header.toByteBuffer());
- sender.flush();
- }
- }
+ _sender.send(header.toByteBuffer());
+ _sender.flush();
+}
public void control(Void v, Method method)
{
@@ -174,7 +157,7 @@ public final class Disassembler implemen
private void method(Method method, SegmentType type)
{
- BBEncoder enc = _encoder.get();
+ Encoder enc = _encoder;
enc.init();
enc.writeUint16(method.getEncodedType());
if (type == SegmentType.COMMAND)
@@ -205,15 +188,15 @@ public final class Disassembler implemen
final Header hdr = method.getHeader();
if (hdr != null)
{
- if(hdr.getDeliveryProperties() != null)
+ if (hdr.getDeliveryProperties() != null)
{
enc.writeStruct32(hdr.getDeliveryProperties());
}
- if(hdr.getMessageProperties() != null)
+ if (hdr.getMessageProperties() != null)
{
enc.writeStruct32(hdr.getMessageProperties());
}
- if(hdr.getNonStandardProperties() != null)
+ if (hdr.getNonStandardProperties() != null)
{
for (Struct st : hdr.getNonStandardProperties())
{
@@ -221,25 +204,26 @@ public final class Disassembler implemen
}
}
}
+
headerLimit = enc.position();
}
-
- synchronized (sendlock)
+ synchronized (_sendLock)
{
ByteBuffer buf = enc.underlyingBuffer();
buf.position(0);
buf.limit(methodLimit);
- fragment(flags, type, method, buf);
+ fragment(flags, type, method, buf.duplicate());
if (payload)
{
ByteBuffer body = method.getBody();
buf.limit(headerLimit);
buf.position(methodLimit);
- fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf);
+
+ fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf.duplicate());
if (body != null)
{
- fragment(LAST_SEG, SegmentType.BODY, method, body);
+ fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate());
}
}
@@ -258,7 +242,7 @@ public final class Disassembler implemen
{
throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
}
- this.maxPayload = maxFrame - HEADER_SIZE;
+ this._maxPayload = maxFrame - HEADER_SIZE;
}
}
Copied: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java (from r1655432, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java?p2=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java&p1=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java&r1=1655432&r2=1656248&rev=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java Sat Jan 31 20:07:36 2015
@@ -18,88 +18,85 @@
* under the License.
*
*/
-package org.apache.qpid.transport.codec;
+package org.apache.qpid.server.protocol.v0_10;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.util.UUID;
+import org.apache.qpid.transport.codec.AbstractEncoder;
-/**
- * Byte Buffer Encoder.
- * Encoder concrete implementor using a backing byte buffer for encoding data.
- *
- * @author Rafael H. Schloming
- */
-public final class BBEncoder extends AbstractEncoder
+
+public final class ServerEncoder extends AbstractEncoder
{
- private ByteBuffer out;
- private int segment;
+ public static final int DEFAULT_CAPACITY = 4096;
+ private ByteBuffer _out;
+ private int _segment;
+ private int _initialCapacity;
- public BBEncoder(int capacity) {
- out = ByteBuffer.allocate(capacity);
- out.order(ByteOrder.BIG_ENDIAN);
- segment = 0;
+ public ServerEncoder()
+ {
+ this(DEFAULT_CAPACITY);
}
- public void init()
+ public ServerEncoder(int capacity)
{
- out.clear();
- segment = 0;
+ _initialCapacity = capacity;
+ _out = ByteBuffer.allocate(capacity);
+ _segment = 0;
}
- public ByteBuffer segment()
+ public void init()
{
- int pos = out.position();
- out.position(segment);
- ByteBuffer slice = out.slice();
- slice.limit(pos - segment);
- out.position(pos);
- segment = pos;
- return slice;
+ _out.position(_out.limit());
+ _out.limit(_out.capacity());
+ _out = _out.slice();
+ if(_out.remaining() < 256)
+ {
+ _out = ByteBuffer.allocate(_initialCapacity);
+ }
+ _segment = 0;
}
public ByteBuffer buffer()
{
- int pos = out.position();
- out.position(segment);
- ByteBuffer slice = out.slice();
- slice.limit(pos - segment);
- out.position(pos);
+ int pos = _out.position();
+ _out.position(_segment);
+ ByteBuffer slice = _out.slice();
+ slice.limit(pos - _segment);
+ _out.position(pos);
return slice;
}
public int position()
{
- return out.position();
+ return _out.position();
}
public ByteBuffer underlyingBuffer()
{
- return out;
+ return _out;
}
private void grow(int size)
{
- ByteBuffer old = out;
+ ByteBuffer old = _out;
int capacity = old.capacity();
- out = ByteBuffer.allocate(Math.max(capacity + size, 2*capacity));
- out.order(ByteOrder.BIG_ENDIAN);
+ _out = ByteBuffer.allocate(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity));
old.flip();
- out.put(old);
+ _out.put(old);
}
protected void doPut(byte b)
{
try
{
- out.put(b);
+ _out.put(b);
}
catch (BufferOverflowException e)
{
grow(1);
- out.put(b);
+ _out.put(b);
}
}
@@ -107,12 +104,12 @@ public final class BBEncoder extends Abs
{
try
{
- out.put(src);
+ _out.put(src);
}
catch (BufferOverflowException e)
{
grow(src.remaining());
- out.put(src);
+ _out.put(src);
}
}
@@ -120,12 +117,12 @@ public final class BBEncoder extends Abs
{
try
{
- out.put(bytes);
+ _out.put(bytes);
}
catch (BufferOverflowException e)
{
grow(bytes.length);
- out.put(bytes);
+ _out.put(bytes);
}
}
@@ -135,12 +132,12 @@ public final class BBEncoder extends Abs
try
{
- out.put((byte) b);
+ _out.put((byte) b);
}
catch (BufferOverflowException e)
{
grow(1);
- out.put((byte) b);
+ _out.put((byte) b);
}
}
@@ -150,12 +147,12 @@ public final class BBEncoder extends Abs
try
{
- out.putShort((short) s);
+ _out.putShort((short) s);
}
catch (BufferOverflowException e)
{
grow(2);
- out.putShort((short) s);
+ _out.putShort((short) s);
}
}
@@ -165,12 +162,12 @@ public final class BBEncoder extends Abs
try
{
- out.putInt((int) i);
+ _out.putInt((int) i);
}
catch (BufferOverflowException e)
{
grow(4);
- out.putInt((int) i);
+ _out.putInt((int) i);
}
}
@@ -178,87 +175,90 @@ public final class BBEncoder extends Abs
{
try
{
- out.putLong(l);
+ _out.putLong(l);
}
catch (BufferOverflowException e)
{
grow(8);
- out.putLong(l);
+ _out.putLong(l);
}
}
public int beginSize8()
{
- int pos = out.position();
+ int pos = _out.position();
try
{
- out.put((byte) 0);
+ _out.put((byte) 0);
}
catch (BufferOverflowException e)
{
grow(1);
- out.put((byte) 0);
+ _out.put((byte) 0);
}
return pos;
}
public void endSize8(int pos)
{
- int cur = out.position();
- out.put(pos, (byte) (cur - pos - 1));
+ int cur = _out.position();
+ _out.put(pos, (byte) (cur - pos - 1));
}
public int beginSize16()
{
- int pos = out.position();
+ int pos = _out.position();
try
{
- out.putShort((short) 0);
+ _out.putShort((short) 0);
}
catch (BufferOverflowException e)
{
grow(2);
- out.putShort((short) 0);
+ _out.putShort((short) 0);
}
return pos;
}
public void endSize16(int pos)
{
- int cur = out.position();
- out.putShort(pos, (short) (cur - pos - 2));
+ int cur = _out.position();
+ _out.putShort(pos, (short) (cur - pos - 2));
}
public int beginSize32()
{
- int pos = out.position();
+ int pos = _out.position();
try
{
- out.putInt(0);
+ _out.putInt(0);
}
catch (BufferOverflowException e)
{
grow(4);
- out.putInt(0);
+ _out.putInt(0);
}
return pos;
+
}
public void endSize32(int pos)
{
- int cur = out.position();
- out.putInt(pos, (cur - pos - 4));
+ int cur = _out.position();
+ _out.putInt(pos, (cur - pos - 4));
+
}
public void writeDouble(double aDouble)
{
- try
+ try
{
- out.putDouble(aDouble);
- } catch(BufferOverflowException exception)
+ _out.putDouble(aDouble);
+ }
+ catch(BufferOverflowException exception)
{
grow(8);
- out.putDouble(aDouble);
+ _out.putDouble(aDouble);
}
}
@@ -266,11 +266,12 @@ public final class BBEncoder extends Abs
{
try
{
- out.putShort(aShort);
- } catch(BufferOverflowException exception)
+ _out.putShort(aShort);
+ }
+ catch(BufferOverflowException exception)
{
grow(2);
- out.putShort(aShort);
+ _out.putShort(aShort);
}
}
@@ -278,11 +279,12 @@ public final class BBEncoder extends Abs
{
try
{
- out.putInt(anInt);
- } catch(BufferOverflowException exception)
+ _out.putInt(anInt);
+ }
+ catch(BufferOverflowException exception)
{
grow(4);
- out.putInt(anInt);
+ _out.putInt(anInt);
}
}
@@ -290,11 +292,12 @@ public final class BBEncoder extends Abs
{
try
{
- out.putLong(aLong);
- } catch(BufferOverflowException exception)
+ _out.putLong(aLong);
+ }
+ catch(BufferOverflowException exception)
{
grow(8);
- out.putLong(aLong);
+ _out.putLong(aLong);
}
}
@@ -302,11 +305,12 @@ public final class BBEncoder extends Abs
{
try
{
- out.put(aByte);
- } catch(BufferOverflowException exception)
+ _out.put(aByte);
+ }
+ catch(BufferOverflowException exception)
{
grow(1);
- out.put(aByte);
+ _out.put(aByte);
}
}
@@ -318,11 +322,12 @@ public final class BBEncoder extends Abs
try
{
- out.put(byteArray);
- } catch(BufferOverflowException exception)
+ _out.put(byteArray);
+ }
+ catch(BufferOverflowException exception)
{
grow(16);
- out.put(byteArray);
+ _out.put(byteArray);
}
}
@@ -352,16 +357,13 @@ public final class BBEncoder extends Abs
{
try
{
- out.putFloat(aFloat);
- } catch(BufferOverflowException exception)
+ _out.putFloat(aFloat);
+ }
+ catch(BufferOverflowException exception)
{
grow(4);
- out.putFloat(aFloat);
+ _out.putFloat(aFloat);
}
}
- public void writeMagicNumber()
- {
- out.put("AM2".getBytes());
- }
-}
\ No newline at end of file
+}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Sat Jan 31 20:07:36 2015
@@ -85,7 +85,7 @@ import org.apache.qpid.server.util.Actio
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
@@ -167,7 +167,7 @@ public class AMQProtocolEngine implement
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private volatile boolean _deferFlush;
private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want?
@@ -272,7 +272,7 @@ public class AMQProtocolEngine implement
setNetworkConnection(network, network.getSender());
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
_network = network;
_sender = sender;
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Sat Jan 31 20:07:36 2015
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -50,7 +51,7 @@ import org.apache.qpid.server.model.port
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
@@ -282,11 +283,11 @@ public class InternalTestProtocolSession
private String _remoteHost = "127.0.0.1";
private String _localHost = "127.0.0.1";
private int _port = portNumber.incrementAndGet();
- private final Sender<ByteBuffer> _sender;
+ private final ByteBufferSender _sender;
public TestNetworkConnection()
{
- _sender = new Sender<ByteBuffer>()
+ _sender = new ByteBufferSender()
{
public void send(ByteBuffer msg)
{
@@ -348,7 +349,7 @@ public class InternalTestProtocolSession
}
@Override
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _sender;
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java Sat Jan 31 20:07:36 2015
@@ -59,7 +59,7 @@ import org.apache.qpid.server.model.port
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -116,7 +116,7 @@ public class ProtocolEngine_1_0_0_SASL i
private byte _revision;
private PrintWriter _out;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private Connection_1_0 _connection;
private volatile boolean _transportBlockedForWriting;
@@ -185,7 +185,7 @@ public class ProtocolEngine_1_0_0_SASL i
{
}
- public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+ public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender)
{
_network = network;
_sender = sender;
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Sat Jan 31 20:07:36 2015
@@ -53,7 +53,7 @@ import org.apache.qpid.server.model.port
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
@@ -240,7 +240,7 @@ class WebSocketProvider implements Accep
}
}
- private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer>
+ private class ConnectionWrapper implements NetworkConnection, ByteBufferSender
{
private final WebSocket.Connection _connection;
private final SocketAddress _localAddress;
@@ -259,7 +259,7 @@ class WebSocketProvider implements Accep
}
@Override
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return this;
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Sat Jan 31 20:07:36 2015
@@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.XASession;
-import org.apache.qpid.transport.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +59,7 @@ import org.apache.qpid.jms.ChannelLimitR
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.Session;
import org.apache.qpid.properties.ConnectionStartProperties;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
@@ -527,12 +527,12 @@ public class AMQConnectionDelegate_8_0 i
}
- private static class ReceiverClosedWaiter implements Receiver<ByteBuffer>
+ private static class ReceiverClosedWaiter implements ByteBufferReceiver
{
private final CountDownLatch _closedWatcher;
- private final Receiver<ByteBuffer> _receiver;
+ private final ByteBufferReceiver _receiver;
- public ReceiverClosedWaiter(Receiver<ByteBuffer> receiver)
+ public ReceiverClosedWaiter(ByteBufferReceiver receiver)
{
_receiver = receiver;
_closedWatcher = new CountDownLatch(1);
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Sat Jan 31 20:07:36 2015
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.client.handler;
-import java.nio.ByteBuffer;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +33,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.TransportException;
public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
@@ -95,7 +93,7 @@ public class ConnectionCloseMethodHandle
}
finally
{
- Sender<ByteBuffer> sender = session.getSender();
+ ByteBufferSender sender = session.getSender();
if (error != null)
{
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Sat Jan 31 20:07:36 2015
@@ -66,8 +66,8 @@ import org.apache.qpid.protocol.AMQMetho
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
@@ -179,7 +179,7 @@ public class AMQProtocolHandler implemen
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private long _lastReadTime = System.currentTimeMillis();
private long _lastWriteTime = System.currentTimeMillis();
private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
@@ -905,7 +905,7 @@ public class AMQProtocolHandler implemen
setNetworkConnection(network, network.getSender());
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
_network = network;
_sender = sender;
@@ -923,7 +923,7 @@ public class AMQProtocolHandler implemen
return _lastWriteTime;
}
- protected Sender<ByteBuffer> getSender()
+ protected ByteBufferSender getSender()
{
return _sender;
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Sat Jan 31 20:07:36 2015
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.client.protocol;
-import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -52,8 +51,8 @@ import org.apache.qpid.framing.ProtocolI
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
/**
@@ -382,7 +381,7 @@ public class AMQProtocolSession implemen
}
}
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _protocolHandler.getSender();
}
@@ -476,7 +475,7 @@ public class AMQProtocolSession implemen
_protocolHandler.propagateExceptionToAllWaiters(error);
}
- public void setSender(Sender<java.nio.ByteBuffer> sender)
+ public void setSender(ByteBufferSender sender)
{
// No-op, interface munging
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Sat Jan 31 20:07:36 2015
@@ -666,7 +666,7 @@ public class AMQSession_0_10Test extends
}
}
- class MockSender implements Sender<ProtocolEvent>
+ class MockSender implements ProtocolEventSender
{
private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java Sat Jan 31 20:07:36 2015
@@ -22,9 +22,9 @@ package org.apache.qpid.client.transport
import java.nio.ByteBuffer;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferSender;
-public class MockSender implements Sender<ByteBuffer>
+public class MockSender implements ByteBufferSender
{
public void send(ByteBuffer msg)
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java Sat Jan 31 20:07:36 2015
@@ -20,18 +20,18 @@
*/
package org.apache.qpid.client.transport;
-import java.security.Principal;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
-
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.security.Principal;
+
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.network.NetworkConnection;
/**
* Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
@@ -147,7 +147,7 @@ public class TestNetworkConnection imple
_remoteAddress = address;
}
- public Sender<ByteBuffer> getSender()
+ public ByteBufferSender getSender()
{
return _sender;
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java Sat Jan 31 20:07:36 2015
@@ -26,9 +26,7 @@ import org.apache.qpid.framing.ContentBo
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.transport.Sender;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.transport.ByteBufferSender;
/**
@@ -56,6 +54,6 @@ public interface AMQVersionAwareProtocol
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
- public void setSender(Sender<ByteBuffer> sender);
+ public void setSender(ByteBufferSender sender);
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Sat Jan 31 20:07:36 2015
@@ -21,10 +21,9 @@
package org.apache.qpid.protocol;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
@@ -32,7 +31,7 @@ import org.apache.qpid.transport.network
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
* decodes it and then process the result.
*/
-public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity
+public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity
{
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
@@ -58,6 +57,6 @@ public interface ProtocolEngine extends
void encryptedTransport();
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender);
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java Sat Jan 31 20:07:36 2015
@@ -26,11 +26,11 @@ package org.apache.qpid.transport;
*
*/
-public interface Binding<E,T>
+public interface Binding<E>
{
- E endpoint(Sender<T> sender);
+ E endpoint(ByteBufferSender sender);
- Receiver<T> receiver(E endpoint);
+ ByteBufferReceiver receiver(E endpoint);
}
Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java?rev=1656248&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java Sat Jan 31 20:07:36 2015
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.nio.ByteBuffer;
+
+public interface ByteBufferReceiver
+{
+ void received(ByteBuffer msg);
+
+ void exception(Throwable t);
+
+ void closed();
+}
Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java?rev=1656248&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java Sat Jan 31 20:07:36 2015
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.nio.ByteBuffer;
+
+public interface ByteBufferSender
+{
+ void send(ByteBuffer msg);
+
+ void flush();
+
+ void close();
+}
Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Sat Jan 31 20:07:36 2015
@@ -27,7 +27,6 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.Connection.State.OPENING;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -68,7 +67,7 @@ import org.apache.qpid.util.Strings;
*/
public class Connection extends ConnectionInvoker
- implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
+ implements ProtocolEventReceiver, ProtocolEventSender
{
protected static final Logger log = Logger.get(Connection.class);
@@ -113,7 +112,7 @@ public class Connection extends Connecti
private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY;
private ConnectionDelegate delegate;
- private Sender<ProtocolEvent> sender;
+ private ProtocolEventSender sender;
final private Map<Binary,Session> sessions = new HashMap<Binary,Session>();
final private Map<Integer,Session> channels = new ConcurrentHashMap<Integer,Session>();
@@ -151,12 +150,12 @@ public class Connection extends Connecti
listeners.add(listener);
}
- public Sender<ProtocolEvent> getSender()
+ public ProtocolEventSender getSender()
{
return sender;
}
- public void setSender(Sender<ProtocolEvent> sender)
+ public void setSender(ProtocolEventSender sender)
{
this.sender = sender;
}
@@ -234,7 +233,7 @@ public class Connection extends Connecti
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
final InputHandler inputHandler = new InputHandler(new Assembler(this));
addFrameSizeObserver(inputHandler);
- Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler);
+ ByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
if(secureReceiver instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureReceiver);
@@ -246,7 +245,7 @@ public class Connection extends Connecti
setRemoteAddress(_networkConnection.getRemoteAddress());
setLocalAddress(_networkConnection.getLocalAddress());
- final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender());
+ final ByteBufferSender secureSender = securityLayer.sender(_networkConnection.getSender());
if(secureSender instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureSender);
@@ -411,7 +410,7 @@ public class Connection extends Connecti
{
log.debug("SEND: [%s] %s", this, event);
}
- Sender<ProtocolEvent> s = sender;
+ ProtocolEventSender s = sender;
if (s == null)
{
throw new ConnectionException("connection closed");
@@ -425,7 +424,7 @@ public class Connection extends Connecti
{
log.debug("FLUSH: [%s]", this);
}
- final Sender<ProtocolEvent> theSender = sender;
+ final ProtocolEventSender theSender = sender;
if(theSender != null)
{
theSender.flush();
Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java?rev=1656248&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java Sat Jan 31 20:07:36 2015
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import org.apache.qpid.transport.network.NetworkEvent;
+
+public interface NetworkEventReceiver
+{
+ void received(NetworkEvent msg);
+
+ void exception(Throwable t);
+
+ void closed();
+}
Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java?rev=1656248&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java Sat Jan 31 20:07:36 2015
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+public interface ProtocolEventReceiver
+{
+ void received(ProtocolEvent msg);
+
+ void exception(Throwable t);
+
+ void closed();
+}
Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java?rev=1656248&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java Sat Jan 31 20:07:36 2015
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+/**
+ * Outbound interface accepting {@link ProtocolEvent} objects.
+ *
+ * Non-generic counterpart of the former {@code Sender<ProtocolEvent>};
+ * for example, {@code Disassembler} implements this interface.
+ */
+public interface ProtocolEventSender
+{
+ /**
+  * Submits a single protocol event to this sender.
+  *
+  * @param msg the event to send
+  */
+ void send(ProtocolEvent msg);
+
+ /**
+  * Flushes this sender.
+  * NOTE(review): presumably pushes any events buffered by send() towards
+  * the underlying transport - confirm against the implementations.
+  */
+ void flush();
+
+ /**
+  * Closes this sender.
+  * NOTE(review): whether send() may be called after close() is not
+  * specified here - confirm against the implementations.
+  */
+ void close();
+}
Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java Sat Jan 31 20:07:36 2015
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.transport.codec;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-import org.apache.qpid.transport.Type;
-
-import org.apache.qpid.transport.Xid;
import static org.apache.qpid.transport.util.Functions.lsb;
import java.io.UnsupportedEncodingException;
@@ -36,6 +30,12 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.Type;
+import org.apache.qpid.transport.Xid;
+
/**
* AbstractEncoder
@@ -43,7 +43,7 @@ import java.util.UUID;
* @author Rafael H. Schloming
*/
-abstract class AbstractEncoder implements Encoder
+public abstract class AbstractEncoder implements Encoder
{
private static Map<Class<?>,Type> ENCODINGS = new HashMap<Class<?>,Type>();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java Sat Jan 31 20:07:36 2015
@@ -360,8 +360,4 @@ public final class BBEncoder extends Abs
}
}
- public void writeMagicNumber()
- {
- out.put("AM2".getBytes());
- }
-}
\ No newline at end of file
+}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java Sat Jan 31 20:07:36 2015
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.transport.codec;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Struct;
-
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.Struct;
+
/**
* Encoder interface.
@@ -274,9 +275,10 @@ public interface Encoder
* @param bytes the bytes array to be encoded.
*/
void writeBin128(byte [] bytes);
-
- /**
- * Encodes the AMQP magic number.
- */
- void writeMagicNumber();
-}
\ No newline at end of file
+
+ int position();
+
+ ByteBuffer underlyingBuffer();
+
+ void init();
+}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Sat Jan 31 20:07:36 2015
@@ -20,28 +20,29 @@
*/
package org.apache.qpid.transport.network;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.NetworkEventReceiver;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventReceiver;
import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBDecoder;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* Assembler
*
*/
-public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
+public class Assembler implements NetworkEventReceiver, NetworkDelegate
{
// Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge
// array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1.
@@ -49,7 +50,7 @@ public class Assembler implements Receiv
private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>();
- private final Receiver<ProtocolEvent> receiver;
+ private final ProtocolEventReceiver receiver;
private final Map<Integer,List<Frame>> segments;
private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>()
{
@@ -59,7 +60,7 @@ public class Assembler implements Receiv
}
};
- public Assembler(Receiver<ProtocolEvent> receiver)
+ public Assembler(ProtocolEventReceiver receiver)
{
this.receiver = receiver;
segments = new HashMap<Integer,List<Frame>>();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java Sat Jan 31 20:07:36 2015
@@ -20,15 +20,13 @@
*/
package org.apache.qpid.transport.network;
-import java.nio.ByteBuffer;
-
import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.ByteBufferReceiver;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
@@ -38,10 +36,10 @@ import org.apache.qpid.transport.network
*/
public abstract class ConnectionBinding
- implements Binding<Connection,ByteBuffer>
+ implements Binding<Connection>
{
- public static Binding<Connection,ByteBuffer> get(final Connection connection)
+ public static Binding<Connection> get(final Connection connection)
{
return new ConnectionBinding()
{
@@ -52,7 +50,7 @@ public abstract class ConnectionBinding
};
}
- public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate)
+ public static Binding<Connection> get(final ConnectionDelegate delegate)
{
return new ConnectionBinding()
{
@@ -69,7 +67,7 @@ public abstract class ConnectionBinding
public abstract Connection connection();
- public Connection endpoint(Sender<ByteBuffer> sender)
+ public Connection endpoint(ByteBufferSender sender)
{
Connection conn = connection();
@@ -87,7 +85,7 @@ public abstract class ConnectionBinding
return conn;
}
- public Receiver<ByteBuffer> receiver(Connection conn)
+ public ByteBufferReceiver receiver(Connection conn)
{
final InputHandler inputHandler = new InputHandler(new Assembler(conn));
conn.addFrameSizeObserver(inputHandler);
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Sat Jan 31 20:07:36 2015
@@ -30,27 +30,29 @@ import static org.apache.qpid.transport.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolDelegate;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolEventSender;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.SegmentType;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Encoder;
/**
* Disassembler
*/
-public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver
+public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
{
- private final Sender<ByteBuffer> sender;
+ private final ByteBufferSender sender;
private int maxPayload;
private final Object sendlock = new Object();
- private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
+ private final static ThreadLocal<Encoder> _encoder = new ThreadLocal<Encoder>()
{
public BBEncoder initialValue()
{
@@ -58,7 +60,7 @@ public final class Disassembler implemen
}
};
- public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
+ public Disassembler(ByteBufferSender sender, int maxFrame)
{
this.sender = sender;
if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
@@ -174,7 +176,7 @@ public final class Disassembler implemen
private void method(Method method, SegmentType type)
{
- BBEncoder enc = _encoder.get();
+ Encoder enc = _encoder.get();
enc.init();
enc.writeUint16(method.getEncodedType());
if (type == SegmentType.COMMAND)
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java Sat Jan 31 20:07:36 2015
@@ -29,11 +29,12 @@ import static org.apache.qpid.transport.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.FrameSizeObserver;
+import org.apache.qpid.transport.NetworkEventReceiver;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.SegmentType;
@@ -43,7 +44,7 @@ import org.apache.qpid.transport.Segment
* @author Rafael H. Schloming
*/
-public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
+public class InputHandler implements ByteBufferReceiver, FrameSizeObserver
{
private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
@@ -56,7 +57,7 @@ public class InputHandler implements Rec
ERROR
}
- private final Receiver<NetworkEvent> receiver;
+ private final NetworkEventReceiver receiver;
private State state;
private ByteBuffer input = null;
private int needed;
@@ -66,7 +67,7 @@ public class InputHandler implements Rec
private byte track;
private int channel;
- public InputHandler(Receiver<NetworkEvent> receiver, State state)
+ public InputHandler(NetworkEventReceiver receiver, State state)
{
this.receiver = receiver;
this.state = state;
@@ -82,7 +83,7 @@ public class InputHandler implements Rec
}
}
- public InputHandler(Receiver<NetworkEvent> receiver)
+ public InputHandler(NetworkEventReceiver receiver)
{
this(receiver, PROTO_HDR);
}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java Sat Jan 31 20:07:36 2015
@@ -21,13 +21,13 @@
package org.apache.qpid.transport.network;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.Principal;
-import org.apache.qpid.transport.Sender;
+
+import org.apache.qpid.transport.ByteBufferSender;
public interface NetworkConnection
{
- Sender<ByteBuffer> getSender();
+ ByteBufferSender getSender();
void start();
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java?rev=1656248&r1=1656247&r2=1656248&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java Sat Jan 31 20:07:36 2015
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.transport.network;
+import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-
-import java.nio.ByteBuffer;
public interface OutgoingNetworkTransport extends NetworkTransport
{
public NetworkConnection getConnection();
public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
+ ByteBufferReceiver delegate,
TransportActivity transportActivity);
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org