You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/08/07 02:28:20 UTC
svn commit: r1694594 [2/5] - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ bdbs...
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java Fri Aug 7 00:28:17 2015
@@ -29,6 +29,7 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
@@ -115,7 +116,7 @@ public abstract class MessageStoreQuotaE
{
StorableMessageMetaData metaData = createMetaData(id, MESSAGE_DATA.length);
MessageHandle<?> handle = _store.addMessage(metaData);
- handle.addContent(ByteBuffer.wrap(MESSAGE_DATA));
+ handle.addContent(QpidByteBuffer.wrap(MESSAGE_DATA));
StoredMessage<? extends StorableMessageMetaData> storedMessage = handle.allContentAdded();
TestMessage message = new TestMessage(id, storedMessage);
return message;
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Fri Aug 7 00:28:17 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
import java.util.Collection;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -105,7 +106,7 @@ public class TestMessageMetaDataType imp
}
@Override
- public Collection<ByteBuffer> getContent(int offset, int size)
+ public Collection<QpidByteBuffer> getContent(int offset, int size)
{
return null;
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java Fri Aug 7 00:28:17 2015
@@ -19,6 +19,7 @@
package org.apache.qpid.server.transport;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.network.AggregateTicker;
@@ -112,7 +113,7 @@ public class NetworkConnectionSchedulerT
Thread.sleep(500l);
timidSender.start();
Thread.sleep(1000l);
- verify(timidEngine, atLeast(6)).received(any(ByteBuffer.class));
+ verify(timidEngine, atLeast(6)).received(any(QpidByteBuffer.class));
_keepRunningThreads = false;
transport.close();
scheduler.close();
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Fri Aug 7 00:28:17 2015
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.Collection;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -107,7 +108,7 @@ class MockServerMessage implements Serve
}
- public Collection<ByteBuffer> getContent(int offset, int size)
+ public Collection<QpidByteBuffer> getContent(int offset, int size)
{
throw new UnsupportedOperationException();
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Fri Aug 7 00:28:17 2015
@@ -34,6 +34,7 @@ import javax.security.auth.Subject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
@@ -64,7 +65,7 @@ import org.apache.qpid.transport.network
public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10>
{
private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
- private final InputHandler _inputHandler;
+ private final ServerInputHandler _inputHandler;
private final NetworkConnection _network;
@@ -103,7 +104,8 @@ public class AMQPConnection_0_10 extends
_connection.setRemoteAddress(network.getRemoteAddress());
_connection.setLocalAddress(network.getLocalAddress());
- _inputHandler = new InputHandler(new ServerAssembler(_connection), true);
+ _inputHandler = new ServerInputHandler(new ServerAssembler(_connection));
+ _connection.addFrameSizeObserver(_inputHandler);
_network = network;
Subject.doAs(getSubject(), new PrivilegedAction<Object>()
@@ -163,7 +165,7 @@ public class AMQPConnection_0_10 extends
return new ByteBufferSender()
{
@Override
- public void send(ByteBuffer msg)
+ public void send(final QpidByteBuffer msg)
{
updateLastWriteTime();
sender.send(msg);
@@ -184,7 +186,7 @@ public class AMQPConnection_0_10 extends
};
}
- public void received(final ByteBuffer buf)
+ public void received(final QpidByteBuffer buf)
{
Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>()
{
@@ -212,10 +214,6 @@ public class AMQPConnection_0_10 extends
throw new ConnectionScopedRuntimeException(e);
}
}
- finally
- {
- buf.position(buf.limit());
- }
return null;
}
});
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Fri Aug 7 00:28:17 2015
@@ -21,13 +21,18 @@
package org.apache.qpid.server.protocol.v0_10;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
@@ -266,26 +271,26 @@ public class ConsumerTarget_0_10 extends
boolean msgCompressed = messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding());
- ByteBuffer body = ByteBufferUtils.combine(msg.getBody());
+ Collection<QpidByteBuffer> body = msg.getBody();
boolean compressionSupported = _session.getConnection().getConnectionDelegate().isCompressionSupported();
if(msgCompressed && !compressionSupported)
{
- byte[] uncompressed = GZIPUtils.uncompressBufferToArray(body);
+ byte[] uncompressed = GZIPUtils.uncompressBufferToArray(ByteBufferUtils.combine(body));
if(uncompressed != null)
{
messageProps.setContentEncoding(null);
- body = ByteBuffer.wrap(uncompressed);
+ body = Collections.singleton(QpidByteBuffer.wrap(uncompressed));
}
}
else if(!msgCompressed
&& compressionSupported
&& (messageProps == null || messageProps.getContentEncoding()==null)
&& body != null
- && body.remaining() > _session.getConnection().getMessageCompressionThreshold())
+ && ByteBufferUtils.remaining(body) > _session.getConnection().getMessageCompressionThreshold())
{
- byte[] compressed = GZIPUtils.compressBufferToArray(body);
+ byte[] compressed = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(body));
if(compressed != null)
{
if(messageProps == null)
@@ -293,7 +298,7 @@ public class ConsumerTarget_0_10 extends
messageProps = new MessageProperties();
}
messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
- body = ByteBuffer.wrap(compressed);
+ body = Collections.singleton(QpidByteBuffer.wrap(compressed));
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Fri Aug 7 00:28:17 2015
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
@@ -91,9 +92,9 @@ public class MessageConverter_Internal_t
}
@Override
- public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+ public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
{
- return Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size));
+ return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size));
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Fri Aug 7 00:28:17 2015
@@ -30,6 +30,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
@@ -91,7 +92,7 @@ public class MessageConverter_v0_10 impl
}
@Override
- public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+ public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
{
return serverMsg.getContent(offsetInMessage, size);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Fri Aug 7 00:28:17 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.
import java.nio.ByteBuffer;
import java.util.Collection;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
@@ -77,7 +78,7 @@ public class MessageTransferMessage exte
return getMetaData().getHeader();
}
- public Collection<ByteBuffer> getBody()
+ public Collection<QpidByteBuffer> getBody()
{
return getContent(0, (int)getSize());
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java Fri Aug 7 00:28:17 2015
@@ -22,32 +22,61 @@ package org.apache.qpid.server.protocol.
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.codec.*;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.NetworkEventReceiver;
+import org.apache.qpid.transport.ProtocolError;
+import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolHeader;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.transport.network.Frame;
+import org.apache.qpid.transport.network.NetworkDelegate;
import org.apache.qpid.transport.network.NetworkEvent;
-public class ServerAssembler extends Assembler
+public class ServerAssembler
{
private static final Logger LOGGER = LoggerFactory.getLogger(ServerAssembler.class);
private final ServerConnection _connection;
- public ServerAssembler(final ServerConnection connection)
+
+
+ // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge
+ // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1.
+ private static final int ARRAY_SIZE = 0xFF;
+ private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
+ private final Map<Integer, Method> _incompleteMethodMap = new HashMap<>();
+
+ private final Map<Integer,List<ServerFrame>> _segments;
+ private final ServerDecoder _decoder = new ServerDecoder();
+
+ public ServerAssembler(ServerConnection connection)
{
- super(connection);
_connection = connection;
+ _segments = new HashMap<>();
}
- @Override
- public void received(final NetworkEvent event)
+ public void received(final ServerFrame event)
{
if (!_connection.isIgnoreFutureInput())
{
- super.received(event);
+ frame(event);
}
else
{
@@ -55,11 +84,209 @@ public class ServerAssembler extends Ass
}
}
- @Override
protected ByteBuffer allocateByteBuffer(int size)
{
return ByteBuffer.allocateDirect(size);
}
+ private int segmentKey(ServerFrame frame)
+ {
+ return (frame.getTrack() + 1) * frame.getChannel();
+ }
+
+ private List<ServerFrame> getSegment(ServerFrame frame)
+ {
+ return _segments.get(segmentKey(frame));
+ }
+
+ private void setSegment(ServerFrame frame, List<ServerFrame> segment)
+ {
+ int key = segmentKey(frame);
+ if (_segments.containsKey(key))
+ {
+ error(new ProtocolError(Frame.L2, "segment in progress: %s",
+ frame));
+ }
+ _segments.put(segmentKey(frame), segment);
+ }
+
+ private void clearSegment(ServerFrame frame)
+ {
+ _segments.remove(segmentKey(frame));
+ }
+
+ private void emit(int channel, ProtocolEvent event)
+ {
+ event.setChannel(channel);
+ _connection.received(event);
+ }
+
+ public void exception(Throwable t)
+ {
+ _connection.exception(t);
+ }
+
+ public void closed()
+ {
+ _connection.closed();
+ }
+
+ public void init(ProtocolHeader header)
+ {
+ emit(0, header);
+ }
+
+ public void error(ProtocolError error)
+ {
+ emit(0, error);
+ }
+
+ public void frame(ServerFrame frame)
+ {
+ List<QpidByteBuffer> segment;
+ if (frame.isFirstFrame() && frame.isLastFrame())
+ {
+ segment = Collections.singletonList(frame.getBody());
+ assemble(frame, segment);
+ }
+ else
+ {
+ List<ServerFrame> frames;
+ if (frame.isFirstFrame())
+ {
+ frames = new ArrayList<>();
+ setSegment(frame, frames);
+ }
+ else
+ {
+ frames = getSegment(frame);
+ }
+
+ frames.add(frame);
+
+ if (frame.isLastFrame())
+ {
+ clearSegment(frame);
+ segment = new ArrayList<>(frames.size());
+ for (ServerFrame f : frames)
+ {
+ segment.add(f.getBody());
+ }
+ assemble(frame, segment);
+ }
+ }
+
+ }
+
+ private void assemble(ServerFrame frame, List<QpidByteBuffer> segment)
+ {
+ ServerDecoder dec = _decoder;
+ dec.init(segment);
+
+ int channel = frame.getChannel();
+ Method command;
+
+ switch (frame.getType())
+ {
+ case CONTROL:
+ int controlType = dec.readUint16();
+ Method control = Method.create(controlType);
+ control.read(dec);
+ emit(channel, control);
+ break;
+ case COMMAND:
+ int commandType = dec.readUint16();
+ // read in the session header; only its sync flag (bit 0x0001) is used below
+ int hdr = dec.readUint16();
+ command = Method.create(commandType);
+ command.setSync((0x0001 & hdr) != 0);
+ command.read(dec);
+ if (command.hasPayload() && !frame.isLastSegment())
+ {
+ setIncompleteCommand(channel, command);
+ }
+ else
+ {
+ emit(channel, command);
+ }
+ break;
+ case HEADER:
+ command = getIncompleteCommand(channel);
+ List<Struct> structs = null;
+ DeliveryProperties deliveryProps = null;
+ MessageProperties messageProps = null;
+
+ while (dec.hasRemaining())
+ {
+ Struct struct = dec.readStruct32();
+ if(struct instanceof DeliveryProperties && deliveryProps == null)
+ {
+ deliveryProps = (DeliveryProperties) struct;
+ }
+ else if(struct instanceof MessageProperties && messageProps == null)
+ {
+ messageProps = (MessageProperties) struct;
+ }
+ else
+ {
+ if(structs == null)
+ {
+ structs = new ArrayList<>(2);
+ }
+ structs.add(struct);
+ }
+
+ }
+ command.setHeader(new Header(deliveryProps,messageProps,structs));
+
+ if (frame.isLastSegment())
+ {
+ setIncompleteCommand(channel, null);
+ emit(channel, command);
+ }
+ break;
+ case BODY:
+ command = getIncompleteCommand(channel);
+ command.setBody(segment);
+ setIncompleteCommand(channel, null);
+ emit(channel, command);
+ break;
+ default:
+ throw new IllegalStateException("unknown frame type: " + frame.getType());
+ }
+
+ dec.releaseBuffer();
+ }
+
+ private void setIncompleteCommand(int channelId, Method incomplete)
+ {
+ if ((channelId & ARRAY_SIZE) == channelId)
+ {
+ _incompleteMethodArray[channelId] = incomplete;
+ }
+ else
+ {
+ if(incomplete != null)
+ {
+ _incompleteMethodMap.put(channelId, incomplete);
+ }
+ else
+ {
+ _incompleteMethodMap.remove(channelId);
+ }
+ }
+ }
+
+ private Method getIncompleteCommand(int channelId)
+ {
+ if ((channelId & ARRAY_SIZE) == channelId)
+ {
+ return _incompleteMethodArray[channelId];
+ }
+ else
+ {
+ return _incompleteMethodMap.get(channelId);
+ }
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Fri Aug 7 00:28:17 2015
@@ -294,7 +294,6 @@ public class ServerConnectionDelegate ex
}
setConnectionTuneOkChannelMax(sconn, okChannelMax);
-
conn.setMaxFrameSize(okMaxFrameSize);
}
Copied: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java (from r1693306, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java?p2=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java&r1=1693306&r2=1694594&rev=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java Fri Aug 7 00:28:17 2015
@@ -18,82 +18,138 @@
* under the License.
*
*/
-package org.apache.qpid.transport.codec;
-
-import org.apache.qpid.transport.Binary;
+package org.apache.qpid.server.protocol.v0_10;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
+import java.util.List;
-/**
- * Byte Buffer Decoder.
- * Decoder concrete implementor using a backing byte buffer for decoding data.
- *
- * @author Rafael H. Schloming
- */
-public final class BBDecoder extends AbstractDecoder
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.transport.codec.AbstractDecoder;
+
+public final class ServerDecoder extends AbstractDecoder
{
- private ByteBuffer in;
+ private List<QpidByteBuffer> _underlying;
+ private int _bufferIndex;
- public void init(ByteBuffer in)
- {
- this.in = in;
- this.in.order(ByteOrder.BIG_ENDIAN);
- }
- public void releaseBuffer()
+ public void init(List<QpidByteBuffer> in)
{
- in = null;
+ _underlying = in;
}
- protected byte doGet()
+ private void advanceIfNecessary()
{
- return in.get();
+ while(!getCurrentBuffer().hasRemaining() && _bufferIndex != _underlying.size()-1)
+ {
+ _bufferIndex++;
+ }
}
- protected void doGet(byte[] bytes)
+ private QpidByteBuffer getBuffer(int size)
{
- in.get(bytes);
+ advanceIfNecessary();
+ final QpidByteBuffer currentBuffer = getCurrentBuffer();
+ if(currentBuffer.remaining()>= size)
+ {
+ return currentBuffer;
+ }
+ else
+ {
+ return readAsNativeByteBuffer(size);
+ }
}
- protected Binary get(int size)
+ private QpidByteBuffer readAsNativeByteBuffer(int len)
{
- if (in.hasArray())
+ QpidByteBuffer currentBuffer = getCurrentBuffer();
+ if(currentBuffer.remaining()>=len)
{
- byte[] bytes = in.array();
- Binary bin = new Binary(bytes, in.arrayOffset() + in.position(), size);
- in.position(in.position() + size);
- return bin;
+ QpidByteBuffer buf = currentBuffer.slice();
+ buf.limit(len);
+ currentBuffer.position(currentBuffer.position()+len);
+ return buf;
}
else
{
- return super.get(size);
+ QpidByteBuffer dest = currentBuffer.isDirect() ? QpidByteBuffer.allocateDirect(len) : QpidByteBuffer.allocate(len);
+ while(dest.hasRemaining() && available()>0)
+ {
+ advanceIfNecessary();
+ currentBuffer = getCurrentBuffer();
+ final int remaining = dest.remaining();
+ if(currentBuffer.remaining()>= remaining)
+ {
+ QpidByteBuffer buf = currentBuffer.slice();
+ buf.limit(remaining);
+ currentBuffer.position(currentBuffer.position()+remaining);
+ dest.put(buf);
+ }
+ else
+ {
+ dest.put(currentBuffer);
+ }
+ }
+
+ dest.flip();
+ return dest;
}
}
+ private int available()
+ {
+ int remaining = 0;
+ for(int i = _bufferIndex; i < _underlying.size(); i++)
+ {
+ remaining += _underlying.get(i).remaining();
+ }
+ return remaining;
+ }
+
+
+ private QpidByteBuffer getCurrentBuffer()
+ {
+ return _underlying.get(_bufferIndex);
+ }
+
+
+ public void releaseBuffer()
+ {
+ _underlying = null;
+ }
+
+ protected byte doGet()
+ {
+ return getBuffer(1).get();
+ }
+
+ protected void doGet(byte[] bytes)
+ {
+ getBuffer(bytes.length).get(bytes);
+ }
+
public boolean hasRemaining()
{
- return in.hasRemaining();
+ return available() != 0;
}
public short readUint8()
{
- return (short) (0xFF & in.get());
+ return (short) (0xFF & getBuffer(1).get());
}
public int readUint16()
{
- return 0xFFFF & in.getShort();
+ return 0xFFFF & getBuffer(2).getShort();
}
public long readUint32()
{
- return 0xFFFFFFFFL & in.getInt();
+ return 0xFFFFFFFFL & getBuffer(4).getInt();
}
public long readUint64()
{
- return in.getLong();
+ return getBuffer(8).getLong();
}
public byte[] readBin128()
@@ -112,38 +168,38 @@ public final class BBDecoder extends Abs
public double readDouble()
{
- return in.getDouble();
+ return getBuffer(8).getDouble();
}
public float readFloat()
{
- return in.getFloat();
+ return getBuffer(4).getFloat();
}
public short readInt16()
{
- return in.getShort();
+ return getBuffer(2).getShort();
}
public int readInt32()
{
- return in.getInt();
+ return getBuffer(4).getInt();
}
public byte readInt8()
{
- return in.get();
+ return getBuffer(1).get();
}
public byte[] readReaminingBytes()
{
- byte[] result = new byte[in.limit() - in.position()];
+ byte[] result = new byte[available()];
get(result);
return result;
}
public long readInt64()
{
- return in.getLong();
+ return getBuffer(8).getLong();
}
-}
\ No newline at end of file
+}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java Fri Aug 7 00:28:17 2015
@@ -28,10 +28,14 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.network.Frame.LAST_SEG;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.Header;
@@ -91,9 +95,9 @@ public final class ServerDisassembler im
}
}
- private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
+ private void frame(byte flags, byte type, byte track, int channel, int size, Collection<QpidByteBuffer> buf)
{
- ByteBuffer data = ByteBuffer.allocate(HEADER_SIZE);
+ QpidByteBuffer data = QpidByteBuffer.allocate(HEADER_SIZE);
data.put(0, flags);
data.put(1, type);
@@ -102,21 +106,43 @@ public final class ServerDisassembler im
data.putShort(6, (short) channel);
- ByteBuffer dup = buf.duplicate();
- dup.limit(dup.position() + size);
- buf.position(buf.position() + size);
_sender.send(data);
- _sender.send(dup);
-
+ if(size > 0)
+ {
+ int residual = size;
+ for(QpidByteBuffer b : buf)
+ {
+ final int remaining = b.remaining();
+ if(remaining > 0 )
+ {
+ if(remaining >= residual)
+ {
+ _sender.send(b.view(0,residual));
+ b.position(b.position()+residual);
+ break;
+ }
+ else
+ {
+ _sender.send(b.duplicate());
+ b.position(b.limit());
+ residual-=remaining;
+ }
+ }
+ }
+ }
}
- private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf)
+ private void fragment(byte flags, SegmentType type, ProtocolEvent event, Collection<QpidByteBuffer> buf)
{
byte typeb = (byte) type.getValue();
byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
- int remaining = buf.remaining();
+ int remaining = 0;
+ for(QpidByteBuffer b : buf)
+ {
+ remaining += b.remaining();
+ }
boolean first = true;
while (true)
{
@@ -145,7 +171,7 @@ public final class ServerDisassembler im
public void init(Void v, ProtocolHeader header)
{
- _sender.send(header.toByteBuffer(true));
+ _sender.send(header.toByteBuffer());
_sender.flush();
}
@@ -217,17 +243,22 @@ public final class ServerDisassembler im
buf.position(0);
buf.limit(methodLimit);
- fragment(flags, type, method, buf.duplicate());
+ fragment(flags, type, method, Collections.singletonList(QpidByteBuffer.wrap(buf.duplicate())));
if (payload)
{
- ByteBuffer body = method.getBody();
+ Collection<QpidByteBuffer> body = method.getBody();
buf.limit(headerLimit);
buf.position(methodLimit);
- fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf.duplicate());
+ fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, Collections.singletonList(QpidByteBuffer.wrap(buf.duplicate())));
if (body != null)
{
- fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate());
+ Collection<QpidByteBuffer> dup = new ArrayList<>();
+ for(QpidByteBuffer b : body)
+ {
+ dup.add(b.duplicate());
+ }
+ fragment(LAST_SEG, SegmentType.BODY, method, dup);
}
}
Copied: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java (from r1693306, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Frame.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java?p2=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Frame.java&r1=1693306&r2=1694594&rev=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Frame.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java Fri Aug 7 00:28:17 2015
@@ -18,23 +18,25 @@
* under the License.
*
*/
-package org.apache.qpid.transport.network;
+package org.apache.qpid.server.protocol.v0_10;
import static org.apache.qpid.transport.util.Functions.str;
import java.nio.ByteBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.SegmentType;
+import org.apache.qpid.transport.network.NetworkDelegate;
+import org.apache.qpid.transport.network.NetworkEvent;
-/**
- * Frame
- *
- * @author Rafael H. Schloming
- */
-
-public final class Frame implements NetworkEvent
+public final class ServerFrame
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerFrame.class);
+
public static final int HEADER_SIZE = 12;
// XXX: enums?
@@ -43,10 +45,6 @@ public final class Frame implements Netw
public static final byte L3 = 2;
public static final byte L4 = 3;
- public static final byte RESERVED = 0x0;
-
- public static final byte VERSION = 0x0;
-
public static final byte FIRST_SEG = 0x8;
public static final byte LAST_SEG = 0x4;
public static final byte FIRST_FRAME = 0x2;
@@ -56,10 +54,10 @@ public final class Frame implements Netw
final private SegmentType type;
final private byte track;
final private int channel;
- final private ByteBuffer body;
+ final private QpidByteBuffer body;
- public Frame(byte flags, SegmentType type, byte track, int channel,
- ByteBuffer body)
+ public ServerFrame(byte flags, SegmentType type, byte track, int channel,
+ QpidByteBuffer body)
{
this.flags = flags;
this.type = type;
@@ -68,7 +66,7 @@ public final class Frame implements Netw
this.body = body;
}
- public ByteBuffer getBody()
+ public QpidByteBuffer getBody()
{
return body.slice();
}
@@ -123,11 +121,6 @@ public final class Frame implements Netw
return flag(LAST_FRAME);
}
- public void delegate(NetworkDelegate delegate)
- {
- delegate.frame(this);
- }
-
public String toString()
{
StringBuilder str = new StringBuilder();
Copied: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java (from r1693325, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java?p2=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java&r1=1693325&r2=1694594&rev=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java Fri Aug 7 00:28:17 2015
@@ -18,40 +18,26 @@
* under the License.
*
*/
-package org.apache.qpid.transport.network;
+package org.apache.qpid.server.protocol.v0_10;
-import static org.apache.qpid.transport.network.InputHandler.State.ERROR;
-import static org.apache.qpid.transport.network.InputHandler.State.FRAME_BODY;
-import static org.apache.qpid.transport.network.InputHandler.State.FRAME_HDR;
-import static org.apache.qpid.transport.network.InputHandler.State.PROTO_HDR;
import static org.apache.qpid.transport.util.Functions.str;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
+import static org.apache.qpid.server.protocol.v0_10.ServerInputHandler.State.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.FrameSizeObserver;
-import org.apache.qpid.transport.NetworkEventReceiver;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.SegmentType;
-/**
- * InputHandler
- *
- * @author Rafael H. Schloming
- */
-
-public class InputHandler implements ExceptionHandlingByteBufferReceiver, FrameSizeObserver
+public class ServerInputHandler implements FrameSizeObserver
{
- private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
- private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer
- .allocate(0);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerInputHandler.class);
+ private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.allocate(0);
private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
@@ -60,15 +46,11 @@ public class InputHandler implements Exc
{
PROTO_HDR,
FRAME_HDR,
- FRAME_BODY,
ERROR;
}
- private final NetworkEventReceiver receiver;
+ private final ServerAssembler _serverAssembler;
- private final boolean _useDirect;
- private State state;
- private ByteBuffer input = null;
- private int needed;
+ private State _state = PROTO_HDR;
private byte flags;
private SegmentType type;
@@ -76,21 +58,10 @@ public class InputHandler implements Exc
private int channel;
- public InputHandler(NetworkEventReceiver receiver, final boolean useDirect)
+ public ServerInputHandler(ServerAssembler serverAssembler)
{
- this.receiver = receiver;
- this.state = PROTO_HDR;
- _useDirect = useDirect;
-
- switch (state)
- {
- case PROTO_HDR:
- needed = 8;
- break;
- case FRAME_HDR:
- needed = Frame.HEADER_SIZE;
- break;
- }
+ _serverAssembler = serverAssembler;
+ _state = PROTO_HDR;
}
public void setMaxFrameSize(final int maxFrameSize)
@@ -100,125 +71,127 @@ public class InputHandler implements Exc
private void error(String fmt, Object ... args)
{
- receiver.received(new ProtocolError(Frame.L1, fmt, args));
+ _serverAssembler.error(new ProtocolError(ServerFrame.L1, fmt, args));
}
- @Override
- public void received(ByteBuffer buf)
+ public void received(QpidByteBuffer buf)
{
- int limit = buf.limit();
- int remaining = buf.remaining();
- while (remaining > 0)
+ int position = buf.position();
+ while(buf.hasRemaining() && _state != ERROR)
{
- if (remaining >= needed)
- {
- int consumed = needed;
- int pos = buf.position();
- if (input == null)
- {
- buf.limit(pos + needed);
- input = buf;
- state = next(pos);
- buf.limit(limit);
- buf.position(pos + consumed);
- }
- else
- {
- buf.limit(pos + needed);
- input.put(buf);
- buf.limit(limit);
- input.flip();
- state = next(0);
- }
+ parse(buf);
- remaining -= consumed;
- input = null;
+ int newPosition = buf.position();
+ if(position == newPosition)
+ {
+ break;
}
else
{
- if (input == null)
- {
- input = _useDirect ? ByteBuffer.allocateDirect(needed) : ByteBuffer.allocate(needed);
- }
- input.put(buf);
- needed -= remaining;
- remaining = 0;
+ position = newPosition;
}
}
}
- private State next(int pos)
+ private void parse(QpidByteBuffer buffer)
{
- input.order(ByteOrder.BIG_ENDIAN);
-
- switch (state) {
- case PROTO_HDR:
- if (input.get(pos) != 'A' &&
- input.get(pos + 1) != 'M' &&
- input.get(pos + 2) != 'Q' &&
- input.get(pos + 3) != 'P')
- {
- error("bad protocol header: %s", str(input));
- return ERROR;
- }
+ buffer.mark();
+ switch (_state) {
+ case PROTO_HDR:
+ if(buffer.remaining() < 8)
+ {
+ break;
+ }
+ if (buffer.get() != 'A' ||
+ buffer.get() != 'M' ||
+ buffer.get() != 'Q' ||
+ buffer.get() != 'P')
+ {
+ buffer.reset();
+ error("bad protocol header: %s", str(buffer));
+ _state = ERROR;
+ }
+ else
+ {
+ byte protoClass = buffer.get();
+ byte instance = buffer.get();
+ byte major = buffer.get();
+ byte minor = buffer.get();
- byte protoClass = input.get(pos + 4);
- byte instance = input.get(pos + 5);
- byte major = input.get(pos + 6);
- byte minor = input.get(pos + 7);
- receiver.received(new ProtocolHeader(protoClass, instance, major, minor));
- needed = Frame.HEADER_SIZE;
- return FRAME_HDR;
- case FRAME_HDR:
- flags = input.get(pos);
- type = SegmentType.get(input.get(pos + 1));
- int size = (0xFFFF & input.getShort(pos + 2));
- size -= Frame.HEADER_SIZE;
- _maxFrameSize = 64 * 1024;
- if (size < 0 || size > (_maxFrameSize - 12))
- {
- error("bad frame size: %d", size);
- return ERROR;
- }
- byte b = input.get(pos + 5);
- if ((b & 0xF0) != 0) {
- error("non-zero reserved bits in upper nibble of " +
- "frame header byte 5: '%x'", b);
- return ERROR;
- } else {
- track = (byte) (b & 0xF);
- }
- channel = (0xFFFF & input.getShort(pos + 6));
- if (size == 0)
- {
- Frame frame = new Frame(flags, type, track, channel, EMPTY_BYTE_BUFFER);
- receiver.received(frame);
- needed = Frame.HEADER_SIZE;
- return FRAME_HDR;
- }
- else
- {
- needed = size;
- return FRAME_BODY;
- }
- case FRAME_BODY:
- Frame frame = new Frame(flags, type, track, channel, input.slice());
- receiver.received(frame);
- needed = Frame.HEADER_SIZE;
- return FRAME_HDR;
- default:
- throw new IllegalStateException();
+ _serverAssembler.init(new ProtocolHeader(protoClass, instance, major, minor));
+ _state = FRAME_HDR;
+ }
+ break;
+ case FRAME_HDR:
+ if(buffer.remaining() < ServerFrame.HEADER_SIZE)
+ {
+ buffer.reset();
+ }
+ else
+ {
+ flags = buffer.get();
+ type = SegmentType.get(buffer.get());
+ int size = (0xFFFF & buffer.getShort());
+
+ size -= ServerFrame.HEADER_SIZE;
+ if (size < 0 || size > (_maxFrameSize - ServerFrame.HEADER_SIZE))
+ {
+ error("bad frame size: %d", size);
+ _state = ERROR;
+ }
+ else
+ {
+ buffer.get(); // skip unused byte
+ byte b = buffer.get();
+ if ((b & 0xF0) != 0)
+ {
+ error("non-zero reserved bits in upper nibble of " +
+ "frame header byte 5: '%x'", b);
+ _state = ERROR;
+ }
+ else
+ {
+ track = (byte) (b & 0xF);
+
+ channel = (0xFFFF & buffer.getShort());
+ buffer.position(buffer.position()+4);
+ if (size == 0)
+ {
+ ServerFrame frame = new ServerFrame(flags, type, track, channel, EMPTY_BYTE_BUFFER);
+ _serverAssembler.received(frame);
+
+ }
+ else if (buffer.remaining() < size)
+ {
+ buffer.reset();
+ }
+ else
+ {
+ final QpidByteBuffer body = buffer.slice();
+ body.limit(size);
+ ServerFrame frame = new ServerFrame(flags, type, track, channel, body);
+ buffer.position(buffer.position() + size);
+
+ _serverAssembler.received(frame);
+ }
+ }
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException();
}
+
}
public void exception(Throwable t)
{
- receiver.exception(t);
+ _serverAssembler.exception(t);
}
public void closed()
{
- receiver.closed();
+ _serverAssembler.closed();
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Aug 7 00:28:17 2015
@@ -34,6 +34,7 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
@@ -500,10 +501,13 @@ public class ServerSessionDelegate exten
final MessageMetaData_0_10 messageMetaData, final MessageStore store)
{
final MessageHandle<MessageMetaData_0_10> addedMessage = store.addMessage(messageMetaData);
- ByteBuffer body = xfr.getBody();
+ Collection<QpidByteBuffer> body = xfr.getBody();
if(body != null)
{
- addedMessage.addContent(body);
+ for(QpidByteBuffer b : body)
+ {
+ addedMessage.addContent(b);
+ }
}
final StoredMessage<MessageMetaData_0_10> storedMessage = addedMessage.allContentAdded();
return storedMessage;
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Aug 7 00:28:17 2015
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.
import static org.apache.qpid.transport.util.Functions.hex;
-import java.nio.ByteBuffer;
import java.security.AccessControlException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@@ -49,6 +48,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
@@ -2511,7 +2511,7 @@ public class AMQChannel
}
@Override
- public void receiveMessageContent(final ByteBuffer data)
+ public void receiveMessageContent(final QpidByteBuffer data)
{
if(_logger.isDebugEnabled())
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Fri Aug 7 00:28:17 2015
@@ -26,7 +26,6 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.AccessControlException;
import java.security.Principal;
import java.security.PrivilegedAction;
@@ -53,7 +52,9 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.ServerDecoder;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.*;
@@ -126,13 +127,13 @@ public class AMQPConnection_0_8
private ConnectionState _state = ConnectionState.INIT;
/**
- * The channels that the latest call to {@link #received(ByteBuffer)} applied to.
+ * The channels that the latest call to {@link ProtocolEngine#received(QpidByteBuffer)} applied to.
* Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
* on after handling the frames.
*/
private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>();
- private final AMQDecoder _decoder;
+ private final ServerDecoder _decoder;
private SaslServer _saslServer;
@@ -277,7 +278,7 @@ public class AMQPConnection_0_8
return new WriteDeliverMethod(channelId);
}
- public void received(final ByteBuffer msg)
+ public void received(final QpidByteBuffer msg)
{
Subject.doAs(getSubject(), new PrivilegedAction<Void>()
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Fri Aug 7 00:28:17 2015
@@ -31,6 +31,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -103,9 +104,9 @@ public class MessageConverter_Internal_t
}
@Override
- public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+ public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
{
- return Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size));
+ return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size));
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Fri Aug 7 00:28:17 2015
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
@@ -125,7 +126,8 @@ public class ProtocolOutputConverterImpl
&& compressionSupported
&& contentHeaderBody.getProperties().getEncoding()==null
&& bodySize > _connection.getMessageCompressionThreshold()
- && (modifiedContent = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(message.getContent(0, bodySize)))) != null)
+ && (modifiedContent = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(message.getContent(0,
+ bodySize)))) != null)
{
BasicContentHeaderProperties modifiedProps =
new BasicContentHeaderProperties(contentHeaderBody.getProperties());
@@ -163,9 +165,9 @@ public class ProtocolOutputConverterImpl
}
@Override
- public Collection<ByteBuffer> getContent(final int offset, final int size)
+ public Collection<QpidByteBuffer> getContent(final int offset, final int size)
{
- return Collections.singleton(ByteBuffer.wrap(content, offset, size));
+ return Collections.singleton(QpidByteBuffer.wrap(content, offset, size));
}
@Override
@@ -247,9 +249,9 @@ public class ProtocolOutputConverterImpl
public void writePayload(DataOutput buffer) throws IOException
{
- Collection<ByteBuffer> bufs = _message.getContent(_offset, _length);
+ Collection<QpidByteBuffer> bufs = _message.getContent(_offset, _length);
- for(ByteBuffer buf : bufs)
+ for(QpidByteBuffer buf : bufs)
{
if (buf.hasArray())
{
@@ -271,9 +273,9 @@ public class ProtocolOutputConverterImpl
public long writePayload(final ByteBufferSender sender) throws IOException
{
- Collection<ByteBuffer> bufs = _message.getContent(_offset, _length);
+ Collection<QpidByteBuffer> bufs = _message.getContent(_offset, _length);
long size = 0l;
- for(ByteBuffer buf : bufs)
+ for(QpidByteBuffer buf : bufs)
{
size += buf.remaining();
sender.send(buf.duplicate());
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Fri Aug 7 00:28:17 2015
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
@@ -48,7 +49,6 @@ import org.apache.qpid.server.message.In
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
@@ -288,10 +288,16 @@ public class InternalTestProtocolSession
{
_sender = new ByteBufferSender()
{
- public void send(ByteBuffer msg)
+ private void send(ByteBuffer msg)
{
}
+ @Override
+ public void send(final QpidByteBuffer msg)
+ {
+
+ }
+
public void flush()
{
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java Fri Aug 7 00:28:17 2015
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
@@ -35,7 +36,7 @@ public class MockStoredMessage implement
{
private long _messageId;
private MessageMetaData _metaData;
- private final ByteBuffer _content;
+ private final QpidByteBuffer _content;
public MockStoredMessage(long messageId)
{
@@ -62,7 +63,7 @@ public class MockStoredMessage implement
( chb.getProperties()).setHeaders(headers);
}
_metaData = new MessageMetaData(info, chb);
- _content = ByteBuffer.allocate(_metaData.getContentSize());
+ _content = QpidByteBuffer.allocate(_metaData.getContentSize());
}
public MessageMetaData getMetaData()
@@ -75,7 +76,7 @@ public class MockStoredMessage implement
return _messageId;
}
- public void addContent(ByteBuffer src)
+ public void addContent(QpidByteBuffer src)
{
src = src.duplicate();
_content.put(src);
@@ -90,24 +91,17 @@ public class MockStoredMessage implement
public int getContent(int offset, ByteBuffer dst)
{
- ByteBuffer src = _content.duplicate();
- src.position(offset);
- src = src.slice();
- if(dst.remaining() < src.limit())
- {
- src.limit(dst.remaining());
- }
- dst.put(src);
- return src.limit();
+ final int length = Math.min(dst.remaining(), _content.remaining() - offset);
+ QpidByteBuffer src = _content.view(offset, length);
+ src.get(dst);
+ return length;
}
- public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+ public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
{
- ByteBuffer buf = ByteBuffer.allocate(size);
- getContent(offsetInMessage, buf);
- buf.position(0);
+ QpidByteBuffer buf = _content.view(offsetInMessage,size);
return Collections.singleton(buf);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java Fri Aug 7 00:28:17 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.amqp_1_0.codec;
import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
public abstract class AbstractDescribedTypeWriter<V> implements ValueWriter<V>
{
private int _length;
@@ -45,7 +47,7 @@ public abstract class AbstractDescribedT
private State _state = State.FORMAT_CODE;
- public int writeToBuffer(ByteBuffer buffer)
+ public int writeToBuffer(QpidByteBuffer buffer)
{
final int length = _length;
@@ -110,7 +112,7 @@ public abstract class AbstractDescribedT
return _length;
}
- private void writeFirstPass(ByteBuffer buffer)
+ private void writeFirstPass(QpidByteBuffer buffer)
{
int length = 1;
@@ -185,4 +187,4 @@ public abstract class AbstractDescribedT
{
return false;
}
-}
\ No newline at end of file
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java Fri Aug 7 00:28:17 2015
@@ -20,9 +20,9 @@ package org.apache.qpid.amqp_1_0.codec;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import java.lang.reflect.Array;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -31,7 +31,7 @@ public abstract class ArrayTypeConstruct
- public Object[] construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+ public Object[] construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
{
int size = read(in);
if(in.remaining() < size)
@@ -40,7 +40,7 @@ public abstract class ArrayTypeConstruct
"Insufficient data to decode array - requires %d octects, only %d remaining.",
size, in.remaining());
}
- ByteBuffer dup = in.slice();
+ QpidByteBuffer dup = in.slice();
dup.limit(size);
in.position(in.position()+size);
int count = read(dup);
@@ -69,13 +69,13 @@ public abstract class ArrayTypeConstruct
}
- abstract int read(ByteBuffer in) throws AmqpErrorException;
+ abstract int read(QpidByteBuffer in) throws AmqpErrorException;
private static final ArrayTypeConstructor ONE_BYTE_SIZE_ARRAY = new ArrayTypeConstructor()
{
- @Override int read(final ByteBuffer in) throws AmqpErrorException
+ @Override int read(final QpidByteBuffer in) throws AmqpErrorException
{
if(!in.hasRemaining())
{
@@ -89,7 +89,7 @@ public abstract class ArrayTypeConstruct
private static final ArrayTypeConstructor FOUR_BYTE_SIZE_ARRAY = new ArrayTypeConstructor()
{
- @Override int read(final ByteBuffer in) throws AmqpErrorException
+ @Override int read(final QpidByteBuffer in) throws AmqpErrorException
{
if(in.remaining()<4)
{
@@ -110,4 +110,4 @@ public abstract class ArrayTypeConstruct
return FOUR_BYTE_SIZE_ARRAY;
}
-}
\ No newline at end of file
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java Fri Aug 7 00:28:17 2015
@@ -19,7 +19,7 @@
package org.apache.qpid.amqp_1_0.codec;
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ArrayWriter implements ValueWriter<Object[]>
{
@@ -60,7 +60,7 @@ public class ArrayWriter implements Val
//registry.register(List.class, FACTORY);
}
- public int writeToBuffer(final ByteBuffer buffer)
+ public int writeToBuffer(final QpidByteBuffer buffer)
{
return 0; //TODO change body of implemented methods use File | Settings | File Templates.
}
@@ -79,4 +79,4 @@ public class ArrayWriter implements Val
{
return false; //TODO change body of implemented methods use File | Settings | File Templates.
}
-}
\ No newline at end of file
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java Fri Aug 7 00:28:17 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.amqp_1_0.codec;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import java.nio.ByteBuffer;
@@ -42,7 +43,7 @@ public class BinaryTypeConstructor exten
}
@Override
- public Object construct(final ByteBuffer in, boolean isCopy, ValueHandler handler) throws AmqpErrorException
+ public Object construct(final QpidByteBuffer in, boolean isCopy, ValueHandler handler) throws AmqpErrorException
{
int size;
@@ -55,7 +56,7 @@ public class BinaryTypeConstructor exten
size = in.getInt();
}
- ByteBuffer inDup = in.slice();
+ QpidByteBuffer inDup = in.slice();
inDup.limit(inDup.position()+size);
Binary binary;
@@ -77,4 +78,4 @@ public class BinaryTypeConstructor exten
}
-}
\ No newline at end of file
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java Fri Aug 7 00:28:17 2015
@@ -22,14 +22,13 @@ package org.apache.qpid.amqp_1_0.codec;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BooleanConstructor
{
private static final TypeConstructor<Boolean> TRUE_INSTANCE = new TypeConstructor<Boolean>()
{
- public Boolean construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+ public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
{
return Boolean.TRUE;
}
@@ -37,14 +36,14 @@ public class BooleanConstructor
private static final TypeConstructor<Boolean> FALSE_INSTANCE = new TypeConstructor<Boolean>()
{
- public Boolean construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+ public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
{
return Boolean.FALSE;
}
};
private static final TypeConstructor<Boolean> BYTE_INSTANCE = new TypeConstructor<Boolean>()
{
- public Boolean construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+ public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
{
if(in.hasRemaining())
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java Fri Aug 7 00:28:17 2015
@@ -21,14 +21,14 @@
package org.apache.qpid.amqp_1_0.codec;
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BooleanWriter implements ValueWriter<Boolean>
{
private boolean _complete = true;
private boolean _value;
- public int writeToBuffer(ByteBuffer buffer)
+ public int writeToBuffer(QpidByteBuffer buffer)
{
if(!_complete & buffer.hasRemaining())
{
@@ -67,4 +67,4 @@ public class BooleanWriter implements Va
{
registry.register(Boolean.class, FACTORY);
}
-}
\ No newline at end of file
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java Fri Aug 7 00:28:17 2015
@@ -23,8 +23,7 @@ package org.apache.qpid.amqp_1_0.codec;
import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ByteTypeConstructor implements TypeConstructor
{
@@ -39,7 +38,7 @@ public class ByteTypeConstructor impleme
{
}
- public Object construct(final ByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
{
if(in.hasRemaining())
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java Fri Aug 7 00:28:17 2015
@@ -21,14 +21,14 @@
package org.apache.qpid.amqp_1_0.codec;
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ByteWriter implements ValueWriter<Byte>
{
private int _written = 2;
private byte _value;
- public int writeToBuffer(ByteBuffer buffer)
+ public int writeToBuffer(QpidByteBuffer buffer)
{
switch(_written)
@@ -87,4 +87,4 @@ public class ByteWriter implements Value
{
registry.register(Byte.class, FACTORY);
}
-}
\ No newline at end of file
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java Fri Aug 7 00:28:17 2015
@@ -22,8 +22,7 @@ package org.apache.qpid.amqp_1_0.codec;
import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.transport.*;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class CharTypeConstructor implements TypeConstructor
{
@@ -39,7 +38,7 @@ public class CharTypeConstructor impleme
{
}
- public Object construct(final ByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
{
if(in.remaining()>=4)
{
@@ -64,4 +63,4 @@ public class CharTypeConstructor impleme
}
}
-}
\ No newline at end of file
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java Fri Aug 7 00:28:17 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.amqp_1_0.codec;
import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -154,7 +155,7 @@ public class CompoundTypeConstructor ext
}
@Override
- public Object construct(final ByteBuffer in, boolean isCopy, ValueHandler delegate) throws AmqpErrorException
+ public Object construct(final QpidByteBuffer in, boolean isCopy, ValueHandler delegate) throws AmqpErrorException
{
int size;
int count;
@@ -170,8 +171,7 @@ public class CompoundTypeConstructor ext
count = in.getInt();
}
- ByteBuffer data;
- ByteBuffer inDup = in.slice();
+ QpidByteBuffer inDup = in.slice();
inDup.limit(size-getSize());
@@ -189,4 +189,4 @@ public class CompoundTypeConstructor ext
}
-}
\ No newline at end of file
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java Fri Aug 7 00:28:17 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.amqp_1_0.codec;
import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
public abstract class CompoundWriter<V> implements ValueWriter<V>
{
private int _length;
@@ -51,12 +53,12 @@ public abstract class CompoundWriter<V>
private State _state = State.FORMAT_CODE;
- public int writeToBuffer(ByteBuffer buffer)
+ public int writeToBuffer(QpidByteBuffer buffer)
{
return writeToBuffer(buffer, false);
}
- public int writeToBuffer(ByteBuffer buffer, boolean large)
+ public int writeToBuffer(QpidByteBuffer buffer, boolean large)
{
final int length = _length;
@@ -209,7 +211,7 @@ public abstract class CompoundWriter<V>
return _length;
}
- private void writeFirstPass(ByteBuffer buffer, int size)
+ private void writeFirstPass(QpidByteBuffer buffer, int size)
{
State state = State.FORMAT_CODE;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java Fri Aug 7 00:28:17 2015
@@ -21,9 +21,9 @@ package org.apache.qpid.amqp_1_0.codec;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import java.math.BigDecimal;
-import java.nio.ByteBuffer;
public abstract class DecimalConstructor implements TypeConstructor<BigDecimal>
{
@@ -31,7 +31,7 @@ public abstract class DecimalConstructor
private static final DecimalConstructor DECIMAL_32 = new DecimalConstructor()
{
- public BigDecimal construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+ public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
{
@@ -54,7 +54,7 @@ public abstract class DecimalConstructor
private static final DecimalConstructor DECIMAL_64 = new DecimalConstructor()
{
- public BigDecimal construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+ public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
{
long val;
@@ -77,7 +77,7 @@ public abstract class DecimalConstructor
private static final DecimalConstructor DECIMAL_128 = new DecimalConstructor()
{
- public BigDecimal construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+ public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
{
long high;
long low;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org