You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/16 17:02:27 UTC
svn commit: r744953 -
/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
Author: chirino
Date: Mon Feb 16 16:02:25 2009
New Revision: 744953
URL: http://svn.apache.org/viewvc?rev=744953&view=rev
Log:
- The wire protocol used in the stateful interface of the ProtoWireFormatFactory did not exactly match what the non-stateful version did. Matched them up.
- Since the wireformat is doing the framing, switched to using the toUnframedByteArray() methods to save a few bytes.
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=744953&r1=744952&r2=744953&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Mon Feb 16 16:02:25 2009
@@ -3,13 +3,12 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.StatefulWireFormat;
@@ -28,16 +27,28 @@
public void marshal(Object value, DataOutput out) throws IOException {
if( value.getClass() == Message.class ) {
out.writeByte(0);
- ((Message)value).getProto().writeFramed((OutputStream)out);
+ Commands.Message proto = ((Message)value).getProto();
+ Buffer buffer = proto.toUnframedBuffer();
+ out.writeInt(buffer.getLength());
+ out.write(buffer.getData(), buffer.getOffset(), buffer.getLength());
} else if( value.getClass() == String.class ) {
out.writeByte(1);
- out.writeUTF((String) value);
+ String value2 = (String) value;
+ byte[] bytes = value2.getBytes("UTF-8");
+ out.writeInt(bytes.length);
+ out.write(bytes);
} else if( value.getClass() == Destination.class ) {
out.writeByte(2);
- ((Destination)value).writeFramed((OutputStream)out);
+ Destination proto = (Destination)value;
+ Buffer buffer = proto.toUnframedBuffer();
+ out.writeInt(buffer.getLength());
+ out.write(buffer.getData(), buffer.getOffset(), buffer.getLength());
}else if( value.getClass() == FlowControl.class ) {
out.writeByte(3);
- ((FlowControl)value).writeFramed((OutputStream)out);
+ FlowControl proto = (FlowControl)value;
+ Buffer buffer = proto.toUnframedBuffer();
+ out.writeInt(buffer.getLength());
+ out.write(buffer.getData(), buffer.getOffset(), buffer.getLength());
} else {
throw new IOException("Unsupported type: "+value.getClass());
}
@@ -45,20 +56,23 @@
public Object unmarshal(DataInput in) throws IOException {
byte type = in.readByte();
+ int size = in.readInt();
+ byte data[] = new byte[size];
+ in.readFully(data);
switch(type) {
case 0:
Commands.Message m = new Commands.Message();
- m.mergeFramed((InputStream)in);
+ m.mergeUnframed(data);
return new Message(m);
case 1:
- return in.readUTF();
+ return new String(data, "UTF-8");
case 2:
Destination d = new Destination();
- d.mergeFramed((InputStream)in);
+ d.mergeUnframed(data);
return d;
case 3:
FlowControl fc = new FlowControl();
- fc.mergeFramed((InputStream)in);
+ fc.mergeUnframed(data);
return fc;
default:
throw new IOException("Unknonw type byte: ");
@@ -77,7 +91,7 @@
if( value.getClass() == Message.class ) {
- currentOut = ByteBuffer.wrap(((Message)value).getProto().toFramedByteArray());
+ currentOut = ByteBuffer.wrap(((Message)value).getProto().toUnframedByteArray());
outType = 0;
} else if( value.getClass() == String.class ) {
outType = 1;
@@ -89,10 +103,10 @@
}
} else if( value.getClass() == Destination.class ) {
outType = 2;
- currentOut = ByteBuffer.wrap(((Destination)value).toFramedByteArray());
+ currentOut = ByteBuffer.wrap(((Destination)value).toUnframedByteArray());
}else if( value.getClass() == FlowControl.class ) {
outType = 3;
- currentOut = ByteBuffer.wrap(((FlowControl)value).toFramedByteArray());
+ currentOut = ByteBuffer.wrap(((FlowControl)value).toUnframedByteArray());
}else {
throw new IOException("Unsupported type: "+value.getClass());
}
@@ -181,7 +195,7 @@
Commands.Message m = new Commands.Message();
try
{
- m.mergeFramed(currentIn.array());
+ m.mergeUnframed(currentIn.array());
}
catch(Exception e)
{
@@ -194,12 +208,12 @@
break;
case 2:
Destination d = new Destination();
- d.mergeFramed(currentIn.array());
+ d.mergeUnframed(currentIn.array());
ret = d;
break;
case 3:
FlowControl c = new FlowControl();
- c.mergeFramed(currentIn.array());
+ c.mergeUnframed(currentIn.array());
ret = c;
break;
default: