You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:42:58 UTC
svn commit: r961072 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
activemq-transport/src/main/java/org/apache/activemq/wir...
Author: chirino
Date: Wed Jul 7 03:42:58 2010
New Revision: 961072
URL: http://svn.apache.org/viewvc?rev=961072&view=rev
Log:
continuing to work on fixing simple case
Added:
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961072&r1=961071&r2=961072&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:42:58 2010
@@ -461,7 +461,7 @@ class DeliveryOverflowBuffer(val deliver
}
-class DeliveryCreditBufferProtocol(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained {
+class DeliveryCreditBufferProtocol(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained with Suspendable {
var sessions = List[CreditServer]()
@@ -478,7 +478,10 @@ class DeliveryCreditBufferProtocol(val d
// use a event aggregating source to coalesce multiple events from the same thread.
val source = createSource(new ListEventAggregator[Delivery](), queue)
source.setEventHandler(^{drain_source});
- source.resume
+
+ def suspend() = source.suspend
+ def resume() = source.resume
+ def isSuspended() = source.isSuspended
def drain_source = {
val deliveries = source.getData
@@ -488,7 +491,6 @@ class DeliveryCreditBufferProtocol(val d
}
}
-
class CreditServer(val producer_queue:DispatchQueue) {
private var _capacity = 0
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961072&r1=961071&r2=961072&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:42:58 2010
@@ -16,24 +16,17 @@
*/
package org.apache.activemq.apollo.stomp
-import _root_.java.io.{DataOutput, DataInput, EOFException, IOException}
-import _root_.java.nio.channels.{ReadableByteChannel, SocketChannel}
-import _root_.java.util.{LinkedList, ArrayList}
import _root_.org.apache.activemq.apollo.broker._
-import _root_.org.apache.activemq.wireformat.{WireFormatFactory, WireFormat}
+import _root_.org.apache.activemq.wireformat.{WireFormat}
import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
-import java.nio.ByteBuffer
import _root_.org.apache.activemq.util.buffer._
import collection.mutable.{ListBuffer, HashMap}
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import AsciiBuffer._
import Stomp._
-import Stomp.Headers._
-
import BufferConversions._
-import _root_.scala.collection.JavaConversions._
import StompFrameConstants._;
@@ -101,9 +94,11 @@ class StompProtocolHandler extends Proto
this.connection = connection
// We will be using the default virtual host
+ println("waiting for host")
connection.transport.suspendRead
connection.broker.runtime.getDefaultVirtualHost(
queue.wrap { (host)=>
+ println("got host")
this.host=host
connection.transport.resumeRead
}
@@ -113,24 +108,29 @@ class StompProtocolHandler extends Proto
def setWireFormat(wireformat:WireFormat) = { this.wireformat = wireformat}
def onCommand(command:Any) = {
- command match {
- case StompFrame(Commands.SEND, headers, content) =>
- on_stomp_send(command.asInstanceOf[StompFrame])
- case StompFrame(Commands.ACK, headers, content) =>
- // TODO:
- case StompFrame(Commands.SUBSCRIBE, headers, content) =>
- on_stomp_subscribe(headers)
- case StompFrame(Commands.CONNECT, headers, _) =>
- on_stomp_connect(headers)
- case StompFrame(Commands.DISCONNECT, headers, content) =>
- stop
- case s:StompWireFormat =>
- // this is passed on to us by the protocol discriminator
- // so we know which wire format is being used.
- case StompFrame(unknown, _, _) =>
- die("Unsupported STOMP command: "+unknown);
- case _ =>
- die("Unsupported command: "+command);
+ try { // dispatch on the frame's command; any Exception raised by a handler is routed to die(...) below
+ command match {
+ case StompFrame(Commands.SEND, headers, content) =>
+ on_stomp_send(command.asInstanceOf[StompFrame])
+ case StompFrame(Commands.ACK, headers, content) =>
+ // TODO: ACK frames are matched here but nothing is done with them yet
+ case StompFrame(Commands.SUBSCRIBE, headers, content) =>
+ on_stomp_subscribe(headers)
+ case StompFrame(Commands.CONNECT, headers, _) =>
+ on_stomp_connect(headers)
+ case StompFrame(Commands.DISCONNECT, headers, content) =>
+ stop
+ case s:StompWireFormat =>
+ // this is passed on to us by the protocol discriminator
+ // so we know which wire format is being used.
+ case StompFrame(unknown, _, _) => // any other STOMP command
+ die("Unsupported STOMP command: "+unknown);
+ case _ => // not a StompFrame (and not the wire format marker) at all
+ die("Unsupported command: "+command);
+ }
+ } catch {
+ case e:Exception =>
+ die("Unexpected error: "+e); // NOTE(review): only e.toString is reported; the stack trace is not passed on
}
}
@@ -259,272 +259,3 @@ class StompProtocolHandler extends Proto
}
}
-/**
- * Creates WireFormat objects that marshalls the <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
- */
-class StompWireFormatFactory extends WireFormatFactory {
- import Stomp.Commands.CONNECT
-
- def createWireFormat() = new StompWireFormat();
-
- def isDiscriminatable() = true
-
- def maxWireformatHeaderLength() = CONNECT.length+10;
-
- def matchesWireformatHeader(header:Buffer) = {
- if( header.length < CONNECT.length) {
- false
- } else {
- // the magic can be preceded with newlines / whitespace..
- header.trimFront.startsWith(CONNECT);
- }
- }
-}
-
-object StompWireFormat {
- val READ_BUFFFER_SIZE = 1024*64;
- val MAX_COMMAND_LENGTH = 1024;
- val MAX_HEADER_LENGTH = 1024 * 10;
- val MAX_HEADERS = 1000;
- val MAX_DATA_LENGTH = 1024 * 1024 * 100;
- val TRIM=false
- val SIZE_CHECK=false
- }
-
-class StompWireFormat extends WireFormat {
- import StompWireFormat._
-
- implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
- implicit def wrap(x: Byte) = {
- ByteBuffer.wrap(Array(x));
- }
-
- def marshal(command:Any, os:DataOutput) = {
- val frame = command.asInstanceOf[StompFrame]
- frame.action.writeTo(os)
- os.write(NEWLINE)
-
- // we can optimize a little if the headers and content are in the same buffer..
- if( !frame.headers.isEmpty && !frame.content.isEmpty &&
- ( frame.headers.head._1.data eq frame.content.data ) ) {
-
- val offset = frame.headers.head._1.offset;
- val buffer1 = frame.headers.head._1;
- val buffer2 = frame.content;
- val length = (buffer2.offset-buffer1.offset)+buffer2.length
- os.write( buffer1.data, offset, length)
-
- } else {
- for( (key, value) <- frame.headers ) {
- key.writeTo(os)
- os.write(SEPERATOR)
- value.writeTo(os)
- os.write(NEWLINE)
- }
- os.write(NEWLINE)
- frame.content.writeTo(os)
- }
- END_OF_FRAME_BUFFER.writeTo(os)
- }
-
- def marshal(command:Any):Buffer= {
- val frame = command.asInstanceOf[StompFrame]
- // make a little bigger since size can be an estimate and we want to avoid
- // a capacity re-size.
- val os = new DataByteArrayOutputStream(frame.size + 100);
- marshal(frame, os)
- os.toBuffer
- }
-
- def unmarshal(packet:Buffer) = {
- throw new UnsupportedOperationException
- }
- def unmarshal(in: DataInput):Object = {
- throw new UnsupportedOperationException
- }
-
- def getName() = "stomp"
-
- def getWireFormatFactory() = new StompWireFormatFactory
-
- //
- // state associated with un-marshalling stomp frames from
- // a non-blocking NIO channel.
- //
- def createUnmarshalSession() = new StompUnmarshalSession
-
- class StompUnmarshalSession extends UnmarshalSession {
-
- type FrameReader = (ByteBuffer)=>StompFrame
-
- var next_action:FrameReader = read_action
- var end = 0
- var start = 0
-
- def getStartPos() = start
- def setStartPos(pos:Int):Unit = {start=pos}
-
- def getEndPos() = end
- def setEndPos(pos:Int):Unit = { end = pos }
-
- def unmarshal(buffer:ByteBuffer):Object = {
- // keep running the next action until
- // a frame is decoded or we run out of input
- var rc:StompFrame = null
- while( rc == null && end!=buffer.position ) {
- rc = next_action(buffer)
- }
- rc
- }
-
- def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
- val read_limit = buffer.position
- while( end < read_limit ) {
- if( buffer.array()(end) =='\n') {
- var rc = new Buffer(buffer.array, start, end-start)
- end += 1;
- start = end;
- return rc
- }
- if (SIZE_CHECK && end-start > maxLength) {
- throw new IOException(errorMessage);
- }
- end += 1;
- }
- return null;
- }
-
- def read_action:FrameReader = (buffer)=> {
- val line = read_line(buffer, MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
- if( line !=null ) {
- var action = line
- if( TRIM ) {
- action = action.trim();
- }
- if (action.length() > 0) {
- next_action = read_headers(action)
- }
- }
- null
- }
-
- def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
- val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
- if( line !=null ) {
- if( line.trim().length() > 0 ) {
-
- if (SIZE_CHECK && headers.size > MAX_HEADERS) {
- throw new IOException("The maximum number of headers was exceeded");
- }
-
- try {
- val seperatorIndex = line.indexOf(SEPERATOR);
- if( seperatorIndex<0 ) {
- throw new IOException("Header line missing seperator [" + ascii(line) + "]");
- }
- var name = line.slice(0, seperatorIndex);
- if( TRIM ) {
- name = name.trim();
- }
- var value = line.slice(seperatorIndex + 1, line.length());
- if( TRIM ) {
- value = value.trim();
- }
- headers.add((ascii(name), ascii(value)))
- } catch {
- case e:Exception=>
- e.printStackTrace
- throw new IOException("Unable to parser header line [" + line + "]");
- }
-
- } else {
- val contentLength = get(headers, CONTENT_LENGTH)
- if (contentLength.isDefined) {
- // Bless the client, he's telling us how much data to read in.
- var length=0;
- try {
- length = Integer.parseInt(contentLength.get.trim().toString());
- } catch {
- case e:NumberFormatException=>
- throw new IOException("Specified content-length is not a valid integer");
- }
-
- if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
- throw new IOException("The maximum data length was exceeded");
- }
- next_action = read_binary_body(action, headers, length)
-
- } else {
- next_action = read_text_body(action, headers)
- }
- }
- }
- null
- }
-
- def get(headers:HeaderMapBuffer, name:AsciiBuffer):Option[AsciiBuffer] = {
- val i = headers.iterator
- while( i.hasNext ) {
- val entry = i.next
- if( entry._1 == name ) {
- return Some(entry._2)
- }
- }
- None
- }
-
-
- def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
- val content:Buffer=read_content(buffer, contentLength)
- if( content != null ) {
- next_action = read_action
- new StompFrame(ascii(action), headers.toList, content)
- } else {
- null
- }
- }
-
-
- def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
- val read_limit = buffer.position
- if( (read_limit-start) < contentLength+1 ) {
- end = read_limit;
- null
- } else {
- if( buffer.array()(start+contentLength)!= 0 ) {
- throw new IOException("Exected null termintor after "+contentLength+" content bytes");
- }
- var rc = new Buffer(buffer.array, start, contentLength)
- end = start+contentLength+1;
- start = end;
- rc;
- }
- }
-
- def read_to_null(buffer:ByteBuffer):Buffer = {
- val read_limit = buffer.position
- while( end < read_limit ) {
- if( buffer.array()(end) ==0) {
- var rc = new Buffer(buffer.array, start, end-start)
- end += 1;
- start = end;
- return rc;
- }
- end += 1;
- }
- return null;
- }
-
-
- def read_text_body(action:Buffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
- val content:Buffer=read_to_null(buffer)
- if( content != null ) {
- next_action = read_action
- new StompFrame(ascii(action), headers.toList, content)
- } else {
- null
- }
- }
- }
-
-}
Added: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961072&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul 7 03:42:58 2010
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp
+
+import _root_.java.io.{DataOutput, DataInput, IOException}
+import _root_.org.apache.activemq.apollo.broker._
+
+import _root_.org.apache.activemq.wireformat.{WireFormatFactory, WireFormat}
+import java.nio.ByteBuffer
+import _root_.org.apache.activemq.util.buffer._
+import collection.mutable.{ListBuffer, HashMap}
+import AsciiBuffer._
+import Stomp._
+import Stomp.Headers._
+
+import BufferConversions._
+import _root_.scala.collection.JavaConversions._
+import StompFrameConstants._;
+
+
+/**
+ * Creates WireFormat objects that marshal the <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
+ */
+class StompWireFormatFactory extends WireFormatFactory {
+ import Stomp.Commands.CONNECT
+
+ def createWireFormat() = new StompWireFormat();
+
+ def isDiscriminatable() = true
+
+ def maxWireformatHeaderLength() = CONNECT.length+10; // CONNECT plus 10 extra bytes; presumably slack for leading newlines/whitespace
+
+ def matchesWireformatHeader(header:Buffer) = {
+ if( header.length < CONNECT.length) { // too short to hold the CONNECT command at all
+ false
+ } else {
+ // the magic can be preceded with newlines / whitespace..
+ header.trimFront.startsWith(CONNECT);
+ }
+ }
+}
+
+object StompWireFormat {
+ val READ_BUFFFER_SIZE = 1024*64; // 64 * 1024
+ val MAX_COMMAND_LENGTH = 1024; // limit handed to read_line for the command (action) line
+ val MAX_HEADER_LENGTH = 1024 * 10; // limit handed to read_line for each header line
+ val MAX_HEADERS = 1000; // limit on the number of headers collected for one frame
+ val MAX_DATA_LENGTH = 1024 * 1024 * 100; // limit on a declared content-length
+ val TRIM=false // when true the parser trims the action, header names and header values
+ val SIZE_CHECK=false // every MAX_* check in the parser is guarded by this flag, so none is enforced while it is false
+ }
+
+class StompWireFormat extends WireFormat {
+ import StompWireFormat._
+ // Marshals StompFrame values onto a DataOutput and creates sessions that incrementally parse frames out of a ByteBuffer.
+ implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
+ implicit def wrap(x: Byte) = {
+ ByteBuffer.wrap(Array(x));
+ }
+ // Writes `command` (cast to StompFrame) to `os`: action, NEWLINE, headers and content, then END_OF_FRAME_BUFFER.
+ def marshal(command:Any, os:DataOutput) = {
+ val frame = command.asInstanceOf[StompFrame]
+ frame.action.writeTo(os)
+ os.write(NEWLINE)
+
+ // we can optimize a little if the headers and content are in the same buffer..
+ if( !frame.headers.isEmpty && !frame.content.isEmpty &&
+ ( frame.headers.head._1.data eq frame.content.data ) ) {
+
+ val offset = frame.headers.head._1.offset;
+ val buffer1 = frame.headers.head._1;
+ val buffer2 = frame.content;
+ val length = (buffer2.offset-buffer1.offset)+buffer2.length // spans first header name through end of content
+ os.write( buffer1.data, offset, length)
+
+ } else {
+ for( (key, value) <- frame.headers ) {
+ key.writeTo(os)
+ os.write(SEPERATOR)
+ value.writeTo(os)
+ os.write(NEWLINE)
+ }
+ os.write(NEWLINE)
+ frame.content.writeTo(os)
+ }
+ END_OF_FRAME_BUFFER.writeTo(os)
+ }
+ // Marshals `command` (cast to StompFrame) into a freshly allocated Buffer.
+ def marshal(command:Any):Buffer= {
+ val frame = command.asInstanceOf[StompFrame]
+ // make a little bigger since size can be an estimate and we want to avoid
+ // a capacity re-size.
+ val os = new DataByteArrayOutputStream(frame.size + 100);
+ marshal(frame, os)
+ os.toBuffer
+ }
+ // One-shot un-marshalling is not implemented; incremental parsing is provided by createUnmarshalSession.
+ def unmarshal(packet:Buffer) = {
+ throw new UnsupportedOperationException
+ }
+ def unmarshal(in: DataInput):Object = {
+ throw new UnsupportedOperationException
+ }
+
+ def getName() = "stomp"
+
+ def getWireFormatFactory() = new StompWireFormatFactory
+
+ //
+ // state associated with un-marshalling stomp frames from
+ // a non-blocking NIO channel.
+ //
+ def createUnmarshalSession() = new StompUnmarshalSession
+
+ class StompUnmarshalSession extends UnmarshalSession {
+ // A FrameReader consumes buffered bytes and returns a complete frame, or null when it needs more input.
+ type FrameReader = (ByteBuffer)=>StompFrame
+ // start: offset where the element being parsed begins; end: offset of the next byte to examine.
+ var next_action:FrameReader = read_action
+ var end = 0
+ var start = 0
+
+ def getStartPos() = start
+ def setStartPos(pos:Int):Unit = {start=pos}
+
+ def getEndPos() = end
+ def setEndPos(pos:Int):Unit = { end = pos }
+
+ def unmarshal(buffer:ByteBuffer):Object = {
+ // keep running the next action until
+ // a frame is decoded or we run out of input
+ var rc:StompFrame = null
+ while( rc == null && end!=buffer.position ) {
+ rc = next_action(buffer)
+ }
+ rc
+ }
+ // Returns the bytes from `start` up to the next '\n' and steps past it, or null if no full line is buffered yet.
+ def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
+ val read_limit = buffer.position
+ while( end < read_limit ) {
+ if( buffer.array()(end) =='\n') {
+ var rc = new Buffer(buffer.array, start, end-start)
+ end += 1;
+ start = end;
+ return rc
+ }
+ if (SIZE_CHECK && end-start > maxLength) {
+ throw new IOException(errorMessage);
+ }
+ end += 1;
+ }
+ return null;
+ }
+ // Reads the command line; empty lines are skipped, a non-empty one switches the session to header parsing.
+ def read_action:FrameReader = (buffer)=> {
+ val line = read_line(buffer, MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
+ if( line !=null ) {
+ var action = line
+ if( TRIM ) {
+ action = action.trim();
+ }
+ if (action.length() > 0) {
+ next_action = read_headers(action)
+ }
+ }
+ null
+ }
+ // Reads one header line per call; the empty line ending the headers selects the binary or text body reader.
+ def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
+ val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
+ if( line !=null ) {
+ if( line.trim().length() > 0 ) {
+
+ if (SIZE_CHECK && headers.size > MAX_HEADERS) {
+ throw new IOException("The maximum number of headers was exceeded");
+ }
+
+ try {
+ val seperatorIndex = line.indexOf(SEPERATOR);
+ if( seperatorIndex<0 ) {
+ throw new IOException("Header line missing seperator [" + ascii(line) + "]");
+ }
+ var name = line.slice(0, seperatorIndex);
+ if( TRIM ) {
+ name = name.trim();
+ }
+ var value = line.slice(seperatorIndex + 1, line.length());
+ if( TRIM ) {
+ value = value.trim();
+ }
+ headers.add((ascii(name), ascii(value)))
+ } catch {
+ case e:IOException=> throw e // already carries a precise message; do not replace it with the generic one
+ case e:Exception=> // unexpected failure: keep it as the cause instead of dumping a stack trace
+ throw new IOException("Unable to parse header line [" + ascii(line) + "]").initCause(e);
+ }
+
+ } else {
+ val contentLength = get(headers, CONTENT_LENGTH)
+ if (contentLength.isDefined) {
+ // Bless the client, he's telling us how much data to read in.
+ var length=0;
+ try {
+ length = Integer.parseInt(contentLength.get.trim().toString());
+ } catch {
+ case e:NumberFormatException=>
+ throw new IOException("Specified content-length is not a valid integer");
+ }
+ if (length < 0) throw new IOException("Specified content-length is negative"); // read_content would otherwise compute positions before `start`
+ if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
+ throw new IOException("The maximum data length was exceeded");
+ }
+ next_action = read_binary_body(action, headers, length)
+
+ } else {
+ next_action = read_text_body(action, headers)
+ }
+ }
+ }
+ null
+ }
+ // Linear search for the first header named `name`.
+ def get(headers:HeaderMapBuffer, name:AsciiBuffer):Option[AsciiBuffer] = {
+ val i = headers.iterator
+ while( i.hasNext ) {
+ val entry = i.next
+ if( entry._1 == name ) {
+ return Some(entry._2)
+ }
+ }
+ None
+ }
+
+
+ def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
+ val content:Buffer=read_content(buffer, contentLength)
+ if( content != null ) {
+ next_action = read_action
+ new StompFrame(ascii(action), headers.toList, content)
+ } else {
+ null
+ }
+ }
+
+ // Returns exactly contentLength bytes once they and the trailing 0 byte are buffered; null until then.
+ def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
+ val read_limit = buffer.position
+ if( (read_limit-start) <= contentLength ) { // fewer than contentLength+1 bytes buffered (no +1, so Int.MaxValue cannot overflow)
+ end = read_limit;
+ null
+ } else {
+ if( buffer.array()(start+contentLength)!= 0 ) {
+ throw new IOException("Expected null terminator after "+contentLength+" content bytes");
+ }
+ var rc = new Buffer(buffer.array, start, contentLength)
+ end = start+contentLength+1;
+ start = end;
+ rc;
+ }
+ }
+ // Returns the bytes from `start` up to the next 0 byte and steps past it, or null if none is buffered yet.
+ def read_to_null(buffer:ByteBuffer):Buffer = {
+ val read_limit = buffer.position
+ while( end < read_limit ) {
+ if( buffer.array()(end) ==0) {
+ var rc = new Buffer(buffer.array, start, end-start)
+ end += 1;
+ start = end;
+ return rc;
+ }
+ end += 1;
+ }
+ return null;
+ }
+
+ // Body reader used when no content-length header was supplied: the body runs to the next 0 byte.
+ def read_text_body(action:Buffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
+ val content:Buffer=read_to_null(buffer)
+ if( content != null ) {
+ next_action = read_action
+ new StompFrame(ascii(action), headers.toList, content)
+ } else {
+ null
+ }
+ }
+ }
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=961072&r1=961071&r2=961072&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Wed Jul 7 03:42:58 2010
@@ -108,19 +108,35 @@ public class MultiWireFormatFactory impl
UnmarshalSession session;
public int getStartPos() {
- return start;
+ if( session!=null ) { // once a session exists, it owns the positions
+ return session.getStartPos();
+ } else {
+ return start;
+ }
}
public void setStartPos(int pos) {
- start=pos;
+ if( session!=null ) {
+ session.setStartPos(pos); // must set the START position; calling setEndPos here overwrote the end and left the start unchanged
+ } else {
+ start=pos;
+ }
}
public int getEndPos() {
- return end;
+ if( session!=null ) { // once a session exists, it owns the positions
+ return session.getEndPos();
+ } else {
+ return end;
+ }
}
public void setEndPos(int pos) {
- end = pos;
+ if( session!=null ) {
+ session.setEndPos(pos);
+ } else {
+ end = pos;
+ }
}
public Object unmarshal(ByteBuffer buffer) throws IOException {