You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/16 17:02:27 UTC

svn commit: r744953 - /activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java

Author: chirino
Date: Mon Feb 16 16:02:25 2009
New Revision: 744953

URL: http://svn.apache.org/viewvc?rev=744953&view=rev
Log:
- The wire protocol used by the stateful interface of the ProtoWireFormatFactory did not exactly match what the non-stateful version did. Matched them up.
- Since the wireformat is doing the framing, switched to using the toUnframedByteArray() methods to save a few bytes.


Modified:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=744953&r1=744952&r2=744953&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Mon Feb 16 16:02:25 2009
@@ -3,13 +3,12 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.StatefulWireFormat;
@@ -28,16 +27,28 @@
         public void marshal(Object value, DataOutput out) throws IOException {
             if( value.getClass() == Message.class ) {
                 out.writeByte(0);
-                ((Message)value).getProto().writeFramed((OutputStream)out);
+                Commands.Message proto = ((Message)value).getProto();
+                Buffer buffer = proto.toUnframedBuffer();
+                out.writeInt(buffer.getLength());
+                out.write(buffer.getData(), buffer.getOffset(), buffer.getLength());
             } else if( value.getClass() == String.class ) {
                 out.writeByte(1);
-                out.writeUTF((String) value);
+                String value2 = (String) value;
+                byte[] bytes = value2.getBytes("UTF-8");
+                out.writeInt(bytes.length);
+                out.write(bytes);
             } else if( value.getClass() == Destination.class ) {
                 out.writeByte(2);
-                ((Destination)value).writeFramed((OutputStream)out);
+                Destination proto = (Destination)value;
+                Buffer buffer = proto.toUnframedBuffer();
+                out.writeInt(buffer.getLength());
+                out.write(buffer.getData(), buffer.getOffset(), buffer.getLength());
             }else if( value.getClass() == FlowControl.class ) {
                 out.writeByte(3);
-                ((FlowControl)value).writeFramed((OutputStream)out);
+                FlowControl proto = (FlowControl)value;
+                Buffer buffer = proto.toUnframedBuffer();
+                out.writeInt(buffer.getLength());
+                out.write(buffer.getData(), buffer.getOffset(), buffer.getLength());
             } else {
                 throw new IOException("Unsupported type: "+value.getClass());
             }
@@ -45,20 +56,23 @@
 
         public Object unmarshal(DataInput in) throws IOException {
             byte type = in.readByte();
+            int size = in.readInt();
+            byte data[] = new byte[size];
+            in.readFully(data);
             switch(type) {
                 case 0:
                     Commands.Message m = new Commands.Message();
-                    m.mergeFramed((InputStream)in);
+                    m.mergeUnframed(data);
                     return new Message(m);
                 case 1:
-                    return in.readUTF();
+                    return new String(data, "UTF-8");
                 case 2:
                     Destination d = new Destination();
-                    d.mergeFramed((InputStream)in);
+                    d.mergeUnframed(data);
                     return d;
                 case 3:
                     FlowControl fc = new FlowControl();
-                    fc.mergeFramed((InputStream)in);
+                    fc.mergeUnframed(data);
                     return fc;
                 default:
                     throw new IOException("Unknonw type byte: ");
@@ -77,7 +91,7 @@
                 
                 if( value.getClass() == Message.class ) {
                 	
-                	currentOut = ByteBuffer.wrap(((Message)value).getProto().toFramedByteArray());
+                	currentOut = ByteBuffer.wrap(((Message)value).getProto().toUnframedByteArray());
                 	outType = 0;
                 } else if( value.getClass() == String.class ) {
                 	outType = 1;
@@ -89,10 +103,10 @@
                     }
                 } else if( value.getClass() == Destination.class ) {
                 	outType = 2;
-                    currentOut = ByteBuffer.wrap(((Destination)value).toFramedByteArray());
+                    currentOut = ByteBuffer.wrap(((Destination)value).toUnframedByteArray());
                 }else if( value.getClass() == FlowControl.class ) {
                 	outType = 3;
-                    currentOut = ByteBuffer.wrap(((FlowControl)value).toFramedByteArray());
+                    currentOut = ByteBuffer.wrap(((FlowControl)value).toUnframedByteArray());
                 }else {
                     throw new IOException("Unsupported type: "+value.getClass());
                 }
@@ -181,7 +195,7 @@
             	Commands.Message m = new Commands.Message();
             	try
             	{
-            		m.mergeFramed(currentIn.array());
+            		m.mergeUnframed(currentIn.array());
             	}
             	catch(Exception e)
             	{
@@ -194,12 +208,12 @@
             	break;
         	case 2:
         		Destination d = new Destination();
-        		d.mergeFramed(currentIn.array());
+        		d.mergeUnframed(currentIn.array());
         		ret = d;
         		break;
         	case 3:
         		FlowControl c = new FlowControl();
-        		c.mergeFramed(currentIn.array());
+        		c.mergeUnframed(currentIn.array());
         		ret = c;
         		break;
         	default: