You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:46:28 UTC

svn commit: r961082 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/test/scala/org/apache/activemq/apol...

Author: chirino
Date: Wed Jul  7 03:46:27 2010
New Revision: 961082

URL: http://svn.apache.org/viewvc?rev=961082&view=rev
Log:
Shuffling stuff around to get 'mvn install -Dtest=false' to work.
Simplified wireformat; wireformats are now stateful.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
      - copied, changed from r961081, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
      - copied, changed from r961081, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 03:46:27 2010
@@ -17,7 +17,7 @@
 package org.apache.activemq.apollo.broker
 
 import _root_.java.io.{File}
-import _root_.java.util.{LinkedList, LinkedHashMap, ArrayList, HashMap}
+import _root_.java.util.{LinkedList, LinkedHashMap, ArrayList}
 import _root_.org.apache.activemq.transport._
 import _root_.org.apache.activemq.Service
 import _root_.java.lang.{String}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:46:27 2010
@@ -18,12 +18,11 @@ package org.apache.activemq.apollo.broke
 
 import _root_.java.beans.ExceptionListener
 import _root_.java.io.{IOException}
-import _root_.java.util.{LinkedHashMap, HashMap}
 import _root_.org.apache.activemq.filter.{BooleanExpression}
 import _root_.org.apache.activemq.transport._
 import _root_.org.apache.activemq.Service
 import _root_.java.lang.{String}
-import _root_.org.apache.activemq.util.{FactoryFinder, IOExceptionSupport}
+import _root_.org.apache.activemq.util.{FactoryFinder}
 import _root_.org.apache.activemq.wireformat.WireFormat
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 import java.util.concurrent.atomic.AtomicLong

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:46:27 2010
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.apollo.broker
 
-import _root_.java.util.{LinkedList, LinkedHashMap, HashMap}
+import _root_.java.util.{LinkedList}
 import _root_.org.apache.activemq.filter.{MessageEvaluationContext}
 import _root_.java.lang.{String}
 import _root_.org.apache.activemq.util.buffer.{Buffer, AsciiBuffer}

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java?rev=961082&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java Wed Jul  7 03:46:27 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+/**
+ */
+public interface Destination {
+  AsciiBuffer getDomain();
+  AsciiBuffer getName();
+  Destination[] getDestinations();
+}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala (from r961081, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala&r1=961081&r2=961082&rev=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala Wed Jul  7 03:46:27 2010
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.apollo.broker
 
-import _root_.java.util.{LinkedHashMap, HashMap}
 import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
 import BufferConversions._
 
@@ -28,12 +27,6 @@ class ParserOptions {
   var tempTopicPrefix:AsciiBuffer = null
 }
 
-trait Destination {
-  def getDomain(): AsciiBuffer
-  def getName(): AsciiBuffer
-  def getDestinations():Seq[Destination]
-}
-
 object DestinationParser {
 
     /**
@@ -79,15 +72,15 @@ object DestinationParser {
 
         if( value.contains(compositeSeparator) ) {
             var rc = value.split(compositeSeparator);
-            var md = new MultiDestination();
+            var dl:List[Destination] = Nil
             for (buffer <- rc) {
               val d = parse(buffer, options)
               if( d==null ) {
                 return null;
               }
-              md.destinations = md.destinations ::: d :: Nil
+              dl = dl ::: d :: Nil
             }
-            return md;
+            return new MultiDestination(dl.toArray[Destination]);
         }
         return parse(value, options);
     }
@@ -95,15 +88,15 @@ object DestinationParser {
 
 case class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
 
-  def getDestinations():Seq[Destination] = null;
+  def getDestinations():Array[Destination] = null;
   def getDomain():AsciiBuffer = domain
   def getName():AsciiBuffer = name
 
   override def toString() = ""+domain+":"+name
 }
-case class MultiDestination(var destinations:List[Destination]=Nil) extends Destination {
+case class MultiDestination(var destinations:Array[Destination]) extends Destination {
 
-  def getDestinations():Seq[Destination] = destinations;
+  def getDestinations():Array[Destination] = destinations;
   def getDomain():AsciiBuffer = null
   def getName():AsciiBuffer = null
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (from r961081, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala&r1=961081&r2=961082&rev=961082&view=diff
==============================================================================
    (empty)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Wed Jul  7 03:46:27 2010
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.apollo.broker
 
-import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.java.util.{ArrayList}
 import _root_.org.apache.activemq.filter.{FilterException, BooleanExpression}
-import path.{PathFilter}
 import _root_.scala.collection.JavaConversions._
+import path.PathFilter
 
 trait BrokerSubscription {
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 03:46:27 2010
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.apollo.broker;
 
-import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.java.util.{ArrayList, HashMap}
 import _root_.org.apache.activemq.broker.store.memory.MemoryStore
 import _root_.org.apache.activemq.broker.store.{Store}
 import _root_.org.apache.activemq.Service
@@ -25,8 +25,7 @@ import _root_.org.apache.activemq.util.b
 import _root_.org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue}
 import _root_.scala.collection.JavaConversions._
 import _root_.scala.reflect.BeanProperty
-
-import path.{PathFilter}
+import path.PathFilter
 
 object VirtualHost extends Log
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala Wed Jul  7 03:46:27 2010
@@ -16,23 +16,21 @@
  */
 package org.apache.activemq.apollo.broker.perf
 
-import _root_.java.beans.ExceptionListener
-import _root_.java.net.URI
 import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-import _root_.java.util.concurrent.TimeUnit
-import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
-import _root_.org.apache.activemq.apollo.broker._
-import _root_.org.apache.activemq.broker.store.{StoreFactory, Store}
 import _root_.org.apache.activemq.metric.{Period, MetricAggregator, MetricCounter}
 import _root_.java.lang.{String}
-import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
 import _root_.org.junit.{Test, Before}
 
 import org.apache.activemq.transport.TransportFactory
 
 import _root_.scala.collection.JavaConversions._
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import java.io.{IOException, File}
+import org.apache.activemq.apollo.broker._
+import org.apache.activemq.util.buffer.AsciiBuffer
+import org.apache.activemq.broker.store.{Store, StoreFactory}
+import java.io.{File, IOException}
+import java.util.concurrent.TimeUnit
+import java.util.ArrayList
 
 
 abstract class RemoteConsumer extends Connection {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 03:46:27 2010
@@ -17,10 +17,10 @@
 package org.apache.activemq.apollo.stomp
 
 import _root_.java.util.LinkedList
-import _root_.org.apache.activemq.apollo.broker.{BufferConversions, Destination, Message}
 import _root_.org.apache.activemq.filter.{Expression, MessageEvaluationContext}
 import _root_.org.apache.activemq.util.buffer._
 import collection.mutable.ListBuffer
+import org.apache.activemq.apollo.broker.{Destination, BufferConversions, Message}
 
 /**
  *

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:46:27 2010
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.stomp
 
-import _root_.org.apache.activemq.apollo.broker._
-
 import _root_.org.apache.activemq.wireformat.{WireFormat}
 import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
 import _root_.org.apache.activemq.util.buffer._
@@ -25,6 +23,7 @@ import collection.mutable.{ListBuffer, H
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
 import AsciiBuffer._
+import org.apache.activemq.apollo.broker._
 import Stomp._
 import BufferConversions._
 import StompFrameConstants._

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 03:46:27 2010
@@ -122,187 +122,180 @@ class StompWireFormat extends WireFormat
 
   def getName() = "stomp"
 
-  def getWireFormatFactory() = new StompWireFormatFactory
-
   //
   // state associated with un-marshalling stomp frames from
-  // a non-blocking NIO channel.
+  // with the  unmarshalNB method.
   //
-  def createUnmarshalSession() = new StompUnmarshalSession
-
-  class StompUnmarshalSession extends UnmarshalSession {
-
-    type FrameReader = (ByteBuffer)=>StompFrame
-
-    var next_action:FrameReader = read_action
-    var end = 0
-    var start = 0
+  type FrameReader = (ByteBuffer)=>StompFrame
 
-    def getStartPos() = start
-    def setStartPos(pos:Int):Unit = {start=pos}
-
-    def getEndPos() = end
-    def setEndPos(pos:Int):Unit = { end = pos }
-
-    def unmarshal(buffer:ByteBuffer):Object = {
-      // keep running the next action until
-      // a frame is decoded or we run out of input
-      var rc:StompFrame = null
-      while( rc == null && end!=buffer.position ) {
-        rc = next_action(buffer)
-      }
+  var next_action:FrameReader = read_action
+  var end = 0
+  var start = 0
+
+  def unmarshalStartPos() = start
+  def unmarshalStartPos(pos:Int):Unit = {start=pos}
+
+  def unmarshalEndPos() = end
+  def unmarshalEndPos(pos:Int):Unit = { end = pos }
+
+  def unmarshalNB(buffer:ByteBuffer):Object = {
+    // keep running the next action until
+    // a frame is decoded or we run out of input
+    var rc:StompFrame = null
+    while( rc == null && end!=buffer.position ) {
+      rc = next_action(buffer)
+    }
 
 //      trace("unmarshalled: "+rc+", start: "+start+", end: "+end+", buffer position: "+buffer.position)
-      rc
-    }
+    rc
+  }
 
-    def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
-        val read_limit = buffer.position
-        while( end < read_limit ) {
-          if( buffer.array()(end) =='\n') {
-            var rc = new Buffer(buffer.array, start, end-start)
-            end += 1;
-            start = end;
-            return rc
-          }
-          if (SIZE_CHECK && end-start > maxLength) {
-              throw new IOException(errorMessage);
-          }
+  def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
+      val read_limit = buffer.position
+      while( end < read_limit ) {
+        if( buffer.array()(end) =='\n') {
+          var rc = new Buffer(buffer.array, start, end-start)
           end += 1;
+          start = end;
+          return rc
         }
-        return null;
-    }
-
-    def read_action:FrameReader = (buffer)=> {
-      val line = read_line(buffer, MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
-      if( line !=null ) {
-        var action = line
-        if( TRIM ) {
-            action = action.trim();
-        }
-        if (action.length() > 0) {
-            next_action = read_headers(action)
+        if (SIZE_CHECK && end-start > maxLength) {
+            throw new IOException(errorMessage);
         }
+        end += 1;
+      }
+      return null;
+  }
+
+  def read_action:FrameReader = (buffer)=> {
+    val line = read_line(buffer, MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
+    if( line !=null ) {
+      var action = line
+      if( TRIM ) {
+          action = action.trim();
+      }
+      if (action.length() > 0) {
+          next_action = read_headers(action)
       }
-      null
     }
+    null
+  }
 
-    def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
-      val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
-      if( line !=null ) {
-        if( line.trim().length() > 0 ) {
+  def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
+    val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
+    if( line !=null ) {
+      if( line.trim().length() > 0 ) {
+
+        if (SIZE_CHECK && headers.size > MAX_HEADERS) {
+            throw new IOException("The maximum number of headers was exceeded");
+        }
 
-          if (SIZE_CHECK && headers.size > MAX_HEADERS) {
-              throw new IOException("The maximum number of headers was exceeded");
-          }
+        try {
+            val seperatorIndex = line.indexOf(SEPERATOR);
+            if( seperatorIndex<0 ) {
+                throw new IOException("Header line missing seperator [" + ascii(line) + "]");
+            }
+            var name = line.slice(0, seperatorIndex);
+            if( TRIM ) {
+                name = name.trim();
+            }
+            var value = line.slice(seperatorIndex + 1, line.length());
+            if( TRIM ) {
+                value = value.trim();
+            }
+            headers.add((ascii(name), ascii(value)))
+        } catch {
+            case e:Exception=>
+              e.printStackTrace
+              throw new IOException("Unable to parser header line [" + line + "]");
+        }
 
+      } else {
+        val contentLength = get(headers, CONTENT_LENGTH)
+        if (contentLength.isDefined) {
+          // Bless the client, he's telling us how much data to read in.
+          var length=0;
           try {
-              val seperatorIndex = line.indexOf(SEPERATOR);
-              if( seperatorIndex<0 ) {
-                  throw new IOException("Header line missing seperator [" + ascii(line) + "]");
-              }
-              var name = line.slice(0, seperatorIndex);
-              if( TRIM ) {
-                  name = name.trim();
-              }
-              var value = line.slice(seperatorIndex + 1, line.length());
-              if( TRIM ) {
-                  value = value.trim();
-              }
-              headers.add((ascii(name), ascii(value)))
+              length = Integer.parseInt(contentLength.get.trim().toString());
           } catch {
-              case e:Exception=>
-                e.printStackTrace
-                throw new IOException("Unable to parser header line [" + line + "]");
+            case e:NumberFormatException=>
+              throw new IOException("Specified content-length is not a valid integer");
           }
 
-        } else {
-          val contentLength = get(headers, CONTENT_LENGTH)
-          if (contentLength.isDefined) {
-            // Bless the client, he's telling us how much data to read in.
-            var length=0;
-            try {
-                length = Integer.parseInt(contentLength.get.trim().toString());
-            } catch {
-              case e:NumberFormatException=>
-                throw new IOException("Specified content-length is not a valid integer");
-            }
-
-            if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
-                throw new IOException("The maximum data length was exceeded");
-            }
-            next_action = read_binary_body(action, headers, length)
-
-          } else {
-            next_action = read_text_body(action, headers)
+          if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
+              throw new IOException("The maximum data length was exceeded");
           }
+          next_action = read_binary_body(action, headers, length)
+
+        } else {
+          next_action = read_text_body(action, headers)
         }
       }
-      null
     }
+    null
+  }
 
-    def get(headers:HeaderMapBuffer, name:AsciiBuffer):Option[AsciiBuffer] = {
-      val i = headers.iterator
-      while( i.hasNext ) {
-        val entry = i.next
-        if( entry._1 == name ) {
-          return Some(entry._2)
-        }
+  def get(headers:HeaderMapBuffer, name:AsciiBuffer):Option[AsciiBuffer] = {
+    val i = headers.iterator
+    while( i.hasNext ) {
+      val entry = i.next
+      if( entry._1 == name ) {
+        return Some(entry._2)
       }
-      None
     }
+    None
+  }
 
 
-    def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
-      val content:Buffer=read_content(buffer, contentLength)
-      if( content != null ) {
-        next_action = read_action
-        new StompFrame(ascii(action), headers.toList, content)
-      } else {
-        null
-      }
+  def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
+    val content:Buffer=read_content(buffer, contentLength)
+    if( content != null ) {
+      next_action = read_action
+      new StompFrame(ascii(action), headers.toList, content)
+    } else {
+      null
     }
+  }
 
 
-    def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
-        val read_limit = buffer.position
-        if( (read_limit-start) < contentLength+1 ) {
-          end = read_limit;
-          null
-        } else {
-          if( buffer.array()(start+contentLength)!= 0 ) {
-             throw new IOException("Exected null termintor after "+contentLength+" content bytes");
-          }
-          var rc = new Buffer(buffer.array, start, contentLength)
-          end = start+contentLength+1;
-          start = end;
-          rc;
+  def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
+      val read_limit = buffer.position
+      if( (read_limit-start) < contentLength+1 ) {
+        end = read_limit;
+        null
+      } else {
+        if( buffer.array()(start+contentLength)!= 0 ) {
+           throw new IOException("Exected null termintor after "+contentLength+" content bytes");
         }
-    }
+        var rc = new Buffer(buffer.array, start, contentLength)
+        end = start+contentLength+1;
+        start = end;
+        rc;
+      }
+  }
 
-    def read_to_null(buffer:ByteBuffer):Buffer = {
-        val read_limit = buffer.position
-        while( end < read_limit ) {
-          if( buffer.array()(end) ==0) {
-            var rc = new Buffer(buffer.array, start, end-start)
-            end += 1;
-            start = end;
-            return rc;
-          }
+  def read_to_null(buffer:ByteBuffer):Buffer = {
+      val read_limit = buffer.position
+      while( end < read_limit ) {
+        if( buffer.array()(end) ==0) {
+          var rc = new Buffer(buffer.array, start, end-start)
           end += 1;
+          start = end;
+          return rc;
         }
-        return null;
-    }
+        end += 1;
+      }
+      return null;
+  }
 
 
-    def read_text_body(action:Buffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
-      val content:Buffer=read_to_null(buffer)
-      if( content != null ) {
-        next_action = read_action
-        new StompFrame(ascii(action), headers.toList, content)
-      } else {
-        null
-      }
+  def read_text_body(action:Buffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
+    val content:Buffer=read_to_null(buffer)
+    if( content != null ) {
+      next_action = read_action
+      new StompFrame(ascii(action), headers.toList, content)
+    } else {
+      null
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:46:27 2010
@@ -44,7 +44,6 @@ import static org.apache.activemq.transp
  */
 public class TcpTransport implements Transport {
     private Map<String, Object> socketOptions;
-    private WireFormat.UnmarshalSession unmarshalSession;
 
     enum SocketState {
         CONNECTING,
@@ -131,8 +130,6 @@ public class TcpTransport implements Tra
         }
         transportState = RUNNING;
 
-        unmarshalSession = wireformat.createUnmarshalSession();
-
         if (socketState == CONNECTING) {
             channel = SocketChannel.open();
         }
@@ -260,7 +257,7 @@ public class TcpTransport implements Tra
             setDispatchQueue(null);
             next_outbound_buffer = null;
             outbound_buffer = null;
-            unmarshalSession = null;
+            this.wireformat = null;
         }
     }
 
@@ -369,25 +366,25 @@ public class TcpTransport implements Tra
         while (true) {
 
             // do we need to read in more data???
-            if (unmarshalSession.getEndPos() == readBuffer.position()) {
+            if (this.wireformat.unmarshalEndPos() == readBuffer.position()) {
 
                 // do we need a new data buffer to read data into??
                 if (readBuffer.remaining() == 0) {
 
                     // double the capacity size if needed...
-                    int new_capacity = unmarshalSession.getStartPos() != 0 ? bufferSize : readBuffer.capacity() << 2;
+                    int new_capacity = this.wireformat.unmarshalStartPos() != 0 ? bufferSize : readBuffer.capacity() << 2;
                     byte[] new_buffer = new byte[new_capacity];
 
                     // If there was un-consummed data.. move it to the start of the new buffer.
-                    int size = unmarshalSession.getEndPos() - unmarshalSession.getStartPos();
+                    int size = this.wireformat.unmarshalEndPos() - this.wireformat.unmarshalStartPos();
                     if (size > 0) {
-                        System.arraycopy(readBuffer.array(), unmarshalSession.getStartPos(), new_buffer, 0, size);
+                        System.arraycopy(readBuffer.array(), this.wireformat.unmarshalStartPos(), new_buffer, 0, size);
                     }
 
                     readBuffer = ByteBuffer.wrap(new_buffer);
                     readBuffer.position(size);
-                    unmarshalSession.setStartPos(0);
-                    unmarshalSession.setEndPos(size);
+                    this.wireformat.unmarshalStartPos(0);
+                    this.wireformat.unmarshalEndPos(size);
                 }
 
                 // Try to fill the buffer with data from the socket..
@@ -400,7 +397,7 @@ public class TcpTransport implements Tra
                 }
             }
 
-            Object command = unmarshalSession.unmarshal(readBuffer);
+            Object command = this.wireformat.unmarshalNB(readBuffer);
             if (command != null) {
                 listener.onTransportCommand(command);
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Wed Jul  7 03:46:27 2010
@@ -23,12 +23,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.util.buffer.ByteArrayInputStream;
@@ -53,10 +50,9 @@ public class MultiWireFormatFactory impl
         ArrayList<WireFormatFactory> wireFormatFactories = new ArrayList<WireFormatFactory>();
         WireFormat wireFormat;
         int maxHeaderLength;
+        int start=0;
+        int end=0;
 
-        public int getVersion() {
-            return 0;
-        }
 
         private ByteArrayInputStream peeked;
 
@@ -102,66 +98,57 @@ public class MultiWireFormatFactory impl
             return rc;
         }
 
-        public UnmarshalSession createUnmarshalSession() {
-            return new UnmarshalSession() {
-                int start=0;
-                int end=0;
-                UnmarshalSession session;
-
-                public int getStartPos() {
-                    if( session!=null ) {
-                        return session.getStartPos();
-                    } else {
-                        return start;
-                    }
-                }
+        public int unmarshalStartPos() {
+            if( wireFormat!=null ) {
+                return wireFormat.unmarshalStartPos();
+            } else {
+                return start;
+            }
+        }
 
-                public void setStartPos(int pos) {
-                    if( session!=null ) {
-                        session.setStartPos(pos);
-                    } else {
-                        start=pos;
-                    }
-                }
+        public void unmarshalStartPos(int pos) {
+            if( wireFormat!=null ) {
+                wireFormat.unmarshalStartPos(pos);
+            } else {
+                start=pos;
+            }
+        }
 
-                public int getEndPos() {
-                    if( session!=null ) {
-                        return session.getEndPos();
-                    } else {
-                        return end;
-                    }
-                }
+        public int unmarshalEndPos() {
+            if( wireFormat!=null ) {
+                return wireFormat.unmarshalEndPos();
+            } else {
+                return end;
+            }
+        }
 
-                public void setEndPos(int pos) {
-                    if( session!=null ) {
-                        session.setEndPos(pos);
-                    } else {
-                        end = pos;
-                    }
-                }
+        public void unmarshalEndPos(int pos) {
+            if( wireFormat!=null ) {
+                wireFormat.unmarshalEndPos(pos);
+            } else {
+                end = pos;
+            }
+        }
 
-                public Object unmarshal(ByteBuffer buffer) throws IOException {
-                    if( session!=null ) {
-                        return session.unmarshal(buffer);
-                    }
+        public Object unmarshalNB(ByteBuffer buffer) throws IOException {
+            if( wireFormat!=null ) {
+                return wireFormat.unmarshalNB(buffer);
+            }
 
-                    Buffer b = new Buffer(buffer.array(), start, buffer.position());
-                    for (WireFormatFactory wff : wireFormatFactories) {
-                        if (wff.matchesWireformatHeader( b )) {
-                            wireFormat = wff.createWireFormat();
-                            session = wireFormat.createUnmarshalSession();
-                            session.setStartPos(start);
-                            session.setEndPos(end);
-                            return wireFormat;
-                        }
-                    }
-                    
-                    if( end >= maxHeaderLength ) {
-                        throw new IOException("Could not discriminate the protocol.");
-                    }
-                    return null;
+            Buffer b = new Buffer(buffer.array(), start, buffer.position());
+            for (WireFormatFactory wff : wireFormatFactories) {
+                if (wff.matchesWireformatHeader( b )) {
+                    wireFormat = wff.createWireFormat();
+                    wireFormat.unmarshalStartPos(start);
+                    wireFormat.unmarshalEndPos(end);
+                    return wireFormat;
                 }
-            };
+            }
+
+            if( end >= maxHeaderLength ) {
+                throw new IOException("Could not discriminate the protocol.");
+            }
+            return null;
         }
 
         public void marshal(Object command, DataOutput out) throws IOException {
@@ -195,10 +182,6 @@ public class MultiWireFormatFactory impl
                 return wireFormat.getName();
             }
         }
-
-		public WireFormatFactory getWireFormatFactory() {
-			return new MultiWireFormatFactory(wireFormatFactories);
-		}
     }
 
     public MultiWireFormatFactory() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Wed Jul  7 03:46:27 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.util.Map;
 
@@ -41,6 +42,7 @@ import org.apache.activemq.util.buffer.B
 public class ObjectStreamWireFormat implements WireFormat {
 
     public static final String WIREFORMAT_NAME = "object";
+    
     public Buffer marshal(Object command) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream ds = new DataOutputStream(baos);
@@ -72,35 +74,28 @@ public class ObjectStreamWireFormat impl
         }
     }
 
-    public UnmarshalSession createUnmarshalSession() {
+    public int unmarshalStartPos() {
         throw new UnsupportedOperationException();
     }
 
-    public Object unmarshal(ReadableByteChannel channel) {
+    public void unmarshalStartPos(int pos) {
         throw new UnsupportedOperationException();
     }
 
-    public void setVersion(int version) {
-    }
-
-    public int getVersion() {
-        return 0;
+    public int unmarshalEndPos() {
+        throw new UnsupportedOperationException();
     }
 
-    public String getName() {
-        return WIREFORMAT_NAME;
+    public void unmarshalEndPos(int pos) {
+        throw new UnsupportedOperationException();
     }
 
-    public boolean inReceive() {
-        // TODO implement the inactivity monitor
-        return false;
+    public Object unmarshalNB(ByteBuffer buffer) throws IOException {
+        throw new UnsupportedOperationException();
     }
 
-    public Transport createTransportFilters(Transport transport, Map options) {
-        return transport;
+    public String getName() {
+        return WIREFORMAT_NAME;
     }
 
-	public WireFormatFactory getWireFormatFactory() {
-		return new ObjectStreamWireFormatFactory();
-	}
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java Wed Jul  7 03:46:27 2010
@@ -37,6 +37,11 @@ import org.apache.activemq.util.buffer.B
 public interface WireFormat {
 
     /**
+     * @return The name of the wireformat
+     */
+    String getName();
+
+    /**
      * Packet based marshaling 
      */
     Buffer marshal(Object command) throws IOException;
@@ -56,31 +61,18 @@ public interface WireFormat {
      */
     Object unmarshal(DataInput in) throws IOException;
 
-    /**
-      * For a unmarshal session is used for non-blocking
-      * unmarshalling.
-      */
-     interface UnmarshalSession {
-        int getStartPos();
-        void setStartPos(int pos);
-
-        int getEndPos();
-        void setEndPos(int pos);
-
-        Object unmarshal(ByteBuffer buffer) throws IOException;
-    }
+    int unmarshalStartPos();
+    void unmarshalStartPos(int pos);
 
-    UnmarshalSession createUnmarshalSession();
+    int unmarshalEndPos();
+    void unmarshalEndPos(int pos);
 
     /**
-     * @return The name of the wireformat
+     * Unmarshals a command from the given buffer; used for
+     * non-blocking unmarshalling.
      */
-    String getName();
-    
-    /**
-     * Returns a WireFormatFactory which can create WireFormat of this type.
-     * @return
-     */
-    WireFormatFactory getWireFormatFactory();
+    Object unmarshalNB(ByteBuffer buffer) throws IOException;
+
+
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java Wed Jul  7 03:46:27 2010
@@ -3,6 +3,7 @@ package org.apache.activemq.wireformat.m
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.util.Map;
 
@@ -14,25 +15,11 @@ import org.apache.activemq.wireformat.Wi
 public class MockWireFormatFactory implements WireFormatFactory {
 
     public class MockWireFormat implements WireFormat {
-		public Transport createTransportFilters(Transport transport, Map options) {
-			return transport;
-		}
 
 		public String getName() {
 			return "mock";
 		}
 
-		public int getVersion() {
-			return 0;
-		}
-
-		public boolean inReceive() {
-			return false;
-		}
-
-		public void setVersion(int version) {
-		}
-
 		public Buffer marshal(Object command) throws IOException {
 	        throw new UnsupportedOperationException();
 		}
@@ -49,17 +36,26 @@ public class MockWireFormatFactory imple
 	        throw new UnsupportedOperationException();
 		}
 
-        public UnmarshalSession createUnmarshalSession() {
+        public int unmarshalStartPos() {
             throw new UnsupportedOperationException();
         }
 
-        public Object unmarshal(ReadableByteChannel channel) {
+        public void unmarshalStartPos(int pos) {
+            throw new UnsupportedOperationException();
+        }
+
+        public int unmarshalEndPos() {
+            throw new UnsupportedOperationException();
+        }
+
+        public void unmarshalEndPos(int pos) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Object unmarshalNB(ByteBuffer buffer) throws IOException {
             throw new UnsupportedOperationException();
         }
 
-        public WireFormatFactory getWireFormatFactory() {
-			return new MockWireFormatFactory();
-		}
     }
 
 	public WireFormat createWireFormat() {