You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/08/07 02:28:20 UTC
svn commit: r1694594 [5/5] - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ bdbs...
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Fri Aug 7 00:28:17 2015
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
@@ -105,7 +106,7 @@ public class ContentHeaderBody implement
@Override
public long writePayload(final ByteBufferSender sender) throws IOException
{
- ByteBuffer data = ByteBuffer.allocate(14);
+ QpidByteBuffer data = QpidByteBuffer.allocate(14);
EncodingUtils.writeUnsignedShort(data, CLASS_ID);
EncodingUtils.writeUnsignedShort(data, 0);
data.putLong(_bodySize);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Fri Aug 7 00:28:17 2015
@@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.codec.MarkableDataInput;
public class EncodingUtils
@@ -280,7 +281,7 @@ public class EncodingUtils
}
}
- public static void writeUnsignedShort(ByteBuffer buffer, int s) throws IOException
+ public static void writeUnsignedShort(QpidByteBuffer buffer, int s) throws IOException
{
// TODO: Is this comparison safe? Do I need to cast RHS to long?
if (s < Short.MAX_VALUE)
@@ -321,7 +322,7 @@ public class EncodingUtils
}
}
- public static void writeUnsignedInteger(ByteBuffer buffer, long l) throws IOException
+ public static void writeUnsignedInteger(QpidByteBuffer buffer, long l) throws IOException
{
// TODO: Is this comparison safe? Do I need to cast RHS to long?
if (l < Integer.MAX_VALUE)
@@ -627,7 +628,7 @@ public class EncodingUtils
}
else
{
- return new FieldTable(buffer.readAsByteBuffer((int)length));
+ return new FieldTable(buffer.readAsByteBuffer((int) length));
}
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java Fri Aug 7 00:28:17 2015
@@ -41,6 +41,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.codec.MarkableDataInput;
// extends FieldTable
public class FieldTable
@@ -49,7 +51,7 @@ public class FieldTable
private static final String STRICT_AMQP_NAME = "STRICT_AMQP";
private static final boolean STRICT_AMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP_NAME, "false"));
- private ByteBuffer _encodedForm;
+ private QpidByteBuffer _encodedForm;
private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null;
private long _encodedSize;
private static final int INITIAL_HASHMAP_CAPACITY = 16;
@@ -70,10 +72,10 @@ public class FieldTable
public FieldTable(byte[] encodedForm, int offset, int length)
{
- this(ByteBuffer.wrap(encodedForm,offset,length));
+ this(QpidByteBuffer.wrap(encodedForm,offset,length));
}
- public FieldTable(ByteBuffer buffer)
+ public FieldTable(QpidByteBuffer buffer)
{
this();
_encodedForm = buffer;
@@ -1094,13 +1096,13 @@ public class FieldTable
private void setFromBuffer() throws AMQFrameDecodingException, IOException
{
- ByteBufferDataInput dataInput = new ByteBufferDataInput(_encodedForm.duplicate());
+ MarkableDataInput dataInput = _encodedForm.slice().asDataInput();
if (_encodedSize > 0)
{
- _properties = new LinkedHashMap<AMQShortString, AMQTypedValue>(INITIAL_HASHMAP_CAPACITY);
+ _properties = new LinkedHashMap<>(INITIAL_HASHMAP_CAPACITY);
do
{
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java Fri Aug 7 00:28:17 2015
@@ -20,10 +20,11 @@
*/
package org.apache.qpid.framing;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>,
ClientMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>,
ServerMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>
@@ -604,7 +605,7 @@ public class FrameCreatingMethodProcesso
}
@Override
- public void receiveMessageContent(ByteBuffer data)
+ public void receiveMessageContent(QpidByteBuffer data)
{
_processedMethods.add(new AMQFrame(_channelId, new ContentBody(data)));
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Fri Aug 7 00:28:17 2015
@@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets
import java.util.Arrays;
import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.util.BytesDataOutput;
@@ -97,7 +98,7 @@ public class ProtocolInitiation extends
byte[] data = new byte[8];
BytesDataOutput out = new BytesDataOutput(data);
writePayload(out);
- sender.send(ByteBuffer.wrap(data));
+ sender.send(QpidByteBuffer.wrap(data));
return 8l;
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java Fri Aug 7 00:28:17 2015
@@ -22,9 +22,12 @@ package org.apache.qpid.transport;
import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
public interface ByteBufferSender
{
- void send(ByteBuffer msg);
+
+ void send(QpidByteBuffer msg);
void flush();
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/MessageTransfer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/MessageTransfer.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/MessageTransfer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/MessageTransfer.java Fri Aug 7 00:28:17 2015
@@ -21,9 +21,13 @@ package org.apache.qpid.transport;
*/
+import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.codec.Decoder;
import org.apache.qpid.transport.codec.Encoder;
@@ -66,13 +70,17 @@ public final class MessageTransfer exten
private MessageAcceptMode acceptMode;
private MessageAcquireMode acquireMode;
private Header header;
- private ByteBuffer body;
+ private Collection<QpidByteBuffer> _body;
public MessageTransfer() {}
+ public MessageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, java.nio.ByteBuffer body, Option ... options)
+ {
+ this(destination, acceptMode, acquireMode, header, Collections.singletonList(QpidByteBuffer.wrap(body)), options);
+ }
- public MessageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, java.nio.ByteBuffer body, Option ... _options) {
+ public MessageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, Collection<QpidByteBuffer> body, Option ... _options) {
if(destination != null) {
setDestination(destination);
}
@@ -194,42 +202,42 @@ public final class MessageTransfer exten
return this;
}
- public int getBodySize()
- {
- return this.body == null ? 0 : this.body.remaining();
- }
-
- public final ByteBuffer getBody() {
- if (this.body == null)
+ @Override
+ public final Collection<QpidByteBuffer> getBody() {
+ if (this._body == null)
{
return null;
}
else
{
- return this.body.slice();
+ return Collections.unmodifiableCollection(_body);
}
}
- public final void setBody(ByteBuffer body) {
- this.body = body;
+ @Override
+ public final void setBody(Collection<QpidByteBuffer> body) {
+ this._body = body;
}
- public final MessageTransfer body(ByteBuffer body)
+ public final MessageTransfer body(List<QpidByteBuffer> body)
{
setBody(body);
return this;
}
public final byte[] getBodyBytes() {
- ByteBuffer buf = getBody();
- byte[] bytes = new byte[buf.remaining()];
- buf.get(bytes);
+ Collection<QpidByteBuffer> body = getBody();
+ byte[] bytes = new byte[getBodySize()];
+ for(QpidByteBuffer buf : body)
+ {
+ buf.duplicate().get(bytes);
+ }
return bytes;
}
public final void setBody(byte[] body)
{
- setBody(ByteBuffer.wrap(body));
+ setBody(Collections.singletonList(QpidByteBuffer.wrap(body)));
}
public final String getBodyString() {
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Method.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Method.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Method.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Method.java Fri Aug 7 00:28:17 2015
@@ -20,11 +20,14 @@
*/
package org.apache.qpid.transport;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.network.Frame;
import static org.apache.qpid.transport.util.Functions.str;
import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
/**
* Method
@@ -125,26 +128,31 @@ public abstract class Method extends Str
throw new UnsupportedOperationException();
}
- public ByteBuffer getBody()
+ public Collection<QpidByteBuffer> getBody()
{
return null;
}
- public void setBody(ByteBuffer body)
+ public void setBody(Collection<QpidByteBuffer> body)
{
throw new UnsupportedOperationException();
}
public int getBodySize()
{
- ByteBuffer body = getBody();
+ Collection<QpidByteBuffer> body = getBody();
if (body == null)
{
return 0;
}
else
{
- return body.remaining();
+ int size = 0;
+ for(QpidByteBuffer buf : body)
+ {
+ size += buf.remaining();
+ }
+ return size;
}
}
@@ -223,7 +231,7 @@ public abstract class Method extends Str
str.append(st);
}
}
- ByteBuffer body = getBody();
+ Collection<QpidByteBuffer> body = getBody();
if (body != null)
{
str.append("\n body=");
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java Fri Aug 7 00:28:17 2015
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.transport;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.network.Frame;
import org.apache.qpid.transport.network.NetworkDelegate;
import org.apache.qpid.transport.network.NetworkEvent;
@@ -93,9 +94,9 @@ public final class ProtocolHeader implem
return false;
}
- public ByteBuffer toByteBuffer(final boolean useDirect)
+ public QpidByteBuffer toByteBuffer()
{
- ByteBuffer buf = ByteBuffer.allocate(8);
+ QpidByteBuffer buf = QpidByteBuffer.allocate(8);
buf.put(AMQP);
buf.put(protoClass);
buf.put(instance);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java Fri Aug 7 00:28:17 2015
@@ -42,7 +42,7 @@ import java.util.UUID;
* @author Rafael H. Schloming
*/
-abstract class AbstractDecoder implements Decoder
+public abstract class AbstractDecoder implements Decoder
{
private final Map<Binary,String> str8cache = new LinkedHashMap<Binary,String>()
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Fri Aug 7 00:28:17 2015
@@ -22,10 +22,12 @@ package org.apache.qpid.transport.networ
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageProperties;
@@ -241,7 +243,7 @@ public class Assembler implements Networ
break;
case BODY:
command = getIncompleteCommand(channel);
- command.setBody(segment);
+ command.setBody(Collections.singletonList(QpidByteBuffer.wrap(segment)));
setIncompleteCommand(channel, null);
emit(channel, command);
break;
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Fri Aug 7 00:28:17 2015
@@ -30,6 +30,7 @@ import static org.apache.qpid.transport.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.Header;
@@ -43,6 +44,7 @@ import org.apache.qpid.transport.Segment
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
import org.apache.qpid.transport.codec.Encoder;
+import org.apache.qpid.util.ByteBufferUtils;
/**
* Disassembler
@@ -116,8 +118,8 @@ public final class Disassembler implemen
buf.limit(buf.position() + size);
data.rewind();
- sender.send(data);
- sender.send(buf);
+ sender.send(QpidByteBuffer.wrap(data));
+ sender.send(QpidByteBuffer.wrap(buf));
buf.limit(limit);
}
@@ -159,7 +161,7 @@ public final class Disassembler implemen
{
synchronized (sendlock)
{
- sender.send(header.toByteBuffer(false));
+ sender.send(header.toByteBuffer());
sender.flush();
}
}
@@ -235,7 +237,7 @@ public final class Disassembler implemen
fragment(flags, type, method, buf);
if (payload)
{
- ByteBuffer body = method.getBody();
+ ByteBuffer body = ByteBufferUtils.combine(method.getBody());
buf.limit(headerLimit);
buf.position(methodLimit);
fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Fri Aug 7 00:28:17 2015
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.Atomi
import javax.net.ssl.SSLSocket;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
@@ -115,7 +116,7 @@ public final class IoSender implements R
return result;
}
- public void send(ByteBuffer buf)
+ public void send(QpidByteBuffer buf)
{
checkNotAlreadyClosed();
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java Fri Aug 7 00:28:17 2015
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.Atomi
import javax.security.sasl.SaslException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.util.Logger;
@@ -69,13 +70,13 @@ public class SASLSender extends SASLEncr
delegate.flush();
}
- public void send(ByteBuffer buf)
- {
+ public void send(QpidByteBuffer buf)
+ {
if (closed.get())
{
throw new SenderException("SSL Sender is closed");
}
-
+
if (isSecurityLayerEstablished())
{
while (buf.hasRemaining())
@@ -83,28 +84,29 @@ public class SASLSender extends SASLEncr
int length = Math.min(buf.remaining(), getSendBuffSize());
log.debug("sendBuffSize %s", getSendBuffSize());
log.debug("buf.remaining() %s", buf.remaining());
-
+
buf.get(appData, 0, length);
try
{
byte[] out = getSaslClient().wrap(appData, 0, length);
log.debug("out.length %s", out.length);
-
- delegate.send(ByteBuffer.wrap(out));
- }
+
+ delegate.send(QpidByteBuffer.wrap(out));
+ }
catch (SaslException e)
{
log.error("Exception while encrypting data.",e);
throw new SenderException("SASL Sender, Error occurred while encrypting data",e);
}
- }
+ }
}
else
{
delegate.send(buf);
- }
+ }
}
+
public void securityLayerEstablished()
{
appData = new byte[getSendBuffSize()];
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java Fri Aug 7 00:28:17 2015
@@ -28,6 +28,7 @@ import javax.net.ssl.SSLEngineResult.Han
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.network.security.SSLStatus;
@@ -126,7 +127,7 @@ public class SSLSender implements ByteBu
netData.limit(limit);
netData.position(netData.position() + read);
- delegate.send(data);
+ delegate.send(QpidByteBuffer.wrap(data));
flush();
}
result = engine.wrap(ByteBuffer.allocate(0), netData);
@@ -140,7 +141,7 @@ public class SSLSender implements ByteBu
delegate.flush();
}
- public void send(ByteBuffer appData)
+ public void send(QpidByteBuffer appData)
{
if (closed.get() && !_sslStatus.getSslErrorFlag())
{
@@ -155,7 +156,7 @@ public class SSLSender implements ByteBu
int read = 0;
try
{
- SSLEngineResult result = engine.wrap(appData, netData);
+ SSLEngineResult result = engine.wrap(appData.getNativeBuffer(), netData);
read = result.bytesProduced();
status = result.getStatus();
handshakeStatus = result.getHandshakeStatus();
@@ -177,7 +178,7 @@ public class SSLSender implements ByteBu
netData.limit(limit);
netData.position(netData.position() + read);
- delegate.send(data);
+ delegate.send(QpidByteBuffer.wrap(data));
}
switch(status)
@@ -219,24 +220,24 @@ public class SSLSender implements ByteBu
switch (engine.getHandshakeStatus())
{
- case NEED_UNWRAP:
- final long start = System.currentTimeMillis();
- try
- {
- _sslStatus.getSslLock().wait(timeout);
- }
- catch(InterruptedException e)
- {
- // pass
- }
-
- if (!_sslStatus.getSslErrorFlag() && System.currentTimeMillis() - start >= timeout)
- {
- throw new SenderException(
- "SSL Engine timed out after waiting " + timeout + "ms. for a response." +
- "To get more info,run with -Djavax.net.debug=ssl");
- }
- break;
+ case NEED_UNWRAP:
+ final long start = System.currentTimeMillis();
+ try
+ {
+ _sslStatus.getSslLock().wait(timeout);
+ }
+ catch(InterruptedException e)
+ {
+ // pass
+ }
+
+ if (!_sslStatus.getSslErrorFlag() && System.currentTimeMillis() - start >= timeout)
+ {
+ throw new SenderException(
+ "SSL Engine timed out after waiting " + timeout + "ms. for a response." +
+ "To get more info,run with -Djavax.net.debug=ssl");
+ }
+ break;
}
}
break;
@@ -246,7 +247,7 @@ public class SSLSender implements ByteBu
{
SSLUtil.verifyHostname(engine, _hostname);
}
-
+
case NOT_HANDSHAKING:
break; //do nothing
@@ -257,6 +258,8 @@ public class SSLSender implements ByteBu
}
}
+
+
private void doTasks()
{
Runnable runnable;
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java Fri Aug 7 00:28:17 2015
@@ -23,6 +23,11 @@ package org.apache.qpid.transport.util;
import static java.lang.Math.min;
import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
/**
@@ -63,11 +68,37 @@ public final class Functions
public static final String str(ByteBuffer buf, int limit)
{
- return str(buf, limit,buf.position());
+ return str(buf, limit,buf.position());
}
-
+
public static final String str(ByteBuffer buf, int limit,int start)
{
+ return str(QpidByteBuffer.wrap(buf), start,limit);
+ }
+
+ public static final String str(QpidByteBuffer buf)
+ {
+ return str(buf, buf.remaining());
+ }
+
+ public static final String str(QpidByteBuffer buf, int limit)
+ {
+ return str(buf, limit, buf.position());
+ }
+
+ public static final String str(Collection<QpidByteBuffer> buf, int limit, int start)
+ {
+ return str(ByteBufferUtils.combine(buf),limit,start);
+ }
+
+
+ public static final String str(Collection<QpidByteBuffer> buf, int limit)
+ {
+ return str(buf, limit, 0);
+ }
+
+ public static final String str(QpidByteBuffer buf, int limit, int start)
+ {
StringBuilder str = new StringBuilder();
str.append('"');
@@ -109,7 +140,7 @@ public final class Functions
{
return hex(bytes, limit, "");
}
- public static String hex(ByteBuffer bytes, int limit)
+ public static String hex(QpidByteBuffer bytes, int limit)
{
return hex(bytes, limit, "");
}
@@ -134,7 +165,7 @@ public final class Functions
return sb.toString();
}
- public static String hex(ByteBuffer bytes, int limit, CharSequence separator)
+ public static String hex(QpidByteBuffer bytes, int limit, CharSequence separator)
{
limit = Math.min(limit, bytes == null ? 0 : bytes.remaining());
StringBuilder sb = new StringBuilder(3 + limit*2);
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java Fri Aug 7 00:28:17 2015
@@ -23,37 +23,49 @@ package org.apache.qpid.util;
import java.nio.ByteBuffer;
import java.util.Collection;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
public class ByteBufferUtils
{
private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
- public static ByteBuffer combine(Collection<ByteBuffer> bufs)
+ public static ByteBuffer combine(Collection<QpidByteBuffer> bufs)
{
if(bufs == null || bufs.isEmpty())
{
return EMPTY_BYTE_BUFFER;
}
- else if(bufs.size() == 1)
- {
- return bufs.iterator().next();
- }
else
{
int size = 0;
boolean isDirect = false;
- for(ByteBuffer buf : bufs)
+ for(QpidByteBuffer buf : bufs)
{
size += buf.remaining();
isDirect = isDirect || buf.isDirect();
}
ByteBuffer combined = isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
- for(ByteBuffer buf : bufs)
+ for(QpidByteBuffer buf : bufs)
{
- combined.put(buf.duplicate());
+ buf.duplicate().get(combined);
}
combined.flip();
return combined;
}
}
+
+ public static int remaining(Collection<QpidByteBuffer> bufs)
+ {
+ int size = 0;
+ if (bufs != null && !bufs.isEmpty())
+ {
+ for (QpidByteBuffer buf : bufs)
+ {
+ size += buf.remaining();
+ }
+
+ }
+ return size;
+ }
}
Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Fri Aug 7 00:28:17 2015
@@ -44,7 +44,7 @@ import org.apache.qpid.util.BytesDataOut
public class AMQDecoderTest extends QpidTestCase
{
- private AMQDecoder _decoder;
+ private ClientDecoder _decoder;
private FrameCreatingMethodProcessor _methodProcessor;
@@ -97,7 +97,7 @@ public class AMQDecoderTest extends Qpid
{
assertEquals(ContentBody.TYPE, ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType());
ContentBody decodedBody = (ContentBody) ((AMQFrame) frames.get(0)).getBodyFrame();
- final ByteBuffer byteBuffer = decodedBody.getPayload().duplicate();
+ final ByteBuffer byteBuffer = decodedBody.getPayload().getNativeBuffer().duplicate();
byte[] bodyBytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bodyBytes);
assertTrue("Body was corrupted", Arrays.equals(payload, bodyBytes));
Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java Fri Aug 7 00:28:17 2015
@@ -31,6 +31,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.test.utils.QpidTestCase;
import org.junit.Assert;
@@ -601,7 +602,7 @@ public class FieldTableTest extends Qpid
ByteArrayOutputStream baos = new ByteArrayOutputStream((int) table.getEncodedSize() + 4);
table.writeToBuffer(new DataOutputStream(baos));
- ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
+ QpidByteBuffer buf = QpidByteBuffer.wrap(baos.toByteArray());
long length = buf.getInt() & 0xFFFFFFFFL;
buf = buf.slice();
@@ -920,7 +921,7 @@ public class FieldTableTest extends Qpid
assertEquals("unexpected data length", 24, length);
//Create a second FieldTable from the encoded bytes
- FieldTable tableFromBytes = new FieldTable(ByteBuffer.wrap(data));
+ FieldTable tableFromBytes = new FieldTable(QpidByteBuffer.wrap(data));
//Create a final FieldTable and addAll() from the table created with encoded bytes
FieldTable destinationTable = new FieldTable();
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java Fri Aug 7 00:28:17 2015
@@ -62,6 +62,8 @@ public class MultipleAuthenticationManag
sslPortAttributes.put(Port.KEY_STORE, TestBrokerConfiguration.ENTRY_NAME_SSL_KEYSTORE);
sslPortAttributes.put(Port.TRUST_STORES, Collections.singleton(TestBrokerConfiguration.ENTRY_NAME_SSL_TRUSTSTORE));
sslPortAttributes.put(Port.AUTHENTICATION_PROVIDER, TestBrokerConfiguration.ENTRY_NAME_ANONYMOUS_PROVIDER);
+ sslPortAttributes.put(Port.PROTOCOLS, System.getProperty(TEST_AMQP_PORT_PROTOCOLS_PROPERTY));
+
config.addObjectConfiguration(Port.class, sslPortAttributes);
Map<String, Object> aliasAttributes = new HashMap<>();
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java Fri Aug 7 00:28:17 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.test.unit.basic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
@@ -133,7 +134,7 @@ public class FieldTableMessageTest exten
final long bodyLength = bytesMessage.getBodyLength();
byte[] data = new byte[(int) bodyLength];
bytesMessage.readBytes(data);
- FieldTable actual = new FieldTable(ByteBuffer.wrap(data));
+ FieldTable actual = new FieldTable(QpidByteBuffer.wrap(data));
for (String key : _expected.keys())
{
assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key));
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java Fri Aug 7 00:28:17 2015
@@ -264,7 +264,7 @@ public class MaxFrameSizeTest extends Qp
}
final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
- AMQDecoder decoder = new ClientDecoder(methodProcessor);
+ ClientDecoder decoder = new ClientDecoder(methodProcessor);
byte[] buffer = new byte[1024];
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java Fri Aug 7 00:28:17 2015
@@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.ByteArrayDataInput;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.ProtocolInitiation;
@@ -197,8 +198,34 @@ public class ProtocolNegotiationTest ext
ConnectionHeartbeat heartbeat = new ConnectionHeartbeat();
ServerDisassembler serverDisassembler = new ServerDisassembler(new ByteBufferSender()
{
+ private void send(final ByteBuffer msg)
+ {
+ try
+ {
+ if(msg.hasArray())
+ {
+ dataOutputStream.write(msg.array(), msg.arrayOffset() + msg.position(), msg.remaining());
+ }
+ else
+ {
+ byte[] data = new byte[msg.remaining()];
+ msg.duplicate().get(data);
+ dataOutputStream.write(data, 0, data.length);
+ }
+ }
+ catch (SocketException se)
+ {
+
+ success.set(false);
+ }
+ catch(IOException e)
+ {
+ throw new RuntimeException("Unexpected IOException", e);
+ }
+ }
+
@Override
- public void send(final ByteBuffer msg)
+ public void send(final QpidByteBuffer msg)
{
try
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org