You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/12/13 20:51:06 UTC

svn commit: r1421452 - in /qpid/proton/trunk/proton-j: contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/ contrib/proton-jms/src/main/ja...

Author: rgodfrey
Date: Thu Dec 13 19:51:04 2012
New Revision: 1421452

URL: http://svn.apache.org/viewvc?rev=1421452&view=rev
Log:
PROTON-186 : return the number of bytes needed to fully encode the message from Message.encode(2)

Added:
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java
      - copied, changed from r1421438, qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/DroppingWritableBuffer.java
Removed:
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/DroppingWritableBuffer.java
    qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/DroppingWritableBuffer.java
Modified:
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
    qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
    qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java

Modified: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java?rev=1421452&r1=1421451&r2=1421452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java (original)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java Thu Dec 13 19:51:04 2012
@@ -76,7 +76,7 @@ public class AmqpSender extends AmqpLink
     final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>();
     long outboundBufferSize;
 
-    public MessageDelivery send(MessageImpl message) {
+    public MessageDelivery send(Message message) {
         assertExecuting();
         MessageDelivery rc = new MessageDelivery(message) {
             @Override

Modified: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java?rev=1421452&r1=1421451&r2=1421452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java (original)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java Thu Dec 13 19:51:04 2012
@@ -17,44 +17,38 @@
 
 package org.apache.qpid.proton.hawtdispatch.api;
 
-import org.apache.qpid.proton.hawtdispatch.impl.DroppingWritableBuffer;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
 import org.apache.qpid.proton.hawtdispatch.impl.Watch;
 import org.apache.qpid.proton.hawtdispatch.impl.WatchBase;
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.engine.impl.DeliveryImpl;
 import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtdispatch.Task;
 
-import java.nio.ByteBuffer;
-
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public abstract class MessageDelivery extends WatchBase {
 
     final int initialSize;
-    private MessageImpl message;
+    private Message message;
     private Buffer encoded;
     public DeliveryImpl delivery;
     private int sizeHint = 1024*4;
 
-    static Buffer encode(MessageImpl message, int sizeHint) {
-        ByteBuffer buffer = ByteBuffer.wrap(new byte[sizeHint]);
-        DroppingWritableBuffer overflow = new DroppingWritableBuffer();
-        int c = message.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
-        if( overflow.position() > 0 ) {
-            buffer = ByteBuffer.wrap(new byte[sizeHint+overflow.position()]);
-            c = message.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+    static Buffer encode(Message message, int sizeHint) {
+        byte[] buffer = new byte[sizeHint];
+        int size = ((MessageImpl)message).encode2(buffer, 0, sizeHint);
+        if( size > sizeHint ) {
+            buffer = new byte[size];
+            size = message.encode(buffer, 0, size);
         }
-        return new Buffer(buffer.array(), 0, c);
+        return new Buffer(buffer, 0, size);
     }
 
-    static MessageImpl decode(Buffer buffer) {
-        MessageImpl msg = new MessageImpl();
+    static Message decode(Buffer buffer) {
+        Message msg = new MessageImpl();
         int offset = buffer.offset;
         int len = buffer.length;
         while( len > 0 ) {
@@ -66,7 +60,7 @@ public abstract class MessageDelivery ex
         return msg;
     }
 
-    public MessageDelivery(MessageImpl message) {
+    public MessageDelivery(Message message) {
         this(message, encode(message, 1024*4));
     }
 
@@ -74,7 +68,7 @@ public abstract class MessageDelivery ex
         this(null, encoded);
     }
 
-    public MessageDelivery(MessageImpl message, Buffer encoded) {
+    public MessageDelivery(Message message, Buffer encoded) {
         this.message = message;
         this.encoded = encoded;
         sizeHint = this.encoded.length;

Modified: qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java?rev=1421452&r1=1421451&r2=1421452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java (original)
+++ qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java Thu Dec 13 19:51:04 2012
@@ -17,6 +17,7 @@
 package org.apache.qpid.proton.jms;
 
 import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.DroppingWritableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 

Modified: qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java?rev=1421452&r1=1421451&r2=1421452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java (original)
+++ qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java Thu Dec 13 19:51:04 2012
@@ -19,6 +19,7 @@ package org.apache.qpid.proton.jms;
 import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.codec.CompositeWritableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.codec.DroppingWritableBuffer;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;

Copied: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java (from r1421438, qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/DroppingWritableBuffer.java)
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java?p2=qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java&p1=qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/DroppingWritableBuffer.java&r1=1421438&r2=1421452&rev=1421452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/DroppingWritableBuffer.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java Thu Dec 13 19:51:04 2012
@@ -18,79 +18,90 @@
  * under the License.
  *
  */
-package org.apache.qpid.proton.jms;
-
-import org.apache.qpid.proton.codec.WritableBuffer;
+package org.apache.qpid.proton.codec;
 
 import java.nio.ByteBuffer;
 
 public class DroppingWritableBuffer implements WritableBuffer
 {
-    int pos = 0;
+    private int _pos = 0;
 
     @Override
-    public boolean hasRemaining() {
+    public boolean hasRemaining() 
+    {
         return true;
     }
 
     @Override
-    public void put(byte b) {
-        pos += 1;
+    public void put(byte b)
+    {
+        _pos += 1;
     }
 
     @Override
-    public void putFloat(float f) {
-        pos += 4;
+    public void putFloat(float f)
+    {
+        _pos += 4;
     }
 
     @Override
-    public void putDouble(double d) {
-        pos += 8;
+    public void putDouble(double d)
+    {
+        _pos += 8;
     }
 
     @Override
-    public void put(byte[] src, int offset, int length) {
-        pos += length;
+    public void put(byte[] src, int offset, int length)
+    {
+        _pos += length;
     }
 
     @Override
-    public void putShort(short s) {
-        pos += 2;
+    public void putShort(short s)
+    {
+        _pos += 2;
     }
 
     @Override
-    public void putInt(int i) {
-        pos += 4;
+    public void putInt(int i)
+    {
+        _pos += 4;
     }
 
     @Override
-    public void putLong(long l) {
-        pos += 8;
+    public void putLong(long l)
+    {
+        _pos += 8;
     }
 
     @Override
-    public int remaining() {
-        return Integer.MAX_VALUE - pos;
+    public int remaining()
+    {
+        return Integer.MAX_VALUE - _pos;
     }
 
     @Override
-    public int position() {
-        return pos;
+    public int position()
+    {
+        return _pos;
     }
 
     @Override
-    public void position(int position) {
-        pos = position;
+    public void position(int position)
+    {
+        _pos = position;
     }
 
     @Override
-    public void put(ByteBuffer payload) {
-        pos += payload.remaining();
+    public void put(ByteBuffer payload)
+    {
+        _pos += payload.remaining();
         payload.position(payload.limit());
     }
 
     @Override
-    public int limit() {
+    public int limit()
+    {
         return Integer.MAX_VALUE;
     }
 }

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java?rev=1421452&r1=1421451&r2=1421452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java Thu Dec 13 19:51:04 2012
@@ -28,10 +28,8 @@ import org.apache.qpid.proton.amqp.Symbo
 import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.messaging.*;
-import org.apache.qpid.proton.codec.AMQPDefinedTypes;
-import org.apache.qpid.proton.codec.DecoderImpl;
-import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.codec.*;
 import org.apache.qpid.proton.message.*;
 
 public class MessageImpl implements Message
@@ -675,6 +673,17 @@ public class MessageImpl implements Mess
         return encode(new WritableBuffer.ByteBufferWrapper(buffer));
     }
 
+    public int encode2(byte[] data, int offset, int length)
+    {
+        ByteBuffer buffer = ByteBuffer.wrap(data, offset, length);
+        WritableBuffer.ByteBufferWrapper first = new WritableBuffer.ByteBufferWrapper(buffer);
+        DroppingWritableBuffer second = new DroppingWritableBuffer();
+        CompositeWritableBuffer composite = new CompositeWritableBuffer(first, second);
+        int start = composite.position();
+        encode(composite);
+        return composite.position() - start;
+    }
+
     public int encode(WritableBuffer buffer)
     {
         int length = buffer.remaining();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org