You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:46:28 UTC
svn commit: r961082 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/test/scala/org/apache/activemq/apol...
Author: chirino
Date: Wed Jul 7 03:46:27 2010
New Revision: 961082
URL: http://svn.apache.org/viewvc?rev=961082&view=rev
Log:
shuffling stuff around to get 'mvn instal -Dtest=false' to work
simplified wireformat.. wireformats are now stateful.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
- copied, changed from r961081, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
- copied, changed from r961081, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:46:27 2010
@@ -17,7 +17,7 @@
package org.apache.activemq.apollo.broker
import _root_.java.io.{File}
-import _root_.java.util.{LinkedList, LinkedHashMap, ArrayList, HashMap}
+import _root_.java.util.{LinkedList, LinkedHashMap, ArrayList}
import _root_.org.apache.activemq.transport._
import _root_.org.apache.activemq.Service
import _root_.java.lang.{String}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:46:27 2010
@@ -18,12 +18,11 @@ package org.apache.activemq.apollo.broke
import _root_.java.beans.ExceptionListener
import _root_.java.io.{IOException}
-import _root_.java.util.{LinkedHashMap, HashMap}
import _root_.org.apache.activemq.filter.{BooleanExpression}
import _root_.org.apache.activemq.transport._
import _root_.org.apache.activemq.Service
import _root_.java.lang.{String}
-import _root_.org.apache.activemq.util.{FactoryFinder, IOExceptionSupport}
+import _root_.org.apache.activemq.util.{FactoryFinder}
import _root_.org.apache.activemq.wireformat.WireFormat
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import java.util.concurrent.atomic.AtomicLong
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:46:27 2010
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.apollo.broker
-import _root_.java.util.{LinkedList, LinkedHashMap, HashMap}
+import _root_.java.util.{LinkedList}
import _root_.org.apache.activemq.filter.{MessageEvaluationContext}
import _root_.java.lang.{String}
import _root_.org.apache.activemq.util.buffer.{Buffer, AsciiBuffer}
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java?rev=961082&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.java Wed Jul 7 03:46:27 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+/**
+ */
+public interface Destination {
+ AsciiBuffer getDomain();
+ AsciiBuffer getName();
+ Destination[] getDestinations();
+}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala (from r961081, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala&r1=961081&r2=961082&rev=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala Wed Jul 7 03:46:27 2010
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.apollo.broker
-import _root_.java.util.{LinkedHashMap, HashMap}
import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
import BufferConversions._
@@ -28,12 +27,6 @@ class ParserOptions {
var tempTopicPrefix:AsciiBuffer = null
}
-trait Destination {
- def getDomain(): AsciiBuffer
- def getName(): AsciiBuffer
- def getDestinations():Seq[Destination]
-}
-
object DestinationParser {
/**
@@ -79,15 +72,15 @@ object DestinationParser {
if( value.contains(compositeSeparator) ) {
var rc = value.split(compositeSeparator);
- var md = new MultiDestination();
+ var dl:List[Destination] = Nil
for (buffer <- rc) {
val d = parse(buffer, options)
if( d==null ) {
return null;
}
- md.destinations = md.destinations ::: d :: Nil
+ dl = dl ::: d :: Nil
}
- return md;
+ return new MultiDestination(dl.toArray[Destination]);
}
return parse(value, options);
}
@@ -95,15 +88,15 @@ object DestinationParser {
case class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
- def getDestinations():Seq[Destination] = null;
+ def getDestinations():Array[Destination] = null;
def getDomain():AsciiBuffer = domain
def getName():AsciiBuffer = name
override def toString() = ""+domain+":"+name
}
-case class MultiDestination(var destinations:List[Destination]=Nil) extends Destination {
+case class MultiDestination(var destinations:Array[Destination]) extends Destination {
- def getDestinations():Seq[Destination] = destinations;
+ def getDestinations():Array[Destination] = destinations;
def getDomain():AsciiBuffer = null
def getName():AsciiBuffer = null
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (from r961081, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.scala&r1=961081&r2=961082&rev=961082&view=diff
==============================================================================
(empty)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Wed Jul 7 03:46:27 2010
@@ -16,10 +16,10 @@
*/
package org.apache.activemq.apollo.broker
-import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.java.util.{ArrayList}
import _root_.org.apache.activemq.filter.{FilterException, BooleanExpression}
-import path.{PathFilter}
import _root_.scala.collection.JavaConversions._
+import path.PathFilter
trait BrokerSubscription {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 03:46:27 2010
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.apollo.broker;
-import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.java.util.{ArrayList, HashMap}
import _root_.org.apache.activemq.broker.store.memory.MemoryStore
import _root_.org.apache.activemq.broker.store.{Store}
import _root_.org.apache.activemq.Service
@@ -25,8 +25,7 @@ import _root_.org.apache.activemq.util.b
import _root_.org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue}
import _root_.scala.collection.JavaConversions._
import _root_.scala.reflect.BeanProperty
-
-import path.{PathFilter}
+import path.PathFilter
object VirtualHost extends Log
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala Wed Jul 7 03:46:27 2010
@@ -16,23 +16,21 @@
*/
package org.apache.activemq.apollo.broker.perf
-import _root_.java.beans.ExceptionListener
-import _root_.java.net.URI
import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-import _root_.java.util.concurrent.TimeUnit
-import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
-import _root_.org.apache.activemq.apollo.broker._
-import _root_.org.apache.activemq.broker.store.{StoreFactory, Store}
import _root_.org.apache.activemq.metric.{Period, MetricAggregator, MetricCounter}
import _root_.java.lang.{String}
-import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
import _root_.org.junit.{Test, Before}
import org.apache.activemq.transport.TransportFactory
import _root_.scala.collection.JavaConversions._
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import java.io.{IOException, File}
+import org.apache.activemq.apollo.broker._
+import org.apache.activemq.util.buffer.AsciiBuffer
+import org.apache.activemq.broker.store.{Store, StoreFactory}
+import java.io.{File, IOException}
+import java.util.concurrent.TimeUnit
+import java.util.ArrayList
abstract class RemoteConsumer extends Connection {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul 7 03:46:27 2010
@@ -17,10 +17,10 @@
package org.apache.activemq.apollo.stomp
import _root_.java.util.LinkedList
-import _root_.org.apache.activemq.apollo.broker.{BufferConversions, Destination, Message}
import _root_.org.apache.activemq.filter.{Expression, MessageEvaluationContext}
import _root_.org.apache.activemq.util.buffer._
import collection.mutable.ListBuffer
+import org.apache.activemq.apollo.broker.{Destination, BufferConversions, Message}
/**
*
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:46:27 2010
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.apollo.stomp
-import _root_.org.apache.activemq.apollo.broker._
-
import _root_.org.apache.activemq.wireformat.{WireFormat}
import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
import _root_.org.apache.activemq.util.buffer._
@@ -25,6 +23,7 @@ import collection.mutable.{ListBuffer, H
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import AsciiBuffer._
+import org.apache.activemq.apollo.broker._
import Stomp._
import BufferConversions._
import StompFrameConstants._
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul 7 03:46:27 2010
@@ -122,187 +122,180 @@ class StompWireFormat extends WireFormat
def getName() = "stomp"
- def getWireFormatFactory() = new StompWireFormatFactory
-
//
// state associated with un-marshalling stomp frames from
- // a non-blocking NIO channel.
+ // with the unmarshalNB method.
//
- def createUnmarshalSession() = new StompUnmarshalSession
-
- class StompUnmarshalSession extends UnmarshalSession {
-
- type FrameReader = (ByteBuffer)=>StompFrame
-
- var next_action:FrameReader = read_action
- var end = 0
- var start = 0
+ type FrameReader = (ByteBuffer)=>StompFrame
- def getStartPos() = start
- def setStartPos(pos:Int):Unit = {start=pos}
-
- def getEndPos() = end
- def setEndPos(pos:Int):Unit = { end = pos }
-
- def unmarshal(buffer:ByteBuffer):Object = {
- // keep running the next action until
- // a frame is decoded or we run out of input
- var rc:StompFrame = null
- while( rc == null && end!=buffer.position ) {
- rc = next_action(buffer)
- }
+ var next_action:FrameReader = read_action
+ var end = 0
+ var start = 0
+
+ def unmarshalStartPos() = start
+ def unmarshalStartPos(pos:Int):Unit = {start=pos}
+
+ def unmarshalEndPos() = end
+ def unmarshalEndPos(pos:Int):Unit = { end = pos }
+
+ def unmarshalNB(buffer:ByteBuffer):Object = {
+ // keep running the next action until
+ // a frame is decoded or we run out of input
+ var rc:StompFrame = null
+ while( rc == null && end!=buffer.position ) {
+ rc = next_action(buffer)
+ }
// trace("unmarshalled: "+rc+", start: "+start+", end: "+end+", buffer position: "+buffer.position)
- rc
- }
+ rc
+ }
- def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
- val read_limit = buffer.position
- while( end < read_limit ) {
- if( buffer.array()(end) =='\n') {
- var rc = new Buffer(buffer.array, start, end-start)
- end += 1;
- start = end;
- return rc
- }
- if (SIZE_CHECK && end-start > maxLength) {
- throw new IOException(errorMessage);
- }
+ def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
+ val read_limit = buffer.position
+ while( end < read_limit ) {
+ if( buffer.array()(end) =='\n') {
+ var rc = new Buffer(buffer.array, start, end-start)
end += 1;
+ start = end;
+ return rc
}
- return null;
- }
-
- def read_action:FrameReader = (buffer)=> {
- val line = read_line(buffer, MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
- if( line !=null ) {
- var action = line
- if( TRIM ) {
- action = action.trim();
- }
- if (action.length() > 0) {
- next_action = read_headers(action)
+ if (SIZE_CHECK && end-start > maxLength) {
+ throw new IOException(errorMessage);
}
+ end += 1;
+ }
+ return null;
+ }
+
+ def read_action:FrameReader = (buffer)=> {
+ val line = read_line(buffer, MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
+ if( line !=null ) {
+ var action = line
+ if( TRIM ) {
+ action = action.trim();
+ }
+ if (action.length() > 0) {
+ next_action = read_headers(action)
}
- null
}
+ null
+ }
- def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
- val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
- if( line !=null ) {
- if( line.trim().length() > 0 ) {
+ def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
+ val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
+ if( line !=null ) {
+ if( line.trim().length() > 0 ) {
+
+ if (SIZE_CHECK && headers.size > MAX_HEADERS) {
+ throw new IOException("The maximum number of headers was exceeded");
+ }
- if (SIZE_CHECK && headers.size > MAX_HEADERS) {
- throw new IOException("The maximum number of headers was exceeded");
- }
+ try {
+ val seperatorIndex = line.indexOf(SEPERATOR);
+ if( seperatorIndex<0 ) {
+ throw new IOException("Header line missing seperator [" + ascii(line) + "]");
+ }
+ var name = line.slice(0, seperatorIndex);
+ if( TRIM ) {
+ name = name.trim();
+ }
+ var value = line.slice(seperatorIndex + 1, line.length());
+ if( TRIM ) {
+ value = value.trim();
+ }
+ headers.add((ascii(name), ascii(value)))
+ } catch {
+ case e:Exception=>
+ e.printStackTrace
+ throw new IOException("Unable to parser header line [" + line + "]");
+ }
+ } else {
+ val contentLength = get(headers, CONTENT_LENGTH)
+ if (contentLength.isDefined) {
+ // Bless the client, he's telling us how much data to read in.
+ var length=0;
try {
- val seperatorIndex = line.indexOf(SEPERATOR);
- if( seperatorIndex<0 ) {
- throw new IOException("Header line missing seperator [" + ascii(line) + "]");
- }
- var name = line.slice(0, seperatorIndex);
- if( TRIM ) {
- name = name.trim();
- }
- var value = line.slice(seperatorIndex + 1, line.length());
- if( TRIM ) {
- value = value.trim();
- }
- headers.add((ascii(name), ascii(value)))
+ length = Integer.parseInt(contentLength.get.trim().toString());
} catch {
- case e:Exception=>
- e.printStackTrace
- throw new IOException("Unable to parser header line [" + line + "]");
+ case e:NumberFormatException=>
+ throw new IOException("Specified content-length is not a valid integer");
}
- } else {
- val contentLength = get(headers, CONTENT_LENGTH)
- if (contentLength.isDefined) {
- // Bless the client, he's telling us how much data to read in.
- var length=0;
- try {
- length = Integer.parseInt(contentLength.get.trim().toString());
- } catch {
- case e:NumberFormatException=>
- throw new IOException("Specified content-length is not a valid integer");
- }
-
- if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
- throw new IOException("The maximum data length was exceeded");
- }
- next_action = read_binary_body(action, headers, length)
-
- } else {
- next_action = read_text_body(action, headers)
+ if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
+ throw new IOException("The maximum data length was exceeded");
}
+ next_action = read_binary_body(action, headers, length)
+
+ } else {
+ next_action = read_text_body(action, headers)
}
}
- null
}
+ null
+ }
- def get(headers:HeaderMapBuffer, name:AsciiBuffer):Option[AsciiBuffer] = {
- val i = headers.iterator
- while( i.hasNext ) {
- val entry = i.next
- if( entry._1 == name ) {
- return Some(entry._2)
- }
+ def get(headers:HeaderMapBuffer, name:AsciiBuffer):Option[AsciiBuffer] = {
+ val i = headers.iterator
+ while( i.hasNext ) {
+ val entry = i.next
+ if( entry._1 == name ) {
+ return Some(entry._2)
}
- None
}
+ None
+ }
- def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
- val content:Buffer=read_content(buffer, contentLength)
- if( content != null ) {
- next_action = read_action
- new StompFrame(ascii(action), headers.toList, content)
- } else {
- null
- }
+ def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
+ val content:Buffer=read_content(buffer, contentLength)
+ if( content != null ) {
+ next_action = read_action
+ new StompFrame(ascii(action), headers.toList, content)
+ } else {
+ null
}
+ }
- def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
- val read_limit = buffer.position
- if( (read_limit-start) < contentLength+1 ) {
- end = read_limit;
- null
- } else {
- if( buffer.array()(start+contentLength)!= 0 ) {
- throw new IOException("Exected null termintor after "+contentLength+" content bytes");
- }
- var rc = new Buffer(buffer.array, start, contentLength)
- end = start+contentLength+1;
- start = end;
- rc;
+ def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
+ val read_limit = buffer.position
+ if( (read_limit-start) < contentLength+1 ) {
+ end = read_limit;
+ null
+ } else {
+ if( buffer.array()(start+contentLength)!= 0 ) {
+ throw new IOException("Exected null termintor after "+contentLength+" content bytes");
}
- }
+ var rc = new Buffer(buffer.array, start, contentLength)
+ end = start+contentLength+1;
+ start = end;
+ rc;
+ }
+ }
- def read_to_null(buffer:ByteBuffer):Buffer = {
- val read_limit = buffer.position
- while( end < read_limit ) {
- if( buffer.array()(end) ==0) {
- var rc = new Buffer(buffer.array, start, end-start)
- end += 1;
- start = end;
- return rc;
- }
+ def read_to_null(buffer:ByteBuffer):Buffer = {
+ val read_limit = buffer.position
+ while( end < read_limit ) {
+ if( buffer.array()(end) ==0) {
+ var rc = new Buffer(buffer.array, start, end-start)
end += 1;
+ start = end;
+ return rc;
}
- return null;
- }
+ end += 1;
+ }
+ return null;
+ }
- def read_text_body(action:Buffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
- val content:Buffer=read_to_null(buffer)
- if( content != null ) {
- next_action = read_action
- new StompFrame(ascii(action), headers.toList, content)
- } else {
- null
- }
+ def read_text_body(action:Buffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
+ val content:Buffer=read_to_null(buffer)
+ if( content != null ) {
+ next_action = read_action
+ new StompFrame(ascii(action), headers.toList, content)
+ } else {
+ null
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:46:27 2010
@@ -44,7 +44,6 @@ import static org.apache.activemq.transp
*/
public class TcpTransport implements Transport {
private Map<String, Object> socketOptions;
- private WireFormat.UnmarshalSession unmarshalSession;
enum SocketState {
CONNECTING,
@@ -131,8 +130,6 @@ public class TcpTransport implements Tra
}
transportState = RUNNING;
- unmarshalSession = wireformat.createUnmarshalSession();
-
if (socketState == CONNECTING) {
channel = SocketChannel.open();
}
@@ -260,7 +257,7 @@ public class TcpTransport implements Tra
setDispatchQueue(null);
next_outbound_buffer = null;
outbound_buffer = null;
- unmarshalSession = null;
+ this.wireformat = null;
}
}
@@ -369,25 +366,25 @@ public class TcpTransport implements Tra
while (true) {
// do we need to read in more data???
- if (unmarshalSession.getEndPos() == readBuffer.position()) {
+ if (this.wireformat.unmarshalEndPos() == readBuffer.position()) {
// do we need a new data buffer to read data into??
if (readBuffer.remaining() == 0) {
// double the capacity size if needed...
- int new_capacity = unmarshalSession.getStartPos() != 0 ? bufferSize : readBuffer.capacity() << 2;
+ int new_capacity = this.wireformat.unmarshalStartPos() != 0 ? bufferSize : readBuffer.capacity() << 2;
byte[] new_buffer = new byte[new_capacity];
// If there was un-consummed data.. move it to the start of the new buffer.
- int size = unmarshalSession.getEndPos() - unmarshalSession.getStartPos();
+ int size = this.wireformat.unmarshalEndPos() - this.wireformat.unmarshalStartPos();
if (size > 0) {
- System.arraycopy(readBuffer.array(), unmarshalSession.getStartPos(), new_buffer, 0, size);
+ System.arraycopy(readBuffer.array(), this.wireformat.unmarshalStartPos(), new_buffer, 0, size);
}
readBuffer = ByteBuffer.wrap(new_buffer);
readBuffer.position(size);
- unmarshalSession.setStartPos(0);
- unmarshalSession.setEndPos(size);
+ this.wireformat.unmarshalStartPos(0);
+ this.wireformat.unmarshalEndPos(size);
}
// Try to fill the buffer with data from the socket..
@@ -400,7 +397,7 @@ public class TcpTransport implements Tra
}
}
- Object command = unmarshalSession.unmarshal(readBuffer);
+ Object command = this.wireformat.unmarshalNB(readBuffer);
if (command != null) {
listener.onTransportCommand(command);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Wed Jul 7 03:46:27 2010
@@ -23,12 +23,9 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.buffer.Buffer;
import org.apache.activemq.util.buffer.ByteArrayInputStream;
@@ -53,10 +50,9 @@ public class MultiWireFormatFactory impl
ArrayList<WireFormatFactory> wireFormatFactories = new ArrayList<WireFormatFactory>();
WireFormat wireFormat;
int maxHeaderLength;
+ int start=0;
+ int end=0;
- public int getVersion() {
- return 0;
- }
private ByteArrayInputStream peeked;
@@ -102,66 +98,57 @@ public class MultiWireFormatFactory impl
return rc;
}
- public UnmarshalSession createUnmarshalSession() {
- return new UnmarshalSession() {
- int start=0;
- int end=0;
- UnmarshalSession session;
-
- public int getStartPos() {
- if( session!=null ) {
- return session.getStartPos();
- } else {
- return start;
- }
- }
+ public int unmarshalStartPos() {
+ if( wireFormat!=null ) {
+ return wireFormat.unmarshalStartPos();
+ } else {
+ return start;
+ }
+ }
- public void setStartPos(int pos) {
- if( session!=null ) {
- session.setStartPos(pos);
- } else {
- start=pos;
- }
- }
+ public void unmarshalStartPos(int pos) {
+ if( wireFormat!=null ) {
+ wireFormat.unmarshalStartPos(pos);
+ } else {
+ start=pos;
+ }
+ }
- public int getEndPos() {
- if( session!=null ) {
- return session.getEndPos();
- } else {
- return end;
- }
- }
+ public int unmarshalEndPos() {
+ if( wireFormat!=null ) {
+ return wireFormat.unmarshalEndPos();
+ } else {
+ return end;
+ }
+ }
- public void setEndPos(int pos) {
- if( session!=null ) {
- session.setEndPos(pos);
- } else {
- end = pos;
- }
- }
+ public void unmarshalEndPos(int pos) {
+ if( wireFormat!=null ) {
+ wireFormat.unmarshalEndPos(pos);
+ } else {
+ end = pos;
+ }
+ }
- public Object unmarshal(ByteBuffer buffer) throws IOException {
- if( session!=null ) {
- return session.unmarshal(buffer);
- }
+ public Object unmarshalNB(ByteBuffer buffer) throws IOException {
+ if( wireFormat!=null ) {
+ return wireFormat.unmarshalNB(buffer);
+ }
- Buffer b = new Buffer(buffer.array(), start, buffer.position());
- for (WireFormatFactory wff : wireFormatFactories) {
- if (wff.matchesWireformatHeader( b )) {
- wireFormat = wff.createWireFormat();
- session = wireFormat.createUnmarshalSession();
- session.setStartPos(start);
- session.setEndPos(end);
- return wireFormat;
- }
- }
-
- if( end >= maxHeaderLength ) {
- throw new IOException("Could not discriminate the protocol.");
- }
- return null;
+ Buffer b = new Buffer(buffer.array(), start, buffer.position());
+ for (WireFormatFactory wff : wireFormatFactories) {
+ if (wff.matchesWireformatHeader( b )) {
+ wireFormat = wff.createWireFormat();
+ wireFormat.unmarshalStartPos(start);
+ wireFormat.unmarshalEndPos(end);
+ return wireFormat;
}
- };
+ }
+
+ if( end >= maxHeaderLength ) {
+ throw new IOException("Could not discriminate the protocol.");
+ }
+ return null;
}
public void marshal(Object command, DataOutput out) throws IOException {
@@ -195,10 +182,6 @@ public class MultiWireFormatFactory impl
return wireFormat.getName();
}
}
-
- public WireFormatFactory getWireFormatFactory() {
- return new MultiWireFormatFactory(wireFormatFactories);
- }
}
public MultiWireFormatFactory() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Wed Jul 7 03:46:27 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.activemq.util.buffer.B
public class ObjectStreamWireFormat implements WireFormat {
public static final String WIREFORMAT_NAME = "object";
+
public Buffer marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream ds = new DataOutputStream(baos);
@@ -72,35 +74,28 @@ public class ObjectStreamWireFormat impl
}
}
- public UnmarshalSession createUnmarshalSession() {
+ public int unmarshalStartPos() {
throw new UnsupportedOperationException();
}
- public Object unmarshal(ReadableByteChannel channel) {
+ public void unmarshalStartPos(int pos) {
throw new UnsupportedOperationException();
}
- public void setVersion(int version) {
- }
-
- public int getVersion() {
- return 0;
+ public int unmarshalEndPos() {
+ throw new UnsupportedOperationException();
}
- public String getName() {
- return WIREFORMAT_NAME;
+ public void unmarshalEndPos(int pos) {
+ throw new UnsupportedOperationException();
}
- public boolean inReceive() {
- // TODO implement the inactivity monitor
- return false;
+ public Object unmarshalNB(ByteBuffer buffer) throws IOException {
+ throw new UnsupportedOperationException();
}
- public Transport createTransportFilters(Transport transport, Map options) {
- return transport;
+ public String getName() {
+ return WIREFORMAT_NAME;
}
- public WireFormatFactory getWireFormatFactory() {
- return new ObjectStreamWireFormatFactory();
- }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java Wed Jul 7 03:46:27 2010
@@ -37,6 +37,11 @@ import org.apache.activemq.util.buffer.B
public interface WireFormat {
/**
+ * @return The name of the wireformat
+ */
+ String getName();
+
+ /**
* Packet based marshaling
*/
Buffer marshal(Object command) throws IOException;
@@ -56,31 +61,18 @@ public interface WireFormat {
*/
Object unmarshal(DataInput in) throws IOException;
- /**
- * For a unmarshal session is used for non-blocking
- * unmarshalling.
- */
- interface UnmarshalSession {
- int getStartPos();
- void setStartPos(int pos);
-
- int getEndPos();
- void setEndPos(int pos);
-
- Object unmarshal(ByteBuffer buffer) throws IOException;
- }
+ int unmarshalStartPos();
+ void unmarshalStartPos(int pos);
- UnmarshalSession createUnmarshalSession();
+ int unmarshalEndPos();
+ void unmarshalEndPos(int pos);
/**
- * @return The name of the wireformat
+ * For a unmarshal session is used for non-blocking
+ * unmarshalling.
*/
- String getName();
-
- /**
- * Returns a WireFormatFactory which can create WireFormat of this type.
- * @return
- */
- WireFormatFactory getWireFormatFactory();
+ Object unmarshalNB(ByteBuffer buffer) throws IOException;
+
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java?rev=961082&r1=961081&r2=961082&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java Wed Jul 7 03:46:27 2010
@@ -3,6 +3,7 @@ package org.apache.activemq.wireformat.m
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Map;
@@ -14,25 +15,11 @@ import org.apache.activemq.wireformat.Wi
public class MockWireFormatFactory implements WireFormatFactory {
public class MockWireFormat implements WireFormat {
- public Transport createTransportFilters(Transport transport, Map options) {
- return transport;
- }
public String getName() {
return "mock";
}
- public int getVersion() {
- return 0;
- }
-
- public boolean inReceive() {
- return false;
- }
-
- public void setVersion(int version) {
- }
-
public Buffer marshal(Object command) throws IOException {
throw new UnsupportedOperationException();
}
@@ -49,17 +36,26 @@ public class MockWireFormatFactory imple
throw new UnsupportedOperationException();
}
- public UnmarshalSession createUnmarshalSession() {
+ public int unmarshalStartPos() {
throw new UnsupportedOperationException();
}
- public Object unmarshal(ReadableByteChannel channel) {
+ public void unmarshalStartPos(int pos) {
+ throw new UnsupportedOperationException();
+ }
+
+ public int unmarshalEndPos() {
+ throw new UnsupportedOperationException();
+ }
+
+ public void unmarshalEndPos(int pos) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object unmarshalNB(ByteBuffer buffer) throws IOException {
throw new UnsupportedOperationException();
}
- public WireFormatFactory getWireFormatFactory() {
- return new MockWireFormatFactory();
- }
}
public WireFormat createWireFormat() {