You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:42:58 UTC

svn commit: r961072 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ activemq-transport/src/main/java/org/apache/activemq/wir...

Author: chirino
Date: Wed Jul  7 03:42:58 2010
New Revision: 961072

URL: http://svn.apache.org/viewvc?rev=961072&view=rev
Log:
continuing to work on fixing simple case

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961072&r1=961071&r2=961072&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:42:58 2010
@@ -461,7 +461,7 @@ class DeliveryOverflowBuffer(val deliver
 
 }
 
-class DeliveryCreditBufferProtocol(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained {
+class DeliveryCreditBufferProtocol(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained with Suspendable {
 
   var sessions = List[CreditServer]()
 
@@ -478,7 +478,10 @@ class DeliveryCreditBufferProtocol(val d
   // use a event aggregating source to coalesce multiple events from the same thread.
   val source = createSource(new ListEventAggregator[Delivery](), queue)
   source.setEventHandler(^{drain_source});
-  source.resume
+
+  def suspend() = source.suspend
+  def resume() = source.resume
+  def isSuspended() = source.isSuspended
 
   def drain_source = {
     val deliveries = source.getData
@@ -488,7 +491,6 @@ class DeliveryCreditBufferProtocol(val d
     }
   }
 
-
   class CreditServer(val producer_queue:DispatchQueue) {
     private var _capacity = 0
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961072&r1=961071&r2=961072&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:42:58 2010
@@ -16,24 +16,17 @@
  */
 package org.apache.activemq.apollo.stomp
 
-import _root_.java.io.{DataOutput, DataInput, EOFException, IOException}
-import _root_.java.nio.channels.{ReadableByteChannel, SocketChannel}
-import _root_.java.util.{LinkedList, ArrayList}
 import _root_.org.apache.activemq.apollo.broker._
 
-import _root_.org.apache.activemq.wireformat.{WireFormatFactory, WireFormat}
+import _root_.org.apache.activemq.wireformat.{WireFormat}
 import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
-import java.nio.ByteBuffer
 import _root_.org.apache.activemq.util.buffer._
 import collection.mutable.{ListBuffer, HashMap}
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
 import AsciiBuffer._
 import Stomp._
-import Stomp.Headers._
-
 import BufferConversions._
-import _root_.scala.collection.JavaConversions._
 import StompFrameConstants._;
 
 
@@ -101,9 +94,11 @@ class StompProtocolHandler extends Proto
     this.connection = connection
 
     // We will be using the default virtual host
+    println("waiting for host")
     connection.transport.suspendRead
     connection.broker.runtime.getDefaultVirtualHost(
       queue.wrap { (host)=>
+        println("got host")
         this.host=host
         connection.transport.resumeRead
       }
@@ -113,24 +108,29 @@ class StompProtocolHandler extends Proto
   def setWireFormat(wireformat:WireFormat) = { this.wireformat = wireformat}
 
   def onCommand(command:Any) = {
-    command match {
-      case StompFrame(Commands.SEND, headers, content) =>
-        on_stomp_send(command.asInstanceOf[StompFrame])
-      case StompFrame(Commands.ACK, headers, content) =>
-        // TODO:
-      case StompFrame(Commands.SUBSCRIBE, headers, content) =>
-        on_stomp_subscribe(headers)
-      case StompFrame(Commands.CONNECT, headers, _) =>
-        on_stomp_connect(headers)
-      case StompFrame(Commands.DISCONNECT, headers, content) =>
-        stop
-      case s:StompWireFormat =>
-        // this is passed on to us by the protocol discriminator
-        // so we know which wire format is being used.
-      case StompFrame(unknown, _, _) =>
-        die("Unsupported STOMP command: "+unknown);
-      case _ =>
-        die("Unsupported command: "+command);
+    try {
+      command match {
+        case StompFrame(Commands.SEND, headers, content) =>
+          on_stomp_send(command.asInstanceOf[StompFrame])
+        case StompFrame(Commands.ACK, headers, content) =>
+          // TODO:
+        case StompFrame(Commands.SUBSCRIBE, headers, content) =>
+          on_stomp_subscribe(headers)
+        case StompFrame(Commands.CONNECT, headers, _) =>
+          on_stomp_connect(headers)
+        case StompFrame(Commands.DISCONNECT, headers, content) =>
+          stop
+        case s:StompWireFormat =>
+          // this is passed on to us by the protocol discriminator
+          // so we know which wire format is being used.
+        case StompFrame(unknown, _, _) =>
+          die("Unsupported STOMP command: "+unknown);
+        case _ =>
+          die("Unsupported command: "+command);
+      }
+    }  catch {
+      case e:Exception =>
+        die("Unexpected error: "+e);  
     }
   }
 
@@ -259,272 +259,3 @@ class StompProtocolHandler extends Proto
   }
 }
 
-/**
- * Creates WireFormat objects that marshalls the <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
- */
-class StompWireFormatFactory extends WireFormatFactory {
-  import Stomp.Commands.CONNECT
-
-    def createWireFormat() = new StompWireFormat();
-
-    def isDiscriminatable() = true
-
-    def maxWireformatHeaderLength() = CONNECT.length+10;
-
-    def matchesWireformatHeader(header:Buffer) = {
-        if( header.length < CONNECT.length) {
-          false
-        } else {
-          // the magic can be preceded with newlines / whitespace..
-          header.trimFront.startsWith(CONNECT);
-        }
-    }
-}
-
-object StompWireFormat {
-    val READ_BUFFFER_SIZE = 1024*64;
-    val MAX_COMMAND_LENGTH = 1024;
-    val MAX_HEADER_LENGTH = 1024 * 10;
-    val MAX_HEADERS = 1000;
-    val MAX_DATA_LENGTH = 1024 * 1024 * 100;
-    val TRIM=false
-    val SIZE_CHECK=false
-  }
-
-class StompWireFormat extends WireFormat {
-  import StompWireFormat._
-
-  implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
-  implicit def wrap(x: Byte) = {
-    ByteBuffer.wrap(Array(x));
-  }
-
-  def marshal(command:Any, os:DataOutput) = {
-    val frame = command.asInstanceOf[StompFrame]
-    frame.action.writeTo(os)
-    os.write(NEWLINE)
-
-    // we can optimize a little if the headers and content are in the same buffer..
-    if( !frame.headers.isEmpty && !frame.content.isEmpty &&
-            ( frame.headers.head._1.data eq frame.content.data ) ) {
-
-      val offset = frame.headers.head._1.offset;
-      val buffer1 = frame.headers.head._1;
-      val buffer2 = frame.content;
-      val length = (buffer2.offset-buffer1.offset)+buffer2.length
-      os.write( buffer1.data, offset, length)
-
-    } else {
-      for( (key, value) <- frame.headers ) {
-        key.writeTo(os)
-        os.write(SEPERATOR)
-        value.writeTo(os)
-        os.write(NEWLINE)
-      }
-      os.write(NEWLINE)
-      frame.content.writeTo(os)
-    }
-    END_OF_FRAME_BUFFER.writeTo(os)
-  }
-
-  def marshal(command:Any):Buffer= {
-    val frame = command.asInstanceOf[StompFrame]
-    // make a little bigger since size can be an estimate and we want to avoid
-    // a capacity re-size.
-    val os = new DataByteArrayOutputStream(frame.size + 100);
-    marshal(frame, os)
-    os.toBuffer
-  }
-
-  def unmarshal(packet:Buffer) = {
-    throw new UnsupportedOperationException
-  }
-  def unmarshal(in: DataInput):Object = {
-    throw new UnsupportedOperationException
-  }
-
-  def getName() = "stomp"
-
-  def getWireFormatFactory() = new StompWireFormatFactory
-
-  //
-  // state associated with un-marshalling stomp frames from
-  // a non-blocking NIO channel.
-  //
-  def createUnmarshalSession() = new StompUnmarshalSession
-
-  class StompUnmarshalSession extends UnmarshalSession {
-
-    type FrameReader = (ByteBuffer)=>StompFrame
-
-    var next_action:FrameReader = read_action
-    var end = 0
-    var start = 0
-
-    def getStartPos() = start
-    def setStartPos(pos:Int):Unit = {start=pos}
-
-    def getEndPos() = end
-    def setEndPos(pos:Int):Unit = { end = pos }
-
-    def unmarshal(buffer:ByteBuffer):Object = {
-      // keep running the next action until
-      // a frame is decoded or we run out of input
-      var rc:StompFrame = null
-      while( rc == null && end!=buffer.position ) {
-        rc = next_action(buffer)
-      }
-      rc
-    }
-
-    def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
-        val read_limit = buffer.position
-        while( end < read_limit ) {
-          if( buffer.array()(end) =='\n') {
-            var rc = new Buffer(buffer.array, start, end-start)
-            end += 1;
-            start = end;
-            return rc
-          }
-          if (SIZE_CHECK && end-start > maxLength) {
-              throw new IOException(errorMessage);
-          }
-          end += 1;
-        }
-        return null;
-    }
-
-    def read_action:FrameReader = (buffer)=> {
-      val line = read_line(buffer, MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
-      if( line !=null ) {
-        var action = line
-        if( TRIM ) {
-            action = action.trim();
-        }
-        if (action.length() > 0) {
-            next_action = read_headers(action)
-        }
-      }
-      null
-    }
-
-    def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
-      val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
-      if( line !=null ) {
-        if( line.trim().length() > 0 ) {
-
-          if (SIZE_CHECK && headers.size > MAX_HEADERS) {
-              throw new IOException("The maximum number of headers was exceeded");
-          }
-
-          try {
-              val seperatorIndex = line.indexOf(SEPERATOR);
-              if( seperatorIndex<0 ) {
-                  throw new IOException("Header line missing seperator [" + ascii(line) + "]");
-              }
-              var name = line.slice(0, seperatorIndex);
-              if( TRIM ) {
-                  name = name.trim();
-              }
-              var value = line.slice(seperatorIndex + 1, line.length());
-              if( TRIM ) {
-                  value = value.trim();
-              }
-              headers.add((ascii(name), ascii(value)))
-          } catch {
-              case e:Exception=>
-                e.printStackTrace
-                throw new IOException("Unable to parser header line [" + line + "]");
-          }
-
-        } else {
-          val contentLength = get(headers, CONTENT_LENGTH)
-          if (contentLength.isDefined) {
-            // Bless the client, he's telling us how much data to read in.
-            var length=0;
-            try {
-                length = Integer.parseInt(contentLength.get.trim().toString());
-            } catch {
-              case e:NumberFormatException=>
-                throw new IOException("Specified content-length is not a valid integer");
-            }
-
-            if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
-                throw new IOException("The maximum data length was exceeded");
-            }
-            next_action = read_binary_body(action, headers, length)
-
-          } else {
-            next_action = read_text_body(action, headers)
-          }
-        }
-      }
-      null
-    }
-
-    def get(headers:HeaderMapBuffer, name:AsciiBuffer):Option[AsciiBuffer] = {
-      val i = headers.iterator
-      while( i.hasNext ) {
-        val entry = i.next
-        if( entry._1 == name ) {
-          return Some(entry._2)
-        }
-      }
-      None
-    }
-
-
-    def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
-      val content:Buffer=read_content(buffer, contentLength)
-      if( content != null ) {
-        next_action = read_action
-        new StompFrame(ascii(action), headers.toList, content)
-      } else {
-        null
-      }
-    }
-
-
-    def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
-        val read_limit = buffer.position
-        if( (read_limit-start) < contentLength+1 ) {
-          end = read_limit;
-          null
-        } else {
-          if( buffer.array()(start+contentLength)!= 0 ) {
-             throw new IOException("Exected null termintor after "+contentLength+" content bytes");
-          }
-          var rc = new Buffer(buffer.array, start, contentLength)
-          end = start+contentLength+1;
-          start = end;
-          rc;
-        }
-    }
-
-    def read_to_null(buffer:ByteBuffer):Buffer = {
-        val read_limit = buffer.position
-        while( end < read_limit ) {
-          if( buffer.array()(end) ==0) {
-            var rc = new Buffer(buffer.array, start, end-start)
-            end += 1;
-            start = end;
-            return rc;
-          }
-          end += 1;
-        }
-        return null;
-    }
-
-
-    def read_text_body(action:Buffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
-      val content:Buffer=read_to_null(buffer)
-      if( content != null ) {
-        next_action = read_action
-        new StompFrame(ascii(action), headers.toList, content)
-      } else {
-        null
-      }
-    }
-  }
-
-}

Added: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961072&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 03:42:58 2010
@@ -0,0 +1,303 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp
+
+import _root_.java.io.{DataOutput, DataInput, IOException}
+import _root_.org.apache.activemq.apollo.broker._
+
+import _root_.org.apache.activemq.wireformat.{WireFormatFactory, WireFormat}
+import java.nio.ByteBuffer
+import _root_.org.apache.activemq.util.buffer._
+import collection.mutable.{ListBuffer, HashMap}
+import AsciiBuffer._
+import Stomp._
+import Stomp.Headers._
+
+import BufferConversions._
+import _root_.scala.collection.JavaConversions._
+import StompFrameConstants._;
+
+
+/**
+ * Creates WireFormat objects that marshalls the <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
+ */
+class StompWireFormatFactory extends WireFormatFactory {
+  import Stomp.Commands.CONNECT
+
+    def createWireFormat() = new StompWireFormat();
+
+    def isDiscriminatable() = true
+
+    def maxWireformatHeaderLength() = CONNECT.length+10;
+
+    def matchesWireformatHeader(header:Buffer) = {
+        if( header.length < CONNECT.length) {
+          false
+        } else {
+          // the magic can be preceded with newlines / whitespace..
+          header.trimFront.startsWith(CONNECT);
+        }
+    }
+}
+
+object StompWireFormat {
+    val READ_BUFFFER_SIZE = 1024*64;
+    val MAX_COMMAND_LENGTH = 1024;
+    val MAX_HEADER_LENGTH = 1024 * 10;
+    val MAX_HEADERS = 1000;
+    val MAX_DATA_LENGTH = 1024 * 1024 * 100;
+    val TRIM=false
+    val SIZE_CHECK=false
+  }
+
+class StompWireFormat extends WireFormat {
+  import StompWireFormat._
+
+  implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
+  implicit def wrap(x: Byte) = {
+    ByteBuffer.wrap(Array(x));
+  }
+
+  def marshal(command:Any, os:DataOutput) = {
+    val frame = command.asInstanceOf[StompFrame]
+    frame.action.writeTo(os)
+    os.write(NEWLINE)
+
+    // we can optimize a little if the headers and content are in the same buffer..
+    if( !frame.headers.isEmpty && !frame.content.isEmpty &&
+            ( frame.headers.head._1.data eq frame.content.data ) ) {
+
+      val offset = frame.headers.head._1.offset;
+      val buffer1 = frame.headers.head._1;
+      val buffer2 = frame.content;
+      val length = (buffer2.offset-buffer1.offset)+buffer2.length
+      os.write( buffer1.data, offset, length)
+
+    } else {
+      for( (key, value) <- frame.headers ) {
+        key.writeTo(os)
+        os.write(SEPERATOR)
+        value.writeTo(os)
+        os.write(NEWLINE)
+      }
+      os.write(NEWLINE)
+      frame.content.writeTo(os)
+    }
+    END_OF_FRAME_BUFFER.writeTo(os)
+  }
+
+  def marshal(command:Any):Buffer= {
+    val frame = command.asInstanceOf[StompFrame]
+    // make a little bigger since size can be an estimate and we want to avoid
+    // a capacity re-size.
+    val os = new DataByteArrayOutputStream(frame.size + 100);
+    marshal(frame, os)
+    os.toBuffer
+  }
+
+  def unmarshal(packet:Buffer) = {
+    throw new UnsupportedOperationException
+  }
+  def unmarshal(in: DataInput):Object = {
+    throw new UnsupportedOperationException
+  }
+
+  def getName() = "stomp"
+
+  def getWireFormatFactory() = new StompWireFormatFactory
+
+  //
+  // state associated with un-marshalling stomp frames from
+  // a non-blocking NIO channel.
+  //
+  def createUnmarshalSession() = new StompUnmarshalSession
+
+  class StompUnmarshalSession extends UnmarshalSession {
+
+    type FrameReader = (ByteBuffer)=>StompFrame
+
+    var next_action:FrameReader = read_action
+    var end = 0
+    var start = 0
+
+    def getStartPos() = start
+    def setStartPos(pos:Int):Unit = {start=pos}
+
+    def getEndPos() = end
+    def setEndPos(pos:Int):Unit = { end = pos }
+
+    def unmarshal(buffer:ByteBuffer):Object = {
+      // keep running the next action until
+      // a frame is decoded or we run out of input
+      var rc:StompFrame = null
+      while( rc == null && end!=buffer.position ) {
+        rc = next_action(buffer)
+      }
+      rc
+    }
+
+    def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
+        val read_limit = buffer.position
+        while( end < read_limit ) {
+          if( buffer.array()(end) =='\n') {
+            var rc = new Buffer(buffer.array, start, end-start)
+            end += 1;
+            start = end;
+            return rc
+          }
+          if (SIZE_CHECK && end-start > maxLength) {
+              throw new IOException(errorMessage);
+          }
+          end += 1;
+        }
+        return null;
+    }
+
+    def read_action:FrameReader = (buffer)=> {
+      val line = read_line(buffer, MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
+      if( line !=null ) {
+        var action = line
+        if( TRIM ) {
+            action = action.trim();
+        }
+        if (action.length() > 0) {
+            next_action = read_headers(action)
+        }
+      }
+      null
+    }
+
+    def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
+      val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
+      if( line !=null ) {
+        if( line.trim().length() > 0 ) {
+
+          if (SIZE_CHECK && headers.size > MAX_HEADERS) {
+              throw new IOException("The maximum number of headers was exceeded");
+          }
+
+          try {
+              val seperatorIndex = line.indexOf(SEPERATOR);
+              if( seperatorIndex<0 ) {
+                  throw new IOException("Header line missing seperator [" + ascii(line) + "]");
+              }
+              var name = line.slice(0, seperatorIndex);
+              if( TRIM ) {
+                  name = name.trim();
+              }
+              var value = line.slice(seperatorIndex + 1, line.length());
+              if( TRIM ) {
+                  value = value.trim();
+              }
+              headers.add((ascii(name), ascii(value)))
+          } catch {
+              case e:Exception=>
+                e.printStackTrace
+                throw new IOException("Unable to parser header line [" + line + "]");
+          }
+
+        } else {
+          val contentLength = get(headers, CONTENT_LENGTH)
+          if (contentLength.isDefined) {
+            // Bless the client, he's telling us how much data to read in.
+            var length=0;
+            try {
+                length = Integer.parseInt(contentLength.get.trim().toString());
+            } catch {
+              case e:NumberFormatException=>
+                throw new IOException("Specified content-length is not a valid integer");
+            }
+
+            if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
+                throw new IOException("The maximum data length was exceeded");
+            }
+            next_action = read_binary_body(action, headers, length)
+
+          } else {
+            next_action = read_text_body(action, headers)
+          }
+        }
+      }
+      null
+    }
+
+    def get(headers:HeaderMapBuffer, name:AsciiBuffer):Option[AsciiBuffer] = {
+      val i = headers.iterator
+      while( i.hasNext ) {
+        val entry = i.next
+        if( entry._1 == name ) {
+          return Some(entry._2)
+        }
+      }
+      None
+    }
+
+
+    def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
+      val content:Buffer=read_content(buffer, contentLength)
+      if( content != null ) {
+        next_action = read_action
+        new StompFrame(ascii(action), headers.toList, content)
+      } else {
+        null
+      }
+    }
+
+
+    def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
+        val read_limit = buffer.position
+        if( (read_limit-start) < contentLength+1 ) {
+          end = read_limit;
+          null
+        } else {
+          if( buffer.array()(start+contentLength)!= 0 ) {
+             throw new IOException("Exected null termintor after "+contentLength+" content bytes");
+          }
+          var rc = new Buffer(buffer.array, start, contentLength)
+          end = start+contentLength+1;
+          start = end;
+          rc;
+        }
+    }
+
+    def read_to_null(buffer:ByteBuffer):Buffer = {
+        val read_limit = buffer.position
+        while( end < read_limit ) {
+          if( buffer.array()(end) ==0) {
+            var rc = new Buffer(buffer.array, start, end-start)
+            end += 1;
+            start = end;
+            return rc;
+          }
+          end += 1;
+        }
+        return null;
+    }
+
+
+    def read_text_body(action:Buffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
+      val content:Buffer=read_to_null(buffer)
+      if( content != null ) {
+        next_action = read_action
+        new StompFrame(ascii(action), headers.toList, content)
+      } else {
+        null
+      }
+    }
+  }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=961072&r1=961071&r2=961072&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Wed Jul  7 03:42:58 2010
@@ -108,19 +108,35 @@ public class MultiWireFormatFactory impl
                 UnmarshalSession session;
 
                 public int getStartPos() {
-                    return start;
+                    if( session!=null ) {
+                        return session.getStartPos();
+                    } else {
+                        return start;
+                    }
                 }
 
                 public void setStartPos(int pos) {
-                    start=pos;
+                    if( session!=null ) {
+                        session.setStartPos(pos); // forward the start (not the end) position to the delegate session
+                    } else {
+                        start=pos; // session is null: keep the position in the local field
+                    }
                 }
 
                 public int getEndPos() {
-                    return end;
+                    if( session!=null ) {
+                        return session.getEndPos();
+                    } else {
+                        return end;
+                    }
                 }
 
                 public void setEndPos(int pos) {
-                    end = pos;
+                    if( session!=null ) {
+                        session.setEndPos(pos);
+                    } else {
+                        end = pos;
+                    }
                 }
 
                 public Object unmarshal(ByteBuffer buffer) throws IOException {