You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/10/28 19:35:59 UTC
qpid-proton git commit: PROTON-1337 Add alternate send and recv
methods that use generic buffers
Repository: qpid-proton
Updated Branches:
refs/heads/master 507c93566 -> acf1f6eb1
PROTON-1337 Add alternate send and recv methods that use generic buffers
Add support for clients that use other buffering types to interact with
the Sender and Receiver using a ReadableBuffer and WritableBuffer
abstraction.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/acf1f6eb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/acf1f6eb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/acf1f6eb
Branch: refs/heads/master
Commit: acf1f6eb182a62ce047270c0c6483bcd0b2eb083
Parents: 507c935
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Oct 28 15:35:45 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Oct 28 15:35:45 2016 -0400
----------------------------------------------------------------------
.../qpid/proton/codec/WritableBuffer.java | 21 +++++-
.../org/apache/qpid/proton/engine/Receiver.java | 10 +++
.../org/apache/qpid/proton/engine/Sender.java | 15 ++++
.../qpid/proton/engine/impl/DeliveryImpl.java | 78 ++++++++++++++++++--
.../qpid/proton/engine/impl/ReceiverImpl.java | 23 +++++-
.../qpid/proton/engine/impl/SenderImpl.java | 27 ++++++-
6 files changed, 162 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/acf1f6eb/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
index 4bcf793..79676b3 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
@@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
public interface WritableBuffer
{
- public void put(byte b);
+ void put(byte b);
void putFloat(float f);
@@ -60,66 +60,79 @@ public interface WritableBuffer
_buf = buf;
}
+ @Override
public void put(byte b)
{
_buf.put(b);
}
+ @Override
public void putFloat(float f)
{
_buf.putFloat(f);
}
+ @Override
public void putDouble(double d)
{
_buf.putDouble(d);
}
+ @Override
public void put(byte[] src, int offset, int length)
{
_buf.put(src, offset, length);
}
+ @Override
public void putShort(short s)
{
_buf.putShort(s);
}
+ @Override
public void putInt(int i)
{
_buf.putInt(i);
}
+ @Override
public void putLong(long l)
{
_buf.putLong(l);
}
+ @Override
public boolean hasRemaining()
{
return _buf.hasRemaining();
}
+ @Override
public int remaining()
{
return _buf.remaining();
}
+ @Override
public int position()
{
return _buf.position();
}
+ @Override
public void position(int position)
{
_buf.position(position);
}
+ @Override
public void put(ByteBuffer src)
{
_buf.put(src);
}
+ @Override
public int limit()
{
return _buf.limit();
@@ -130,5 +143,11 @@ public interface WritableBuffer
{
return String.format("[pos: %d, limit: %d, remaining:%d]", _buf.position(), _buf.limit(), _buf.remaining());
}
+
+        /**
+         * Creates a wrapper backed by a freshly allocated {@link ByteBuffer}.
+         *
+         * @param size the capacity, in bytes, passed to {@link ByteBuffer#allocate(int)}.
+         *
+         * @return a new ByteBufferWrapper around the allocated buffer.
+         */
+        public static ByteBufferWrapper allocate(int size)
+        {
+            return new ByteBufferWrapper(ByteBuffer.allocate(size));
+        }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/acf1f6eb/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
index eec64da..f9d718f 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.proton.engine;
+import org.apache.qpid.proton.codec.WritableBuffer;
/**
* Receiver
@@ -59,6 +60,15 @@ public interface Receiver extends Link
*/
public int recv(byte[] bytes, int offset, int size);
+ /**
+ * Receive message data for the current delivery.
+ *
+ * @param buffer the buffer into which the message data is written.
+ *
+ * @return number of bytes written. -1 if there are no more bytes for the current delivery.
+ */
+ public int recv(WritableBuffer buffer);
+
public void drain(int credit);
/**
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/acf1f6eb/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
index b4a61c6..159d5c3 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.proton.engine;
+import org.apache.qpid.proton.codec.ReadableBuffer;
/**
* Sender
@@ -40,6 +41,10 @@ public interface Sender extends Link
* Sends some data for the current delivery. The application may call this method multiple
* times for the same delivery.
*
+ * @param bytes the byte array containing the data to be sent.
+ * @param offset the offset into the given array to start reading.
+ * @param length the number of bytes to read from the given byte array.
+ *
* @return the number of bytes accepted
*
* TODO Proton-j currently copies all the bytes it has been given so the return value will always be
@@ -49,6 +54,16 @@ public interface Sender extends Link
public int send(byte[] bytes, int offset, int length);
/**
+ * Sends some data for the current delivery. The application may call this method multiple
+ * times for the same delivery.
+ *
+ * @param buffer the buffer to read the data from.
+ *
+ * @return the number of bytes read from the provided buffer.
+ */
+ public int send(ReadableBuffer buffer);
+
+ /**
* Abort the current delivery.
*
* Note "pn_link_abort" is commented out in the .h
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/acf1f6eb/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 20fe4fe..0bdb163 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -22,10 +22,12 @@ package org.apache.qpid.proton.engine.impl;
import java.util.Arrays;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
public class DeliveryImpl implements Delivery
{
@@ -80,26 +82,31 @@ public class DeliveryImpl implements Delivery
}
}
+ @Override
public byte[] getTag()
{
return _tag;
}
+ @Override
public LinkImpl getLink()
{
return _link;
}
+ @Override
public DeliveryState getLocalState()
{
return _deliveryState;
}
+ @Override
public DeliveryState getRemoteState()
{
return _remoteDeliveryState;
}
+ @Override
public boolean remotelySettled()
{
return _remoteSettled;
@@ -117,6 +124,7 @@ public class DeliveryImpl implements Delivery
return _messageFormat;
}
+ @Override
public void disposition(final DeliveryState state)
{
_deliveryState = state;
@@ -126,6 +134,7 @@ public class DeliveryImpl implements Delivery
}
}
+ @Override
public void settle()
{
if (_settled) {
@@ -164,11 +173,13 @@ public class DeliveryImpl implements Delivery
return _linkNext;
}
+ @Override
public DeliveryImpl next()
{
return getLinkNext();
}
+ @Override
public void free()
{
settle();
@@ -179,6 +190,7 @@ public class DeliveryImpl implements Delivery
return _linkPrevious;
}
+ @Override
public DeliveryImpl getWorkNext()
{
if (_workNext != null)
@@ -205,11 +217,10 @@ public class DeliveryImpl implements Delivery
_workPrev = workPrev;
}
- int recv(byte[] bytes, int offset, int size)
+ int recv(final byte[] bytes, int offset, int size)
{
-
final int consumed;
- if(_data != null)
+ if (_data != null)
{
//TODO - should only be if no bytes left
consumed = Math.min(size, _dataSize);
@@ -220,11 +231,30 @@ public class DeliveryImpl implements Delivery
}
else
{
- _dataSize = consumed = 0;
+ _dataSize = consumed = 0;
}
+
return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed; //TODO - Implement
}
+    /**
+     * Copies pending bytes of this delivery into the supplied buffer.
+     *
+     * At most {@code buffer.remaining()} bytes are copied; the internal read offset and
+     * pending size are advanced by the number of bytes copied.
+     *
+     * @param buffer the buffer that receives the bytes.
+     *
+     * @return the number of bytes copied, or {@link Transport#END_OF_STREAM} when the
+     *         delivery is complete and no bytes were copied.
+     */
+    int recv(final WritableBuffer buffer)
+    {
+        int transferred = 0;
+        if (_data == null)
+        {
+            // No payload stored: nothing can be pending.
+            _dataSize = 0;
+        }
+        else
+        {
+            transferred = Math.min(buffer.remaining(), _dataSize);
+
+            buffer.put(_data, _offset, transferred);
+            _offset += transferred;
+            _dataSize -= transferred;
+        }
+
+        if (transferred == 0 && _complete)
+        {
+            return Transport.END_OF_STREAM;
+        }
+        return transferred;
+    }
+
void updateWork()
{
getLink().getConnectionImpl().workUpdate(this);
@@ -274,6 +304,7 @@ public class DeliveryImpl implements Delivery
_transportDelivery = transportDelivery;
}
+ @Override
public boolean isSettled()
{
return _settled;
@@ -289,15 +320,36 @@ public class DeliveryImpl implements Delivery
{
byte[] oldData = _data;
_data = new byte[oldData.length + _dataSize];
- System.arraycopy(oldData,_offset,_data,0,_dataSize);
+ System.arraycopy(oldData, _offset, _data, 0, _dataSize);
_offset = 0;
}
- System.arraycopy(bytes,offset,_data,_dataSize+_offset,length);
- _dataSize+=length;
+ System.arraycopy(bytes, offset, _data, _dataSize + _offset, length);
+ _dataSize += length;
addToTransportWorkList();
return length; //TODO - Implement.
}
+    /**
+     * Appends the remaining bytes of the given buffer to the data pending for this delivery.
+     *
+     * {@code buffer.remaining()} bytes are read from the buffer and stored after any bytes
+     * that are already pending, then the delivery is added to the transport work list.
+     *
+     * @param buffer the buffer to read the data from.
+     *
+     * @return the number of bytes read from the provided buffer.
+     */
+    int send(final ReadableBuffer buffer)
+    {
+        int length = buffer.remaining();
+
+        if (_data == null)
+        {
+            _data = new byte[length];
+        }
+        else if (_data.length - _offset - _dataSize < length)
+        {
+            // Not enough free space behind the pending bytes: move them to the front of a
+            // new array that is large enough for both the pending and the incoming bytes.
+            byte[] oldData = _data;
+            _data = new byte[Math.max(oldData.length + _dataSize, _dataSize + length)];
+            System.arraycopy(oldData, _offset, _data, 0, _dataSize);
+            _offset = 0;
+        }
+        // Append after the pending bytes (same position the byte[] variant of send uses)
+        // instead of overwriting them at _offset.
+        buffer.get(_data, _offset + _dataSize, length);
+        _dataSize += length;
+        addToTransportWorkList();
+        return length;
+    }
+
byte[] getData()
{
return _data;
@@ -328,6 +380,7 @@ public class DeliveryImpl implements Delivery
_offset = arrayOffset;
}
+ @Override
public boolean isWritable()
{
return getLink() instanceof SenderImpl
@@ -335,6 +388,7 @@ public class DeliveryImpl implements Delivery
&& ((SenderImpl) getLink()).hasCredit();
}
+ @Override
public boolean isReadable()
{
return getLink() instanceof ReceiverImpl
@@ -346,6 +400,7 @@ public class DeliveryImpl implements Delivery
_complete = true;
}
+ @Override
public boolean isPartial()
{
return !_complete;
@@ -357,11 +412,13 @@ public class DeliveryImpl implements Delivery
_updated = true;
}
+ @Override
public boolean isUpdated()
{
return _updated;
}
+ @Override
public void clear()
{
_updated = false;
@@ -385,6 +442,7 @@ public class DeliveryImpl implements Delivery
_updated = true;
}
+ @Override
public boolean isBuffered()
{
if (_remoteSettled) return false;
@@ -399,16 +457,19 @@ public class DeliveryImpl implements Delivery
}
}
+ @Override
public Object getContext()
{
return _context;
}
+ @Override
public void setContext(Object context)
{
_context = context;
}
+ @Override
public Record attachments()
{
if(_attachments == null)
@@ -440,6 +501,7 @@ public class DeliveryImpl implements Delivery
return builder.toString();
}
+ @Override
public int pending()
{
return _dataSize;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/acf1f6eb/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
index a3a01f7..6f86700 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
@@ -21,6 +21,7 @@
package org.apache.qpid.proton.engine.impl;
import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Receiver;
public class ReceiverImpl extends LinkImpl implements Receiver
@@ -58,6 +59,7 @@ public class ReceiverImpl extends LinkImpl implements Receiver
super(session, name);
}
+ @Override
public void flow(final int credits)
{
addCredit(credits);
@@ -77,7 +79,7 @@ public class ReceiverImpl extends LinkImpl implements Receiver
return credits;
}
-
+ @Override
public int recv(final byte[] bytes, int offset, int size)
{
if (_current == null) {
@@ -95,6 +97,23 @@ public class ReceiverImpl extends LinkImpl implements Receiver
}
@Override
+    public int recv(final WritableBuffer buffer)
+    {
+        if (_current == null) {
+            throw new IllegalStateException("no current delivery");
+        }
+
+        // Delegate the copy to the current delivery; non-positive results need no accounting.
+        final int received = _current.recv(buffer);
+        if (received <= 0) {
+            return received;
+        }
+
+        // Reduce the session's incoming byte count by what the application just read.
+        getSession().incrementIncomingBytes(-received);
+
+        final boolean incomingWindowIsZero =
+            getSession().getTransportSession().getIncomingWindowSize().equals(UnsignedInteger.ZERO);
+        if (incomingWindowIsZero) {
+            modified();
+        }
+        return received;
+    }
+
+ @Override
void doFree()
{
getSession().freeReceiver(this);
@@ -117,6 +136,7 @@ public class ReceiverImpl extends LinkImpl implements Receiver
return _transportReceiver;
}
+ @Override
public void drain(int credit)
{
setDrain(true);
@@ -124,6 +144,7 @@ public class ReceiverImpl extends LinkImpl implements Receiver
_drainFlagMode = false;
}
+ @Override
public boolean draining()
{
return getDrain() && (getCredit() > getQueued());
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/acf1f6eb/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
index 7cf605f..f418655 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.proton.engine.impl;
+import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
@@ -33,19 +34,21 @@ public class SenderImpl extends LinkImpl implements Sender
super(session, name);
}
+ @Override
public void offer(final int credits)
{
_offered = credits;
}
+ @Override
public int send(final byte[] bytes, int offset, int length)
{
- if( getLocalState() == EndpointState.CLOSED )
+ if (getLocalState() == EndpointState.CLOSED)
{
throw new IllegalStateException("send not allowed after the sender is closed.");
}
DeliveryImpl current = current();
- if(current == null || current.getLink() != this)
+ if (current == null || current.getLink() != this)
{
throw new IllegalArgumentException();//TODO.
}
@@ -56,6 +59,26 @@ public class SenderImpl extends LinkImpl implements Sender
return sent;
}
+    /**
+     * Sends the data in the given buffer on the current delivery of this sender.
+     *
+     * @param buffer the buffer to read the data from.
+     *
+     * @return the number of bytes accepted by the current delivery.
+     *
+     * @throws IllegalStateException if the local state of this sender is CLOSED.
+     * @throws IllegalArgumentException if there is no current delivery or it belongs to another link.
+     */
+    @Override
+    public int send(final ReadableBuffer buffer)
+    {
+        if (getLocalState() == EndpointState.CLOSED)
+        {
+            throw new IllegalStateException("send not allowed after the sender is closed.");
+        }
+
+        final DeliveryImpl delivery = current();
+        final boolean ownedByThisLink = delivery != null && delivery.getLink() == this;
+        if (!ownedByThisLink)
+        {
+            throw new IllegalArgumentException();
+        }
+
+        final int accepted = delivery.send(buffer);
+        if (accepted > 0)
+        {
+            // Only account for bytes that were actually handed to the delivery.
+            getSession().incrementOutgoingBytes(accepted);
+        }
+        return accepted;
+    }
+
+ @Override
public void abort()
{
//TODO.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org