You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/10/28 19:35:59 UTC

qpid-proton git commit: PROTON-1337 Add alternate send and recv methods that use generic buffers

Repository: qpid-proton
Updated Branches:
  refs/heads/master 507c93566 -> acf1f6eb1


PROTON-1337 Add alternate send and recv methods that use generic buffers

Add support for clients that use other buffering types to interact with
the Sender and Receiver using a ReadableBuffer and WritableBuffer
abstraction.  

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/acf1f6eb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/acf1f6eb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/acf1f6eb

Branch: refs/heads/master
Commit: acf1f6eb182a62ce047270c0c6483bcd0b2eb083
Parents: 507c935
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Oct 28 15:35:45 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Oct 28 15:35:45 2016 -0400

----------------------------------------------------------------------
 .../qpid/proton/codec/WritableBuffer.java       | 21 +++++-
 .../org/apache/qpid/proton/engine/Receiver.java | 10 +++
 .../org/apache/qpid/proton/engine/Sender.java   | 15 ++++
 .../qpid/proton/engine/impl/DeliveryImpl.java   | 78 ++++++++++++++++++--
 .../qpid/proton/engine/impl/ReceiverImpl.java   | 23 +++++-
 .../qpid/proton/engine/impl/SenderImpl.java     | 27 ++++++-
 6 files changed, 162 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/acf1f6eb/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
index 4bcf793..79676b3 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/WritableBuffer.java
@@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
 
 public interface WritableBuffer
 {
-    public void put(byte b);
+    void put(byte b);
 
     void putFloat(float f);
 
@@ -60,66 +60,79 @@ public interface WritableBuffer
             _buf = buf;
         }
 
+        @Override
         public void put(byte b)
         {
             _buf.put(b);
         }
 
+        @Override
         public void putFloat(float f)
         {
             _buf.putFloat(f);
         }
 
+        @Override
         public void putDouble(double d)
         {
             _buf.putDouble(d);
         }
 
+        @Override
         public void put(byte[] src, int offset, int length)
         {
             _buf.put(src, offset, length);
         }
 
+        @Override
         public void putShort(short s)
         {
             _buf.putShort(s);
         }
 
+        @Override
         public void putInt(int i)
         {
             _buf.putInt(i);
         }
 
+        @Override
         public void putLong(long l)
         {
             _buf.putLong(l);
         }
 
+        @Override
         public boolean hasRemaining()
         {
             return _buf.hasRemaining();
         }
 
+        @Override
         public int remaining()
         {
             return _buf.remaining();
         }
 
+        @Override
         public int position()
         {
             return _buf.position();
         }
 
+        @Override
         public void position(int position)
         {
             _buf.position(position);
         }
 
+        @Override
         public void put(ByteBuffer src)
         {
             _buf.put(src);
         }
 
+        @Override
         public int limit()
         {
             return _buf.limit();
@@ -130,5 +143,11 @@ public interface WritableBuffer
         {
             return String.format("[pos: %d, limit: %d, remaining:%d]", _buf.position(), _buf.limit(), _buf.remaining());
         }
+
+        public static ByteBufferWrapper allocate(int size)
+        {
+            // Factory: wraps a newly allocated ByteBuffer of the requested capacity.
+            return new ByteBufferWrapper(ByteBuffer.allocate(size));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/acf1f6eb/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
index eec64da..f9d718f 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Receiver.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.proton.engine;
 
+import org.apache.qpid.proton.codec.WritableBuffer;
 
 /**
  * Receiver
@@ -59,6 +60,15 @@ public interface Receiver extends Link
      */
     public int recv(byte[] bytes, int offset, int size);
 
+    /**
+     * Receive message data for the current delivery.
+     *
+     * @param buffer the buffer to write the message data.
+     *
+     * @return number of bytes written. -1 if there are no more bytes for the current delivery.
+     */
+    public int recv(WritableBuffer buffer);
+
     public void drain(int credit);
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/acf1f6eb/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
index b4a61c6..159d5c3 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sender.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.proton.engine;
 
+import org.apache.qpid.proton.codec.ReadableBuffer;
 
 /**
  * Sender
@@ -40,6 +41,10 @@ public interface Sender extends Link
      * Sends some data for the current delivery.  The application may call this method multiple
      * times for the same delivery.
      *
+     * @param bytes the byte array containing the data to be sent.
+     * @param offset the offset into the given array to start reading.
+     * @param length the number of bytes to read from the given byte array.
+     *
      * @return the number of bytes accepted
      *
      * TODO Proton-j current copies all the bytes it has been given so the return value will always be
@@ -49,6 +54,16 @@ public interface Sender extends Link
     public int send(byte[] bytes, int offset, int length);
 
     /**
+     * Sends some data for the current delivery. The application may call this method multiple
+     * times for the same delivery.
+     *
+     * @param buffer the buffer to read the data from.
+     *
+     * @return the number of bytes read from the provided buffer.
+     */
+    public int send(ReadableBuffer buffer);
+
+    /**
      * Abort the current delivery.
      *
      * Note "pn_link_abort" is commented out in the .h

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/acf1f6eb/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 20fe4fe..0bdb163 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -22,10 +22,12 @@ package org.apache.qpid.proton.engine.impl;
 
 import java.util.Arrays;
 
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Record;
 import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
 
 public class DeliveryImpl implements Delivery
 {
@@ -80,26 +82,31 @@ public class DeliveryImpl implements Delivery
         }
     }
 
+    @Override
     public byte[] getTag()
     {
         return _tag;
     }
 
+    @Override
     public LinkImpl getLink()
     {
         return _link;
     }
 
+    @Override
     public DeliveryState getLocalState()
     {
         return _deliveryState;
     }
 
+    @Override
     public DeliveryState getRemoteState()
     {
         return _remoteDeliveryState;
     }
 
+    @Override
     public boolean remotelySettled()
     {
         return _remoteSettled;
@@ -117,6 +124,7 @@ public class DeliveryImpl implements Delivery
         return _messageFormat;
     }
 
+    @Override
     public void disposition(final DeliveryState state)
     {
         _deliveryState = state;
@@ -126,6 +134,7 @@ public class DeliveryImpl implements Delivery
         }
     }
 
+    @Override
     public void settle()
     {
         if (_settled) {
@@ -164,11 +173,13 @@ public class DeliveryImpl implements Delivery
         return _linkNext;
     }
 
+    @Override
     public DeliveryImpl next()
     {
         return getLinkNext();
     }
 
+    @Override
     public void free()
     {
         settle();
@@ -179,6 +190,7 @@ public class DeliveryImpl implements Delivery
         return _linkPrevious;
     }
 
+    @Override
     public DeliveryImpl getWorkNext()
     {
         if (_workNext != null)
@@ -205,11 +217,10 @@ public class DeliveryImpl implements Delivery
         _workPrev = workPrev;
     }
 
-    int recv(byte[] bytes, int offset, int size)
+    int recv(final byte[] bytes, int offset, int size)
     {
-
         final int consumed;
-        if(_data != null)
+        if (_data != null)
         {
             //TODO - should only be if no bytes left
             consumed = Math.min(size, _dataSize);
@@ -220,11 +231,30 @@ public class DeliveryImpl implements Delivery
         }
         else
         {
-            _dataSize =  consumed = 0;
+            _dataSize = consumed = 0;
         }
+
         return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed;  //TODO - Implement
     }
 
+    int recv(final WritableBuffer buffer)
+    {
+        int transferred = 0;
+        if (_data == null)
+        {
+            _dataSize = 0;
+        }
+        else
+        {
+            // Hand over as many pending bytes as the target can take, then advance the read position.
+            transferred = Math.min(buffer.remaining(), _dataSize);
+            buffer.put(_data, _offset, transferred);
+            _offset += transferred;
+            _dataSize -= transferred;
+        }
+        return (_complete && transferred == 0) ? Transport.END_OF_STREAM : transferred;
+    }
+
     void updateWork()
     {
         getLink().getConnectionImpl().workUpdate(this);
@@ -274,6 +304,7 @@ public class DeliveryImpl implements Delivery
         _transportDelivery = transportDelivery;
     }
 
+    @Override
     public boolean isSettled()
     {
         return _settled;
@@ -289,15 +320,36 @@ public class DeliveryImpl implements Delivery
         {
             byte[] oldData = _data;
             _data = new byte[oldData.length + _dataSize];
-            System.arraycopy(oldData,_offset,_data,0,_dataSize);
+            System.arraycopy(oldData, _offset, _data, 0, _dataSize);
             _offset = 0;
         }
-        System.arraycopy(bytes,offset,_data,_dataSize+_offset,length);
-        _dataSize+=length;
+        System.arraycopy(bytes, offset, _data, _dataSize + _offset, length);
+        _dataSize += length;
         addToTransportWorkList();
         return length;  //TODO - Implement.
     }
 
+    int send(final ReadableBuffer buffer)
+    {
+        // Appends every readable byte of 'buffer' to the pending data and returns the count appended.
+        final int length = buffer.remaining();
+        if (_data == null)
+        {
+            _data = new byte[length];
+        }
+        else if (_data.length - _offset - _dataSize < length) // free tail (after pending bytes) too small
+        {
+            byte[] oldData = _data;
+            _data = new byte[Math.max(oldData.length, _dataSize + length)]; // must hold pending + new bytes
+            System.arraycopy(oldData, _offset, _data, 0, _dataSize);
+            _offset = 0;
+        }
+        buffer.get(_data, _offset + _dataSize, length); // write after the pending bytes, never over them
+        _dataSize += length;
+        addToTransportWorkList();
+        return length;
+    }
+
     byte[] getData()
     {
         return _data;
@@ -328,6 +380,7 @@ public class DeliveryImpl implements Delivery
         _offset = arrayOffset;
     }
 
+    @Override
     public boolean isWritable()
     {
         return getLink() instanceof SenderImpl
@@ -335,6 +388,7 @@ public class DeliveryImpl implements Delivery
                 && ((SenderImpl) getLink()).hasCredit();
     }
 
+    @Override
     public boolean isReadable()
     {
         return getLink() instanceof ReceiverImpl
@@ -346,6 +400,7 @@ public class DeliveryImpl implements Delivery
         _complete = true;
     }
 
+    @Override
     public boolean isPartial()
     {
         return !_complete;
@@ -357,11 +412,13 @@ public class DeliveryImpl implements Delivery
         _updated = true;
     }
 
+    @Override
     public boolean isUpdated()
     {
         return _updated;
     }
 
+    @Override
     public void clear()
     {
         _updated = false;
@@ -385,6 +442,7 @@ public class DeliveryImpl implements Delivery
         _updated = true;
     }
 
+    @Override
     public boolean isBuffered()
     {
         if (_remoteSettled) return false;
@@ -399,16 +457,19 @@ public class DeliveryImpl implements Delivery
         }
     }
 
+    @Override
     public Object getContext()
     {
         return _context;
     }
 
+    @Override
     public void setContext(Object context)
     {
         _context = context;
     }
 
+    @Override
     public Record attachments()
     {
         if(_attachments == null)
@@ -440,6 +501,7 @@ public class DeliveryImpl implements Delivery
         return builder.toString();
     }
 
+    @Override
     public int pending()
     {
         return _dataSize;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/acf1f6eb/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
index a3a01f7..6f86700 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.proton.engine.impl;
 
 import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Receiver;
 
 public class ReceiverImpl extends LinkImpl implements Receiver
@@ -58,6 +59,7 @@ public class ReceiverImpl extends LinkImpl implements Receiver
         super(session, name);
     }
 
+    @Override
     public void flow(final int credits)
     {
         addCredit(credits);
@@ -77,7 +79,7 @@ public class ReceiverImpl extends LinkImpl implements Receiver
         return credits;
     }
 
-
+    @Override
     public int recv(final byte[] bytes, int offset, int size)
     {
         if (_current == null) {
@@ -95,6 +97,23 @@ public class ReceiverImpl extends LinkImpl implements Receiver
     }
 
     @Override
+    public int recv(final WritableBuffer buffer)
+    {
+        if (_current == null) {
+            throw new IllegalStateException("no current delivery");
+        }
+        final int received = _current.recv(buffer);
+        if (received <= 0) {
+            return received; // nothing was consumed, so session byte accounting is left untouched
+        }
+        getSession().incrementIncomingBytes(-received);
+        if (getSession().getTransportSession().getIncomingWindowSize().equals(UnsignedInteger.ZERO)) {
+            modified(); // incoming window reads zero: mark this link as modified
+        }
+        return received;
+    }
+
+    @Override
     void doFree()
     {
         getSession().freeReceiver(this);
@@ -117,6 +136,7 @@ public class ReceiverImpl extends LinkImpl implements Receiver
         return _transportReceiver;
     }
 
+    @Override
     public void drain(int credit)
     {
         setDrain(true);
@@ -124,6 +144,7 @@ public class ReceiverImpl extends LinkImpl implements Receiver
         _drainFlagMode = false;
     }
 
+    @Override
     public boolean draining()
     {
         return getDrain() && (getCredit() > getQueued());

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/acf1f6eb/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
index 7cf605f..f418655 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.proton.engine.impl;
 
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Sender;
 
@@ -33,19 +34,21 @@ public class SenderImpl  extends LinkImpl implements Sender
         super(session, name);
     }
 
+    @Override
     public void offer(final int credits)
     {
         _offered = credits;
     }
 
+    @Override
     public int send(final byte[] bytes, int offset, int length)
     {
-        if( getLocalState() == EndpointState.CLOSED ) 
+        if (getLocalState() == EndpointState.CLOSED)
         {
             throw new IllegalStateException("send not allowed after the sender is closed.");
         }
         DeliveryImpl current = current();
-        if(current == null || current.getLink() != this)
+        if (current == null || current.getLink() != this)
         {
             throw new IllegalArgumentException();//TODO.
         }
@@ -56,6 +59,26 @@ public class SenderImpl  extends LinkImpl implements Sender
         return sent;
     }
 
+    @Override
+    public int send(final ReadableBuffer buffer)
+    {
+        if (getLocalState() == EndpointState.CLOSED)
+        {
+            throw new IllegalStateException("send not allowed after the sender is closed.");
+        }
+        final DeliveryImpl delivery = current();
+        if (!(delivery != null && delivery.getLink() == this)) // need a current delivery owned by this link
+        {
+            throw new IllegalArgumentException();
+        }
+        final int accepted = delivery.send(buffer);
+        if (accepted > 0) {
+            getSession().incrementOutgoingBytes(accepted); // count only bytes actually taken
+        }
+        return accepted;
+    }
+
+    @Override
     public void abort()
     {
         //TODO.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org