You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/12/13 20:51:06 UTC
svn commit: r1421452 - in /qpid/proton/trunk/proton-j:
contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/
contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/
contrib/proton-jms/src/main/ja...
Author: rgodfrey
Date: Thu Dec 13 19:51:04 2012
New Revision: 1421452
URL: http://svn.apache.org/viewvc?rev=1421452&view=rev
Log:
PROTON-186 : return the number of bytes needed to fully encode the message from Message.encode(2)
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java
- copied, changed from r1421438, qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/DroppingWritableBuffer.java
Removed:
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/DroppingWritableBuffer.java
qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/DroppingWritableBuffer.java
Modified:
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java
Modified: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java?rev=1421452&r1=1421451&r2=1421452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java (original)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java Thu Dec 13 19:51:04 2012
@@ -76,7 +76,7 @@ public class AmqpSender extends AmqpLink
final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>();
long outboundBufferSize;
- public MessageDelivery send(MessageImpl message) {
+ public MessageDelivery send(Message message) {
assertExecuting();
MessageDelivery rc = new MessageDelivery(message) {
@Override
Modified: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java?rev=1421452&r1=1421451&r2=1421452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java (original)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java Thu Dec 13 19:51:04 2012
@@ -17,44 +17,38 @@
package org.apache.qpid.proton.hawtdispatch.api;
-import org.apache.qpid.proton.hawtdispatch.impl.DroppingWritableBuffer;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.hawtdispatch.impl.Watch;
import org.apache.qpid.proton.hawtdispatch.impl.WatchBase;
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtdispatch.Task;
-import java.nio.ByteBuffer;
-
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public abstract class MessageDelivery extends WatchBase {
final int initialSize;
- private MessageImpl message;
+ private Message message;
private Buffer encoded;
public DeliveryImpl delivery;
private int sizeHint = 1024*4;
- static Buffer encode(MessageImpl message, int sizeHint) {
- ByteBuffer buffer = ByteBuffer.wrap(new byte[sizeHint]);
- DroppingWritableBuffer overflow = new DroppingWritableBuffer();
- int c = message.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
- if( overflow.position() > 0 ) {
- buffer = ByteBuffer.wrap(new byte[sizeHint+overflow.position()]);
- c = message.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+ static Buffer encode(Message message, int sizeHint) {
+ byte[] buffer = new byte[sizeHint];
+ int size = ((MessageImpl)message).encode2(buffer, 0, sizeHint);
+ if( size > sizeHint ) {
+ buffer = new byte[size];
+ size = message.encode(buffer, 0, size);
}
- return new Buffer(buffer.array(), 0, c);
+ return new Buffer(buffer, 0, size);
}
- static MessageImpl decode(Buffer buffer) {
- MessageImpl msg = new MessageImpl();
+ static Message decode(Buffer buffer) {
+ Message msg = new MessageImpl();
int offset = buffer.offset;
int len = buffer.length;
while( len > 0 ) {
@@ -66,7 +60,7 @@ public abstract class MessageDelivery ex
return msg;
}
- public MessageDelivery(MessageImpl message) {
+ public MessageDelivery(Message message) {
this(message, encode(message, 1024*4));
}
@@ -74,7 +68,7 @@ public abstract class MessageDelivery ex
this(null, encoded);
}
- public MessageDelivery(MessageImpl message, Buffer encoded) {
+ public MessageDelivery(Message message, Buffer encoded) {
this.message = message;
this.encoded = encoded;
sizeHint = this.encoded.length;
Modified: qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java?rev=1421452&r1=1421451&r2=1421452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java (original)
+++ qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java Thu Dec 13 19:51:04 2012
@@ -17,6 +17,7 @@
package org.apache.qpid.proton.jms;
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.DroppingWritableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.amqp.UnsignedInteger;
Modified: qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java?rev=1421452&r1=1421451&r2=1421452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java (original)
+++ qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/JMSMappingOutboundTransformer.java Thu Dec 13 19:51:04 2012
@@ -19,6 +19,7 @@ package org.apache.qpid.proton.jms;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.codec.DroppingWritableBuffer;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
Copied: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java (from r1421438, qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/DroppingWritableBuffer.java)
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java?p2=qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java&p1=qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/DroppingWritableBuffer.java&r1=1421438&r2=1421452&rev=1421452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/DroppingWritableBuffer.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/codec/DroppingWritableBuffer.java Thu Dec 13 19:51:04 2012
@@ -18,79 +18,90 @@
* under the License.
*
*/
-package org.apache.qpid.proton.jms;
-
-import org.apache.qpid.proton.codec.WritableBuffer;
+package org.apache.qpid.proton.codec;
import java.nio.ByteBuffer;
public class DroppingWritableBuffer implements WritableBuffer
{
- int pos = 0;
+ private int _pos = 0;
@Override
- public boolean hasRemaining() {
+ public boolean hasRemaining()
+ {
return true;
}
@Override
- public void put(byte b) {
- pos += 1;
+ public void put(byte b)
+ {
+ _pos += 1;
}
@Override
- public void putFloat(float f) {
- pos += 4;
+ public void putFloat(float f)
+ {
+ _pos += 4;
}
@Override
- public void putDouble(double d) {
- pos += 8;
+ public void putDouble(double d)
+ {
+ _pos += 8;
}
@Override
- public void put(byte[] src, int offset, int length) {
- pos += length;
+ public void put(byte[] src, int offset, int length)
+ {
+ _pos += length;
}
@Override
- public void putShort(short s) {
- pos += 2;
+ public void putShort(short s)
+ {
+ _pos += 2;
}
@Override
- public void putInt(int i) {
- pos += 4;
+ public void putInt(int i)
+ {
+ _pos += 4;
}
@Override
- public void putLong(long l) {
- pos += 8;
+ public void putLong(long l)
+ {
+ _pos += 8;
}
@Override
- public int remaining() {
- return Integer.MAX_VALUE - pos;
+ public int remaining()
+ {
+ return Integer.MAX_VALUE - _pos;
}
@Override
- public int position() {
- return pos;
+ public int position()
+ {
+ return _pos;
}
@Override
- public void position(int position) {
- pos = position;
+ public void position(int position)
+ {
+ _pos = position;
}
@Override
- public void put(ByteBuffer payload) {
- pos += payload.remaining();
+ public void put(ByteBuffer payload)
+ {
+ _pos += payload.remaining();
payload.position(payload.limit());
}
@Override
- public int limit() {
+ public int limit()
+ {
return Integer.MAX_VALUE;
}
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java?rev=1421452&r1=1421451&r2=1421452&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java Thu Dec 13 19:51:04 2012
@@ -28,10 +28,8 @@ import org.apache.qpid.proton.amqp.Symbo
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.*;
-import org.apache.qpid.proton.codec.AMQPDefinedTypes;
-import org.apache.qpid.proton.codec.DecoderImpl;
-import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.codec.*;
import org.apache.qpid.proton.message.*;
public class MessageImpl implements Message
@@ -675,6 +673,17 @@ public class MessageImpl implements Mess
return encode(new WritableBuffer.ByteBufferWrapper(buffer));
}
+ public int encode2(byte[] data, int offset, int length)
+ {
+ ByteBuffer buffer = ByteBuffer.wrap(data, offset, length);
+ WritableBuffer.ByteBufferWrapper first = new WritableBuffer.ByteBufferWrapper(buffer);
+ DroppingWritableBuffer second = new DroppingWritableBuffer();
+ CompositeWritableBuffer composite = new CompositeWritableBuffer(first, second);
+ int start = composite.position();
+ encode(composite);
+ return composite.position() - start;
+ }
+
public int encode(WritableBuffer buffer)
{
int length = buffer.remaining();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org