You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/04/27 19:33:09 UTC
svn commit: r1097189 [2/42] - in /activemq/activemq-apollo/trunk: ./
apollo-openwire/ apollo-openwire/src/ apollo-openwire/src/main/
apollo-openwire/src/main/resources/
apollo-openwire/src/main/resources/META-INF/
apollo-openwire/src/main/resources/MET...
Added: activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/pom.xml?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/pom.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/pom.xml Wed Apr 27 17:32:51 2011
@@ -0,0 +1,146 @@
+<?xml version="1.0" encoding="UTF-8"?>
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version
+ 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ applicable law or agreed to in writing, software distributed under
+ the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-scala</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../apollo-scala</relativePath>
+ </parent>
+
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-openwire</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <name>${project.artifactId}</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-broker</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-tcp</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf</artifactId>
+ <version>${hawtbuf-version}</version>
+ </dependency>
+
+
+ <!-- Scala Support -->
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>compile</scope>
+ <version>${scala-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala-version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <!-- Testing Dependencies -->
+
+ <!-- bring in the activemq jms client libs -->
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <version>5.5.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- so we can test against a persisentce store -->
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-hawtdb</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-bdb</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-broker</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-util</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ <version>${junit-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest</artifactId>
+ <version>${scalatest-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j-version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <!-- Generate a test jar for the test cases in this package -->
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+</project>
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index Wed Apr 27 17:32:51 2011
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.openwire.OpenwireProtocolCodecFactory
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Wed Apr 27 17:32:51 2011
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.openwire.OpenwireProtocolFactory
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala Wed Apr 27 17:32:51 2011
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire
+
+import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.apache.activemq.apollo.transport.ProtocolCodec
+import OpenwireConstants._
+import java.nio.ByteBuffer
+import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
+import java.io.EOFException
+import org.fusesource.hawtbuf.{BufferEditor, DataByteArrayOutputStream, Buffer}
+import org.apache.activemq.apollo.broker.{Sizer, Message}
+import org.apache.activemq.apollo.openwire.codec.OpenWireFormat
+import org.apache.activemq.apollo.openwire.command._
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object OpenwireCodec extends Sizer[Command] {
+ def encode(message: Message):MessageRecord = {
+ throw new UnsupportedOperationException
+ }
+ def decode(message: MessageRecord) = {
+ throw new UnsupportedOperationException
+ }
+
+ def size(value: Command) = {
+ value match {
+ case x:ActiveMQMessage => x.getSize
+ case _ => 100
+ }
+ }
+}
+
+class OpenwireCodec extends ProtocolCodec {
+
+ implicit def toBuffer(value:Array[Byte]):Buffer = new Buffer(value)
+
+ def protocol = PROTOCOL
+
+ var write_buffer_size = 1024*64;
+ var write_counter = 0L
+ var write_channel:WritableByteChannel = null
+
+ var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
+ var write_buffer = ByteBuffer.allocate(0)
+
+ val format = new OpenWireFormat();
+
+
+ def full = next_write_buffer.size() >= (write_buffer_size >> 1)
+ def is_empty = write_buffer.remaining() == 0
+
+ def setWritableByteChannel(channel: WritableByteChannel) = {
+ this.write_channel = channel
+ if( this.write_channel.isInstanceOf[SocketChannel] ) {
+ this.write_channel.asInstanceOf[SocketChannel].socket().setSendBufferSize(write_buffer_size);
+ }
+ }
+
+ def getWriteCounter = write_counter
+
+ def write(command: Any):ProtocolCodec.BufferState = {
+ if ( full) {
+ ProtocolCodec.BufferState.FULL
+ } else {
+ val was_empty = is_empty
+ command match {
+ case frame:Command=>
+ format.marshal(frame, next_write_buffer)
+ }
+ if( was_empty ) {
+ ProtocolCodec.BufferState.WAS_EMPTY
+ } else {
+ ProtocolCodec.BufferState.NOT_EMPTY
+ }
+ }
+ }
+
+ def flush():ProtocolCodec.BufferState = {
+ // if we have a pending write that is being sent over the socket...
+ if ( write_buffer.remaining() != 0 ) {
+ write_counter += write_channel.write(write_buffer)
+ }
+
+ // if it is now empty try to refill...
+ if ( is_empty && next_write_buffer.size()!=0 ) {
+ // size of next buffer is based on how much was used in the previous buffer.
+ val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
+ write_buffer = next_write_buffer.toBuffer().toByteBuffer()
+ next_write_buffer = new DataByteArrayOutputStream(prev_size)
+ }
+
+ if ( is_empty ) {
+ ProtocolCodec.BufferState.EMPTY
+ } else {
+ ProtocolCodec.BufferState.NOT_EMPTY
+ }
+ }
+
+ var read_counter = 0L
+ var read_buffer_size = 1024*64
+ var read_channel:ReadableByteChannel = null
+
+ var read_buffer:ByteBuffer = ByteBuffer.allocate(4)
+ var read_waiting_on = 4
+
+ var next_action:()=>Command = read_header
+
+ def setReadableByteChannel(channel: ReadableByteChannel) = {
+ this.read_channel = channel
+ if( this.read_channel.isInstanceOf[SocketChannel] ) {
+ this.read_channel.asInstanceOf[SocketChannel].socket().setReceiveBufferSize(read_buffer_size);
+ }
+ }
+
+ def unread(buffer: Buffer) = {
+ assert(read_counter == 0)
+ read_buffer = buffer.toByteBuffer
+ read_buffer.position(read_buffer.limit)
+ read_counter += buffer.length
+ read_waiting_on -= buffer.length
+ if ( read_waiting_on <= 0 ) {
+ read_buffer.flip
+ }
+ }
+
+ def getReadCounter = read_counter
+
+ override def read():Object = {
+
+ var command:Object = null
+ while( command==null ) {
+ // do we need to read in more data???
+ if ( read_waiting_on > 0 ) {
+
+ // Try to fill the buffer with data from the socket..
+ var p = read_buffer.position()
+ var count = read_channel.read(read_buffer)
+ if (count == -1) {
+ throw new EOFException("Peer disconnected")
+ } else if (count == 0) {
+ return null
+ }
+ read_counter += count
+ read_waiting_on -= count
+
+ if ( read_waiting_on <= 0 ) {
+ read_buffer.flip
+ }
+
+ } else {
+ command = next_action()
+ if ( read_waiting_on > 0 ) {
+ val next_buffer = ByteBuffer.allocate(read_buffer.remaining+read_waiting_on)
+ next_buffer.put(read_buffer)
+ read_buffer = next_buffer
+ }
+ }
+ }
+ return command
+ }
+
+ def read_header:()=>Command = ()=> {
+
+ read_buffer.mark
+ val size = read_buffer.getInt
+ read_buffer.reset
+
+ read_waiting_on += (size)
+
+ next_action = read_command(size+4)
+ null
+ }
+
+ def read_command(size:Int) = ()=> {
+
+ val buf = new Buffer(read_buffer.array, read_buffer.position, size)
+ val rc = format.unmarshal(buf)
+ read_buffer.position(read_buffer.position+size)
+
+ read_waiting_on += 4
+ next_action = read_header
+ rc.asInstanceOf[Command]
+ }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireConstants.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireConstants.scala?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireConstants.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireConstants.scala Wed Apr 27 17:32:51 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire
+
+import org.fusesource.hawtbuf.Buffer
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object OpenwireConstants {
+
+ val MAGIC = new Buffer("ActiveMQ".getBytes("UTF-8"));
+
+ val PROTOCOL = "openwire"
+
+
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala Wed Apr 27 17:32:51 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire
+
+import org.apache.activemq.apollo.broker.Message
+import java.lang.{String, Class}
+import org.fusesource.hawtdispatch.BaseRetained
+import org.fusesource.hawtbuf.Buffer._
+import OpenwireConstants._
+import org.fusesource.hawtbuf.{UTF8Buffer, AsciiBuffer, Buffer}
+import command.{ActiveMQBytesMessage, ActiveMQTextMessage, ActiveMQMessage}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class OpenwireMessage(val message:ActiveMQMessage) extends BaseRetained with Message {
+
+ val _id = ascii(message.getMessageId.toString)
+
+ def getProperty(name: String) = message.getProperty(name)
+
+ def getLocalConnectionId = message.getProducerId.getConnectionId
+
+ def protocol = OpenwireProtocol
+
+ def producer = ascii(message.getProducerId.toString)
+
+ def priority = message.getPriority
+
+ def persistent = message.isPersistent
+
+ def id = _id
+
+ def expiration = message.getExpiration
+
+ def destination = message.getDestination.toDestination
+
+ def getBodyAs[T](toType : Class[T]) = {
+ (message match {
+ case x:ActiveMQTextMessage =>
+ if( toType == classOf[String] ) {
+ x.getText
+ } else if (toType == classOf[Buffer]) {
+ utf8(x.getText)
+ } else if (toType == classOf[AsciiBuffer]) {
+ ascii(x.getText)
+ } else if (toType == classOf[UTF8Buffer]) {
+ utf8(x.getText)
+ } else {
+ null
+ }
+ case x:ActiveMQBytesMessage =>
+ null
+ case x:ActiveMQMessage =>
+ if( toType == classOf[String] ) {
+ ""
+ } else if (toType == classOf[Buffer]) {
+ ascii("")
+ } else if (toType == classOf[AsciiBuffer]) {
+ ascii("")
+ } else if (toType == classOf[UTF8Buffer]) {
+ utf8("")
+ } else {
+ null
+ }
+ }).asInstanceOf[T]
+ }
+
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala Wed Apr 27 17:32:51 2011
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire
+
+import org.apache.activemq.apollo.broker.protocol.{Protocol, ProtocolFactory}
+import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.apache.activemq.apollo.broker.Message
+import OpenwireConstants._
+import org.apache.activemq.apollo.transport.ProtocolCodecFactory
+import org.fusesource.hawtbuf.Buffer
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object OpenwireProtocolFactory extends ProtocolFactory.Provider {
+
+ def create() = OpenwireProtocol
+
+ def create(config: String) = if(config == PROTOCOL) {
+ OpenwireProtocol
+ } else {
+ null
+ }
+
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object OpenwireProtocol extends OpenwireProtocolCodecFactory with Protocol {
+
+ def createProtocolHandler = new OpenwireProtocolHandler
+
+ def encode(message: Message):MessageRecord = {
+ OpenwireCodec.encode(message)
+ }
+
+ def decode(message: MessageRecord) = {
+ OpenwireCodec.decode(message)
+ }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class OpenwireProtocolCodecFactory extends ProtocolCodecFactory.Provider {
+
+
+ def protocol = PROTOCOL
+
+ def createProtocolCodec() = new OpenwireCodec();
+
+ def isIdentifiable() = true
+
+ def maxIdentificaionLength() = 4 + MAGIC.length
+
+ def matchesIdentification(buffer: Buffer):Boolean = {
+ buffer.length >= 4 + MAGIC.length && buffer.containsAt(MAGIC, 4)
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Apr 27 17:32:51 2011
@@ -0,0 +1,902 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire
+
+import OpenwireConstants._
+
+import org.fusesource.hawtdispatch._
+import org.fusesource.hawtbuf._
+import collection.mutable.{ListBuffer, HashMap}
+
+import AsciiBuffer._
+import org.apache.activemq.apollo.broker._
+import BufferConversions._
+import java.io.IOException
+import org.apache.activemq.apollo.selector.SelectorParser
+import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
+import org.apache.activemq.apollo.transport._
+import org.apache.activemq.apollo.broker.store._
+import org.apache.activemq.apollo.util._
+import java.util.concurrent.TimeUnit
+import java.util.Map.Entry
+import protocol._
+import scala.util.continuations._
+import security.SecurityContext
+import tcp.TcpTransport
+import codec.OpenWireFormat
+import command._
+import org.apache.activemq.apollo.dto.{TopicDestinationDTO, DurableSubscriptionDestinationDTO, DestinationDTO, StompConnectionStatusDTO}
+
+object OpenwireProtocolHandler extends Log {
+
+ val DEFAULT_DIE_DELAY = 5 * 1000L
+ var die_delay = DEFAULT_DIE_DELAY
+
+ val preferred_wireformat_settings = new WireFormatInfo();
+ preferred_wireformat_settings.setVersion(OpenWireFormat.DEFAULT_VERSION);
+ preferred_wireformat_settings.setStackTraceEnabled(true);
+ preferred_wireformat_settings.setCacheEnabled(true);
+ preferred_wireformat_settings.setTcpNoDelayEnabled(true);
+ preferred_wireformat_settings.setTightEncodingEnabled(true);
+ preferred_wireformat_settings.setSizePrefixDisabled(false);
+ preferred_wireformat_settings.setMaxInactivityDuration(30 * 1000 * 1000);
+ preferred_wireformat_settings.setMaxInactivityDurationInitalDelay(10 * 1000 * 1000);
+ preferred_wireformat_settings.setCacheSize(1024);
+
+}
+
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+
+class OpenwireProtocolHandler extends ProtocolHandler {
+
+ var minimum_protocol_version = 1
+
+ import OpenwireProtocolHandler._
+
+ def dispatchQueue: DispatchQueue = connection.dispatch_queue
+
+ def protocol = PROTOCOL
+
+ var outbound_sessions: SinkMux[Command] = null
+ var connection_session: Sink[Command] = null
+ var closed = false
+
+ var last_command_id=0
+
+ def next_command_id = {
+ last_command_id += 1
+ last_command_id
+ }
+
+ var producerRoutes = new LRUCache[List[DestinationDTO], DeliveryProducerRoute](10) {
+ override def onCacheEviction(eldest: Entry[List[DestinationDTO], DeliveryProducerRoute]) = {
+ host.router.disconnect(eldest.getKey.toArray, eldest.getValue)
+ }
+ }
+
+ var host: VirtualHost = null
+
+ private def queue = connection.dispatch_queue
+
+ var session_id: AsciiBuffer = _
+ var wire_format: OpenWireFormat = _
+ var login: Option[AsciiBuffer] = None
+ var passcode: Option[AsciiBuffer] = None
+ var dead = false
+ val security_context = new SecurityContext
+
+ var heart_beat_monitor: HeartBeatMonitor = new HeartBeatMonitor
+
+ var waiting_on: String = "client request"
+ var current_command: Object = _
+
+
+ override def create_connection_status = {
+ var rc = new StompConnectionStatusDTO
+ rc.protocol_version = if (wire_format == null) "" else wire_format.getVersion.toString
+ rc.user = login.map(_.toString).getOrElse(null)
+ // rc.subscription_count = consumers.size
+ rc.waiting_on = waiting_on
+ rc
+ }
+
+ def suspendRead(reason: String) = {
+ waiting_on = reason
+ connection.transport.suspendRead
+ }
+
+ def resumeRead() = {
+ waiting_on = "client request"
+ connection.transport.resumeRead
+ }
+
+ def ack(command: Command):Unit = {
+ if (command.isResponseRequired()) {
+ val rc = new Response();
+ rc.setCorrelationId(command.getCommandId());
+ connection_session.offer(rc);
+ }
+ }
+
+
+ override def on_transport_failure(error: IOException) = {
+ if (!connection.stopped) {
+ error.printStackTrace
+ suspendRead("shutdown")
+ debug(error, "Shutting connection down due to: %s", error)
+ connection.stop
+ }
+ }
+
+ override def on_transport_connected():Unit = {
+ security_context.local_address = connection.transport.getLocalAddress
+ security_context.remote_address = connection.transport.getRemoteAddress
+ outbound_sessions = new SinkMux[Command](connection.transport_sink.map {
+ x:Command =>
+ x.setCommandId(next_command_id)
+ info("sending frame: %s", x)
+ x
+ }, dispatchQueue, OpenwireCodec)
+ connection_session = new OverflowSink(outbound_sessions.open(dispatchQueue));
+ connection_session.refiller = NOOP
+
+ // Send our preferred wire format settings..
+ connection.transport.offer(preferred_wireformat_settings)
+
+ resumeRead
+ reset {
+ suspendRead("virtual host lookup")
+ this.host = connection.connector.broker.get_default_virtual_host
+ resumeRead
+ if(host==null) {
+ async_die("Could not find default virtual host")
+ }
+ }
+ }
+
+ override def on_transport_disconnected():Unit = {
+ if (!closed) {
+ closed = true;
+ dead = true;
+
+ heart_beat_monitor.stop
+
+ import collection.JavaConversions._
+ producerRoutes.foreach{
+ case (dests, route) => host.router.disconnect(dests.toArray, route)
+ }
+ producerRoutes.clear
+
+ // consumers.foreach{
+ // case (_, consumer) =>
+ // if (consumer.binding == null) {
+ // host.router.unbind(consumer.destination, consumer)
+ // } else {
+ // host.router.get_queue(consumer.binding) {
+ // queue =>
+ // queue.foreach(_.unbind(consumer :: Nil))
+ // }
+ // }
+ // }
+ // consumers = Map()
+ trace("stomp protocol resources released")
+ }
+ }
+
+
+ override def on_transport_command(command: Object):Unit = {
+ if( dead ) {
+ // We stop processing client commands once we are dead
+ return;
+ }
+ try {
+ current_command = command
+ println("received: %s", command)
+ if (wire_format == null) {
+ command match {
+ case codec: OpenwireCodec =>
+ // this is passed on to us by the protocol discriminator
+ // so we know which wire format is being used.
+ case command: WireFormatInfo =>
+ on_wire_format_info(command)
+ case _ =>
+ die("Unexpected command: " + command.getClass);
+ }
+ } else {
+ command match {
+ case msg:ActiveMQMessage=> on_message(msg)
+ case ack:MessageAck=> on_message_ack(ack)
+ case info:TransactionInfo => on_transaction_info(info)
+ case info:ProducerInfo=> on_producer_info(info)
+ case info:ConsumerInfo=> on_consumer_info(info)
+ case info:SessionInfo=> on_session_info(info)
+ case info:ConnectionInfo=> on_connection_info(info)
+ case info:RemoveInfo=> on_remove_info(info)
+ case info:KeepAliveInfo=> ack(info)
+ case info:ShutdownInfo=> ack(info); connection.stop
+ case info:FlushCommand=> ack(info)
+
+ // case info:ConnectionControl=>
+ // case info:ConnectionError=>
+ // case info:ConsumerControl=>
+ // case info:DestinationInfo=>
+ // case info:RemoveSubscriptionInfo=>
+ // case info:ControlCommand=>
+
+ ///////////////////////////////////////////////////////////////////
+ // Methods for cluster operations
+ // These commands are sent to the broker when it's acting like a
+ //client to another broker.
+ // /////////////////////////////////////////////////////////////////
+ // case info:BrokerInfo=>
+ // case info:MessageDispatch=>
+ // case info:MessageDispatchNotification=>
+ // case info:ProducerAck=>
+
+
+ case _ =>
+ die("Unspported command: " + command.getClass);
+ }
+ }
+ } catch {
+ case e: Break =>
+ case e: Exception =>
+ e.printStackTrace
+ async_die("Internal Server Error")
+ } finally {
+ current_command = null
+ }
+ }
+
+ class ProtocolException(msg:String) extends RuntimeException(msg)
+ class Break extends RuntimeException
+
+ def async_fail(msg: String, actual:Command=null):Unit = try {
+ fail(msg, actual)
+ } catch {
+ case x:Break=>
+ }
+
+ def fail[T](msg: String, actual:Command=null):T = {
+ def respond(command:Command) = {
+ if(command.isResponseRequired()) {
+ val e = new ProtocolException(msg);
+ e.fillInStackTrace
+
+ val rc = new ExceptionResponse();
+ rc.setCorrelationId(command.getCommandId());
+ rc.setException(e)
+ connection_session.offer(rc);
+ }
+ }
+ (current_command,actual) match {
+ case (null, null)=>
+ case (null, command:Command)=>
+ case (command:Command, null)=>
+ case (command:Command, command2:Command)=>
+ respond(command)
+ }
+ throw new Break()
+ }
+
+ def async_die(msg: String):Unit = try {
+ die(msg)
+ } catch {
+ case x:Break=>
+ }
+
+ /**
+ * A protocol error that cannot be recovered from. It results in the connections being terminated.
+ */
+ def die[T](msg: String):T = {
+ if (!dead) {
+ dead = true
+ debug("Shutting connection down due to: " + msg)
+ // TODO: if there are too many open connections we should just close the connection
+ // without waiting for the error to get sent to the client.
+ queue.after(die_delay, TimeUnit.MILLISECONDS) {
+ connection.stop()
+ }
+ fail(msg)
+ }
+ throw new Break()
+ }
+
+
+
+ def on_wire_format_info(info: WireFormatInfo) = {
+
+ if (!info.isValid()) {
+ die("Remote wire format magic is invalid")
+ } else if (info.getVersion() < minimum_protocol_version) {
+ die("Remote wire format (%s) is lower the minimum version required (%s)".format(info.getVersion(), minimum_protocol_version))
+ }
+
+ wire_format = connection.transport.getProtocolCodec.asInstanceOf[OpenwireCodec].format
+ wire_format.renegotiateWireFormat(info, preferred_wireformat_settings)
+
+ connection.transport match {
+ case x: TcpTransport =>
+ x.getSocketChannel.socket.setTcpNoDelay(wire_format.isTcpNoDelayEnabled())
+ case _ =>
+ }
+
+ val inactive_time = preferred_wireformat_settings.getMaxInactivityDuration().min(info.getMaxInactivityDuration())
+ val initial_delay = preferred_wireformat_settings.getMaxInactivityDurationInitalDelay().min(info.getMaxInactivityDurationInitalDelay())
+
+ if (initial_delay != inactive_time) {
+ die("We only support an initial delay inactivity duration equal to the max inactivity duration")
+ }
+
+ if (inactive_time > 0) {
+ heart_beat_monitor.read_interval = inactive_time
+ // lets be a little forgiving to account to packet transmission latency.
+ heart_beat_monitor.read_interval += inactive_time.min(5000)
+
+ heart_beat_monitor.on_dead = () => {
+ async_die("Stale connection. Missed heartbeat.")
+ }
+
+ heart_beat_monitor.write_interval = inactive_time
+ heart_beat_monitor.on_keep_alive = () => {
+ // we don't care if the offer gets rejected.. since that just
+ // means there is other traffic getting transmitted.
+ connection.transport.offer(new KeepAliveInfo)
+ }
+ }
+
+ heart_beat_monitor.transport = connection.transport
+ heart_beat_monitor.start
+
+ // Give the client some info about this broker.
+ val brokerInfo = new BrokerInfo();
+ brokerInfo.setBrokerId(new BrokerId(host.config.id));
+ brokerInfo.setBrokerName(host.config.id);
+ brokerInfo.setBrokerURL(host.broker.get_connect_address);
+ connection_session.offer(brokerInfo);
+ }
+
+ val all_connections = new HashMap[ConnectionId, ConnectionContext]();
+ val all_sessions = new HashMap[SessionId, SessionContext]();
+ val all_producers = new HashMap[ProducerId, ProducerContext]();
+ val all_consumers = new HashMap[ConsumerId, ConsumerContext]();
+ val all_transactions = new HashMap[TransactionId, TransactionContext]();
+ val all_temp_dests = List[ActiveMQDestination]();
+
+ class ConnectionContext(val info: ConnectionInfo) {
+
+ val sessions = new HashMap[SessionId, SessionContext]();
+ val transactions = new HashMap[TransactionId, TransactionContext]();
+
+ def default_session_id = new SessionId(info.getConnectionId(), -1)
+
+ def attach = {
+ // create the default session.
+ new SessionContext(this, new SessionInfo(default_session_id)).attach
+ all_connections.put(info.getConnectionId, this)
+ }
+
+ def dettach = {
+ sessions.values.toArray.foreach(_.dettach)
+ transactions.values.toArray.foreach(_.dettach)
+ all_connections.remove(info.getConnectionId)
+ }
+ }
+
+ class SessionContext(val parent: ConnectionContext, val info: SessionInfo) {
+ val producers = new HashMap[ProducerId, ProducerContext]();
+ val consumers = new HashMap[ConsumerId, ConsumerContext]();
+
+ def attach = {
+ parent.sessions.put(info.getSessionId, this)
+ all_sessions.put(info.getSessionId, this)
+ }
+
+ def dettach = {
+ producers.values.toArray.foreach(_.dettach)
+ consumers.values.toArray.foreach(_.dettach)
+ parent.sessions.remove(info.getSessionId)
+ all_sessions.remove(info.getSessionId)
+ }
+
+
+ }
+
+ def noop = shift { k: (Unit=>Unit) => k() }
+
+ class ProducerContext(val parent: SessionContext, val info: ProducerInfo) {
+ def attach = {
+ parent.producers.put(info.getProducerId, this)
+ all_producers.put(info.getProducerId, this)
+ }
+
+ def dettach = {
+ parent.producers.remove(info.getProducerId)
+ all_producers.remove(info.getProducerId)
+ }
+ }
+
+ class ConsumerContext(val parent: SessionContext, val info: ConsumerInfo) extends BaseRetained with DeliveryConsumer {
+
+ var selector_expression:BooleanExpression = _
+ var destination:Array[DestinationDTO] = _
+
+ def attach = {
+
+ if( info.getDestination == null ) fail("destination was not set")
+
+ destination = info.getDestination.toDestination
+ parent.consumers.put(info.getConsumerId, this)
+ all_consumers.put(info.getConsumerId, this)
+ var is_durable_sub = info.getSubscriptionName!=null
+
+ selector_expression = info.getSelector match {
+ case null=> null
+ case x=>
+ try {
+ SelectorParser.parse(x)
+ } catch {
+ case e:FilterException =>
+ fail("Invalid selector expression: "+e.getMessage)
+ }
+ }
+
+ if( is_durable_sub ) {
+ destination = destination.map { _ match {
+ case x:TopicDestinationDTO=>
+ val rc = new DurableSubscriptionDestinationDTO(x.name)
+ rc.client_id = parent.parent.info.getClientId
+ rc.subscription_id = if( is_durable_sub ) info.getSubscriptionName else null
+ rc.filter = info.getSelector
+ rc
+ case _ => die("A durable subscription can only be used on a topic destination")
+ }
+ }
+ }
+
+ reset {
+ val rc = host.router.bind(destination, this, security_context)
+ rc match {
+ case Success(_) =>
+ ack(info)
+ noop
+ case Failure(reason) =>
+ fail(reason, info)
+ noop
+ }
+ }
+ this.release
+ }
+
+ def dettach = {
+ host.router.unbind(destination, this)
+ parent.consumers.remove(info.getConsumerId)
+ all_consumers.remove(info.getConsumerId)
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // DeliveryConsumer impl
+ ///////////////////////////////////////////////////////////////////
+
+ def dispatch_queue = OpenwireProtocolHandler.this.dispatchQueue
+
+ override def connection = Some(OpenwireProtocolHandler.this.connection)
+
+ def is_persistent = false
+
+ def matches(delivery:Delivery) = {
+ if( delivery.message.protocol eq OpenwireProtocol ) {
+ if( selector_expression!=null ) {
+ selector_expression.matches(delivery.message)
+ } else {
+ true
+ }
+ } else {
+ false
+ }
+ }
+
+ def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter {
+ retain
+
+ def producer = p
+ def consumer = ConsumerContext.this
+
+ val outbound_session = outbound_sessions.open(producer.dispatch_queue)
+
+ def downstream = outbound_session
+
+ def close = {
+ outbound_sessions.close(outbound_session)
+ release
+ }
+
+ // Delegate all the flow control stuff to the session
+ def offer(delivery:Delivery) = {
+ if( outbound_session.full ) {
+ false
+ } else {
+ var msg = delivery.message.asInstanceOf[OpenwireMessage].message
+ ack_handler.track(msg.getMessageId, delivery.ack)
+ val dispatch = new MessageDispatch
+ dispatch.setConsumerId(info.getConsumerId)
+ dispatch.setDestination(msg.getDestination)
+ dispatch.setMessage(msg)
+
+ val rc = outbound_session.offer(dispatch)
+ assert(rc, "offer should be accepted since it was not full")
+ true
+ }
+ }
+
+
+ }
+
+ object ack_handler {
+
+ // TODO: Need to validate all the range ack cases...
+ var consumer_acks = ListBuffer[(MessageId, (Boolean, StoreUOW)=>Unit)]()
+
+ def track(id:MessageId, callback:(Boolean, StoreUOW)=>Unit) = {
+ queue {
+ consumer_acks += (( id, callback ))
+ }
+
+ }
+
+ def apply(messageAck: MessageAck, uow:StoreUOW=null) = {
+
+ var found = false
+ val (acked, not_acked) = consumer_acks.partition{ case (id, _)=>
+ if( found ) {
+ false
+ } else {
+ if( id == messageAck.getLastMessageId ) {
+ found = true
+ }
+ true
+ }
+ }
+
+ if( acked.isEmpty ) {
+ async_fail("ACK failed, invalid message id: %s".format(messageAck.getLastMessageId), messageAck)
+ } else {
+ consumer_acks = not_acked
+ acked.foreach{case (_, callback)=>
+ if( callback!=null ) {
+ callback(true, uow)
+ }
+ }
+ }
+ }
+
+ }
+
+ }
+
+ class TransactionContext(val parent: ConnectionContext, val id: TransactionId) {
+
+ // TODO: eventually we want to back this /w a broker Queue which
+ // can provides persistence and memory swapping.
+// Buffer xid = null;
+// if (tid.isXATransaction()) {
+// xid = XidImpl.toBuffer((Xid) tid);
+// }
+// t = host.getTransactionManager().createTransaction(xid);
+// transactions.put(tid, t);
+
+ val actions = ListBuffer[(StoreUOW)=>Unit]()
+
+ def attach = {
+ parent.transactions.put(id, this)
+ all_transactions.put(id, this)
+ }
+
+ def dettach = {
+ actions.clear
+ parent.transactions.remove(id)
+ all_transactions.remove(id)
+ }
+
+
+ def apply(proc:(StoreUOW)=>Unit) = {
+ actions += proc
+ }
+
+ def commit(onComplete: => Unit) = {
+
+ val uow = if( host.store!=null ) {
+ host.store.create_uow
+ } else {
+ null
+ }
+
+ actions.foreach { proc =>
+ proc(uow)
+ }
+
+ if( uow!=null ) {
+ uow.on_complete(^{
+ onComplete
+ })
+ uow.release
+ } else {
+ onComplete
+ }
+
+ }
+
+ def rollback() = {
+ actions.clear
+ }
+
+ }
+
+ def create_tx_ctx(connection:ConnectionContext, txid:TransactionId):TransactionContext= {
+ if ( all_transactions.contains(txid) ) {
+ die("transaction allready started")
+ } else {
+ val context = new TransactionContext(connection, txid)
+ context.attach
+ context
+ }
+ }
+
+ def get_or_create_tx_ctx(connection:ConnectionContext, txid:TransactionId):TransactionContext = {
+ all_transactions.get(txid) match {
+ case Some(ctx)=> ctx
+ case None=>
+ val context = new TransactionContext(connection, txid)
+ context.attach
+ context
+ }
+ }
+
+ def get_tx_ctx(txid:TransactionId):TransactionContext = {
+ all_transactions.get(txid) match {
+ case Some(ctx)=> ctx
+ case None=> die("transaction not active: %d".format(txid))
+ }
+ }
+
+ def remove_tx_ctx(txid:TransactionId):TransactionContext= {
+ all_transactions.get(txid) match {
+ case None=>
+ die("transaction not active: %d".format(txid))
+ case Some(tx)=>
+ tx.dettach
+ tx
+ }
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Connection / Session / Consumer / Producer state tracking.
+ ///////////////////////////////////////////////////////////////////
+
+ def on_connection_info(info: ConnectionInfo) = {
+ val id = info.getConnectionId()
+ if (!all_connections.contains(id)) {
+ new ConnectionContext(info).attach
+ }
+ ack(info);
+ }
+
+ def on_session_info(info: SessionInfo) = {
+ val id = info.getSessionId();
+ if (!all_sessions.contains(id)) {
+ val parent = all_connections.get(id.getParentId()).getOrElse(die("Cannot add a session to a connection that had not been registered."))
+ new SessionContext(parent, info).attach
+ }
+ ack(info);
+ }
+
+ def on_producer_info(info: ProducerInfo) = {
+ val id = info.getProducerId
+ if (!all_producers.contains(id)) {
+ val parent = all_sessions.get(id.getParentId()).getOrElse(die("Cannot add a producer to a session that had not been registered."))
+ new ProducerContext(parent, info).attach
+ }
+ ack(info);
+ }
+
+ def on_consumer_info(info: ConsumerInfo) = {
+ val id = info.getConsumerId
+ if (!all_consumers.contains(id)) {
+ val parent = all_sessions.get(id.getParentId()).getOrElse(die("Cannot add a consumer to a session that had not been registered."))
+ new ConsumerContext(parent, info).attach
+ } else {
+ ack(info);
+ }
+ }
+
+ def on_remove_info(info: RemoveInfo) = {
+ info.getObjectId match {
+ case id: ConnectionId => all_connections.get(id).foreach(_.dettach)
+ case id: SessionId => all_sessions.get(id).foreach(_.dettach)
+ case id: ProducerId => all_producers.get(id).foreach(_.dettach)
+ case id: ConsumerId => all_consumers.get(id).foreach(_.dettach )
+ // case id: DestinationInfo =>
+ case _ => die("Invalid object id.")
+ }
+ ack(info)
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Methods for transaction management
+ ///////////////////////////////////////////////////////////////////
+ def on_transaction_info(info:TransactionInfo) = {
+ val parent = all_connections.get(info.getConnectionId()).getOrElse(die("Cannot add a session to a connection that had not been registered."))
+ val id = info.getTransactionId
+ info.getType match {
+ case TransactionInfo.BEGIN =>
+ get_or_create_tx_ctx(parent, id)
+ ack(info)
+
+ case TransactionInfo.COMMIT_ONE_PHASE =>
+ get_tx_ctx(id).commit {
+ ack(info)
+ }
+
+ case TransactionInfo.ROLLBACK =>
+ get_tx_ctx(id).rollback
+ ack(info)
+
+ case TransactionInfo.END =>
+ die("XA not yet supported")
+ case TransactionInfo.PREPARE =>
+ die("XA not yet supported")
+ case TransactionInfo.COMMIT_TWO_PHASE =>
+ die("XA not yet supported")
+ case TransactionInfo.RECOVER =>
+ die("XA not yet supported")
+ case TransactionInfo.FORGET =>
+ die("XA not yet supported")
+
+ case _ =>
+ fail("Transaction info type unknown: " + info.getType)
+
+ }
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Core message processing
+ ///////////////////////////////////////////////////////////////////
+
+ def on_message(msg: ActiveMQMessage) = {
+ val producer = all_producers.get(msg.getProducerId).getOrElse(die("Producer associated with the message has not been registered."))
+
+ if (msg.getOriginalDestination() == null) {
+ msg.setOriginalDestination(msg.getDestination());
+ }
+
+ if( msg.getTransactionId==null ) {
+ perform_send(msg)
+ } else {
+ get_or_create_tx_ctx(producer.parent.parent, msg.getTransactionId) { (uow)=>
+ perform_send(msg, uow)
+ }
+ }
+ }
+
+ def perform_send(msg:ActiveMQMessage, uow:StoreUOW=null): Unit = {
+
+ val destiantion = msg.getDestination.toDestination
+ val key = destiantion.toList
+ producerRoutes.get(key) match {
+ case null =>
+ // create the producer route...
+
+ val route = new DeliveryProducerRoute(host.router) {
+ override def connection = Some(OpenwireProtocolHandler.this.connection)
+ override def dispatch_queue = queue
+ refiller = ^ {
+ resumeRead
+ }
+ }
+
+ // don't process frames until producer is connected...
+ connection.transport.suspendRead
+ reset {
+ val rc = host.router.connect(destiantion, route, security_context)
+ if( rc.failed ) {
+ async_fail(rc.failure, msg)
+ } else {
+ if (!connection.stopped) {
+ resumeRead
+ producerRoutes.put(key, route)
+ send_via_route(route, msg, uow)
+ }
+ }
+ }
+
+ case route =>
+ // we can re-use the existing producer route
+ send_via_route(route, msg, uow)
+
+ }
+ }
+
+ def send_via_route(route:DeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
+ if( !route.targets.isEmpty ) {
+
+ // We may need to add some headers..
+ val delivery = new Delivery
+ delivery.message = new OpenwireMessage(message)
+ delivery.size = message.getSize
+ delivery.uow = uow
+
+ if( message.isResponseRequired ) {
+ delivery.ack = { (consumed, uow) =>
+ dispatchQueue <<| ^{
+ ack(message)
+ }
+ }
+ }
+
+ // routes can always accept at least 1 delivery...
+ assert( !route.full )
+ route.offer(delivery)
+ if( route.full ) {
+ // but once it gets full.. suspend, so that we get more stomp messages
+ // until it's not full anymore.
+ suspendRead("blocked destination: "+route.overflowSessions.mkString(", "))
+ }
+
+ } else {
+ // info("Dropping message. No consumers interested in message.")
+ ack(message)
+ }
+ // message.release
+ }
+
+
+ def on_message_ack(info:MessageAck) = {
+ val consumer = all_consumers.get(info.getConsumerId).getOrElse(die("Cannot ack a message on a consumer that had not been registered."))
+ info.getTransactionId match {
+ case null =>
+ consumer.ack_handler(info)
+ case txid =>
+ get_or_create_tx_ctx(consumer.parent.parent, txid){ (uow)=>
+ consumer.ack_handler(info, uow)
+ }
+ }
+ ack(info)
+ }
+
+ // public Response processAddDestination(DestinationInfo info) throws Exception {
+ // ActiveMQDestination destination = info.getDestination();
+ // if (destination.isTemporary()) {
+ // // Keep track of it so that we can remove them this connection
+ // // shuts down.
+ // temporaryDestinations.add(destination);
+ // }
+ // host.createQueue(destination);
+ // return ack(info);
+ // }
+
+
+ //
+
+
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/BooleanStream.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/BooleanStream.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/BooleanStream.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/BooleanStream.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.codec;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public final class BooleanStream {
+
+ byte data[] = new byte[48];
+ short arrayLimit;
+ short arrayPos;
+ byte bytePos;
+
+ public boolean readBoolean() throws IOException {
+ assert arrayPos <= arrayLimit;
+ byte b = data[arrayPos];
+ boolean rc = ((b >> bytePos) & 0x01) != 0;
+ bytePos++;
+ if (bytePos >= 8) {
+ bytePos = 0;
+ arrayPos++;
+ }
+ return rc;
+ }
+
+ public void writeBoolean(boolean value) throws IOException {
+ if (bytePos == 0) {
+ arrayLimit++;
+ if (arrayLimit >= data.length) {
+ // re-grow the array.
+ byte d[] = new byte[data.length * 2];
+ System.arraycopy(data, 0, d, 0, data.length);
+ data = d;
+ }
+ }
+ if (value) {
+ data[arrayPos] |= 0x01 << bytePos;
+ }
+ bytePos++;
+ if (bytePos >= 8) {
+ bytePos = 0;
+ arrayPos++;
+ }
+ }
+
+ public void marshal(DataOutput dataOut) throws IOException {
+ if (arrayLimit < 64) {
+ dataOut.writeByte(arrayLimit);
+ } else if (arrayLimit < 256) { // max value of unsigned byte
+ dataOut.writeByte(0xC0);
+ dataOut.writeByte(arrayLimit);
+ } else {
+ dataOut.writeByte(0x80);
+ dataOut.writeShort(arrayLimit);
+ }
+
+ dataOut.write(data, 0, arrayLimit);
+ clear();
+ }
+
+ public void marshal(ByteBuffer dataOut) {
+ if (arrayLimit < 64) {
+ dataOut.put((byte)arrayLimit);
+ } else if (arrayLimit < 256) { // max value of unsigned byte
+ dataOut.put((byte)0xC0);
+ dataOut.put((byte)arrayLimit);
+ } else {
+ dataOut.put((byte)0x80);
+ dataOut.putShort(arrayLimit);
+ }
+
+ dataOut.put(data, 0, arrayLimit);
+ }
+
+ public void unmarshal(DataInput dataIn) throws IOException {
+
+ arrayLimit = (short)(dataIn.readByte() & 0xFF);
+ if (arrayLimit == 0xC0) {
+ arrayLimit = (short)(dataIn.readByte() & 0xFF);
+ } else if (arrayLimit == 0x80) {
+ arrayLimit = dataIn.readShort();
+ }
+ if (data.length < arrayLimit) {
+ data = new byte[arrayLimit];
+ }
+ dataIn.readFully(data, 0, arrayLimit);
+ clear();
+ }
+
+ public void clear() {
+ arrayPos = 0;
+ bytePos = 0;
+ }
+
+ public int marshalledSize() {
+ if (arrayLimit < 64) {
+ return 1 + arrayLimit;
+ } else if (arrayLimit < 256) {
+ return 2 + arrayLimit;
+ } else {
+ return 3 + arrayLimit;
+ }
+ }
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/BooleanStream.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/CommandIdComparator.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/CommandIdComparator.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/CommandIdComparator.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/CommandIdComparator.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.codec;
+
+import java.util.Comparator;
+
+import org.apache.activemq.apollo.openwire.command.Command;
+
+/**
+ * A @{link Comparator} of commands using their {@link Command#getCommandId()}
+ *
+ */
+public class CommandIdComparator implements Comparator<Command> {
+
+ public int compare(Command c1, Command c2) {
+ return c1.getCommandId() - c2.getCommandId();
+ }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/DataStreamMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/DataStreamMarshaller.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/DataStreamMarshaller.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/DataStreamMarshaller.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.codec;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.apollo.openwire.command.DataStructure;
+
+public interface DataStreamMarshaller {
+
+ byte getDataStructureType();
+ DataStructure createObject();
+
+ int tightMarshal1(OpenWireFormat format, Object c, BooleanStream bs) throws IOException;
+ void tightMarshal2(OpenWireFormat format, Object c, DataOutput ds, BooleanStream bs) throws IOException;
+ void tightUnmarshal(OpenWireFormat format, Object data, DataInput dis, BooleanStream bs) throws IOException;
+
+ void looseMarshal(OpenWireFormat format, Object c, DataOutput ds) throws IOException;
+ void looseUnmarshal(OpenWireFormat format, Object data, DataInput dis) throws IOException;
+
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/DataStreamMarshaller.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/OpenWireFormat.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/OpenWireFormat.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/OpenWireFormat.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,635 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.codec;
+
+import org.apache.activemq.apollo.openwire.command.CommandTypes;
+import org.apache.activemq.apollo.openwire.command.DataStructure;
+import org.apache.activemq.apollo.openwire.command.WireFormatInfo;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.BufferEditor;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.channels.ReadableByteChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public final class OpenWireFormat {
+
+ public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_VERSION;
+ public static final String WIREFORMAT_NAME = "openwire";
+
+ static final byte NULL_TYPE = CommandTypes.NULL;
+ private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
+ private static final int MARSHAL_CACHE_FREE_SPACE = 100;
+
+ private DataStreamMarshaller dataMarshallers[];
+ private int version;
+ private boolean stackTraceEnabled;
+ private boolean tcpNoDelayEnabled;
+ private boolean cacheEnabled;
+ private boolean tightEncodingEnabled;
+ private boolean sizePrefixDisabled;
+
+ // The following fields are used for value caching
+ private short nextMarshallCacheIndex;
+ private short nextMarshallCacheEvictionIndex;
+ private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
+ private DataStructure marshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
+ private DataStructure unmarshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
+ private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
+ private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
+
+ private AtomicBoolean receivingMessage = new AtomicBoolean(false);
+
+ public OpenWireFormat() {
+ this(DEFAULT_VERSION);
+ }
+
+ public OpenWireFormat(int i) {
+ setVersion(i);
+ }
+
+ public int hashCode() {
+ return version ^ (cacheEnabled ? 0x10000000 : 0x20000000) ^ (stackTraceEnabled ? 0x01000000 : 0x02000000) ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
+ ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
+ }
+
+ public OpenWireFormat copy() {
+ OpenWireFormat answer = new OpenWireFormat();
+ answer.version = version;
+ answer.stackTraceEnabled = stackTraceEnabled;
+ answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
+ answer.cacheEnabled = cacheEnabled;
+ answer.tightEncodingEnabled = tightEncodingEnabled;
+ answer.sizePrefixDisabled = sizePrefixDisabled;
+ return answer;
+ }
+
+ public boolean equals(Object object) {
+ if (object == null) {
+ return false;
+ }
+ OpenWireFormat o = (OpenWireFormat) object;
+ return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
+ && o.sizePrefixDisabled == sizePrefixDisabled;
+ }
+
+ public String toString() {
+ return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" + tightEncodingEnabled
+ + ", sizePrefixDisabled=" + sizePrefixDisabled + "}";
+ // return "OpenWireFormat{id="+id+",
+ // tightEncodingEnabled="+tightEncodingEnabled+"}";
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public String getName() {
+ return WIREFORMAT_NAME;
+ }
+
+ public synchronized Buffer marshal(Object command) throws IOException {
+
+ if (cacheEnabled) {
+ runMarshallCacheEvictionSweep();
+ }
+
+ // MarshallAware ma = null;
+ // // If not using value caching, then the marshaled form is always the
+ // // same
+ // if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
+ // ma = (MarshallAware)command;
+ // }
+
+ Buffer sequence = null;
+ // if( ma!=null ) {
+ // sequence = ma.getCachedMarshalledForm(this);
+ // }
+
+ if (sequence == null) {
+
+ int size = 1;
+ if (command != null) {
+
+ DataStructure c = (DataStructure) command;
+ byte type = c.getDataStructureType();
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+ if (dsm == null) {
+ throw new IOException("Unknown data type: " + type);
+ }
+ if (tightEncodingEnabled) {
+
+ BooleanStream bs = new BooleanStream();
+ size += dsm.tightMarshal1(this, c, bs);
+ size += bs.marshalledSize();
+
+ bytesOut.restart(size);
+ if (!sizePrefixDisabled) {
+ bytesOut.writeInt(size);
+ }
+ bytesOut.writeByte(type);
+ bs.marshal(bytesOut);
+ dsm.tightMarshal2(this, c, bytesOut, bs);
+ sequence = bytesOut.toBuffer();
+
+ } else {
+ bytesOut.restart();
+ if (!sizePrefixDisabled) {
+ bytesOut.writeInt(0); // we don't know the final size
+ // yet but write this here for
+ // now.
+ }
+ bytesOut.writeByte(type);
+ dsm.looseMarshal(this, c, bytesOut);
+ sequence = bytesOut.toBuffer();
+
+ if (!sizePrefixDisabled) {
+ size = sequence.getLength() - 4;
+ int pos = sequence.offset;
+ sequence.offset = 0;
+ BufferEditor.big(sequence).writeInt(size);
+ sequence.offset = pos;
+ }
+ }
+
+ } else {
+ bytesOut.restart(5);
+ bytesOut.writeInt(size);
+ bytesOut.writeByte(NULL_TYPE);
+ sequence = bytesOut.toBuffer();
+ }
+
+ // if( ma!=null ) {
+ // ma.setCachedMarshalledForm(this, sequence);
+ // }
+ }
+ return sequence;
+ }
+
+ public synchronized Object unmarshal(Buffer sequence) throws IOException {
+ bytesIn.restart(sequence);
+ // DataInputStream dis = new DataInputStream(new
+ // ByteArrayInputStream(sequence));
+
+ if (!sizePrefixDisabled) {
+ int size = bytesIn.readInt();
+ if (sequence.getLength() - 4 != size) {
+ // throw new IOException("Packet size does not match marshaled
+ // size");
+ }
+ }
+
+ Object command = doUnmarshal(bytesIn);
+ // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
+ // ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
+ // }
+ return command;
+ }
+
+ public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
+
+ if (cacheEnabled) {
+ runMarshallCacheEvictionSweep();
+ }
+
+ int size = 1;
+ if (o != null) {
+
+ DataStructure c = (DataStructure) o;
+ byte type = c.getDataStructureType();
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+ if (dsm == null) {
+ throw new IOException("Unknown data type: " + type);
+ }
+ if (tightEncodingEnabled) {
+ BooleanStream bs = new BooleanStream();
+ size += dsm.tightMarshal1(this, c, bs);
+ size += bs.marshalledSize();
+
+ if (!sizePrefixDisabled) {
+ dataOut.writeInt(size);
+ }
+
+ dataOut.writeByte(type);
+ bs.marshal(dataOut);
+ dsm.tightMarshal2(this, c, dataOut, bs);
+
+ } else {
+ DataOutput looseOut = dataOut;
+
+ if (!sizePrefixDisabled) {
+ bytesOut.restart();
+ looseOut = bytesOut;
+ }
+
+ looseOut.writeByte(type);
+ dsm.looseMarshal(this, c, looseOut);
+
+ if (!sizePrefixDisabled) {
+ Buffer sequence = bytesOut.toBuffer();
+ dataOut.writeInt(sequence.getLength());
+ dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+ }
+
+ }
+
+ } else {
+ if (!sizePrefixDisabled) {
+ dataOut.writeInt(size);
+ }
+ dataOut.writeByte(NULL_TYPE);
+ }
+ }
+
+ public Object unmarshal(DataInput dis) throws IOException {
+ DataInput dataIn = dis;
+ if (!sizePrefixDisabled) {
+ dis.readInt();
+ // int size = dis.readInt();
+ // byte[] data = new byte[size];
+ // dis.readFully(data);
+ // bytesIn.restart(data);
+ // dataIn = bytesIn;
+ }
+ return doUnmarshal(dataIn);
+ }
+
+ public Object unmarshal(ReadableByteChannel channel) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Used by NIO or AIO transports
+ */
+ public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
+ int size = 1;
+ if (o != null) {
+ DataStructure c = (DataStructure) o;
+ byte type = c.getDataStructureType();
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+ if (dsm == null) {
+ throw new IOException("Unknown data type: " + type);
+ }
+
+ size += dsm.tightMarshal1(this, c, bs);
+ size += bs.marshalledSize();
+ }
+ return size;
+ }
+
+ /**
+ * Used by NIO or AIO transports; note that the size is not written as part
+ * of this method.
+ */
+ public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
+ if (cacheEnabled) {
+ runMarshallCacheEvictionSweep();
+ }
+
+ if (o != null) {
+ DataStructure c = (DataStructure) o;
+ byte type = c.getDataStructureType();
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+ if (dsm == null) {
+ throw new IOException("Unknown data type: " + type);
+ }
+ ds.writeByte(type);
+ bs.marshal(ds);
+ dsm.tightMarshal2(this, c, ds, bs);
+ }
+ }
+
+ /**
+ * Allows you to dynamically switch the version of the openwire protocol
+ * being used.
+ *
+ * @param version
+ */
+ public void setVersion(int version) {
+ String mfName = getClass().getPackage().getName()+".v" + version + ".MarshallerFactory";
+ Class mfClass;
+ try {
+ mfClass = Class.forName(mfName, false, getClass().getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw (IllegalArgumentException) new IllegalArgumentException("Invalid version: " + version + ", could not load " + mfName).initCause(e);
+ }
+ try {
+ Method method = mfClass.getMethod("createMarshallerMap", new Class[] { OpenWireFormat.class });
+ dataMarshallers = (DataStreamMarshaller[]) method.invoke(null, new Object[] { this });
+ } catch (Throwable e) {
+ throw (IllegalArgumentException) new IllegalArgumentException("Invalid version: " + version + ", " + mfName + " does not properly implement the createMarshallerMap method.").initCause(e);
+ }
+ this.version = version;
+ }
+
+ public Object doUnmarshal(DataInput dis) throws IOException {
+ byte dataType = dis.readByte();
+ receivingMessage.set(true);
+ if (dataType != NULL_TYPE) {
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
+ if (dsm == null) {
+ throw new IOException("Unknown data type: " + dataType);
+ }
+ Object data = dsm.createObject();
+ if (this.tightEncodingEnabled) {
+ BooleanStream bs = new BooleanStream();
+ bs.unmarshal(dis);
+ dsm.tightUnmarshal(this, data, dis, bs);
+ } else {
+ dsm.looseUnmarshal(this, data, dis);
+ }
+ receivingMessage.set(false);
+ return data;
+ } else {
+ receivingMessage.set(false);
+ return null;
+ }
+ }
+
+ // public void debug(String msg) {
+ // String t = (Thread.currentThread().getName()+" ").substring(0, 40);
+ // System.out.println(t+": "+msg);
+ // }
+ public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
+ bs.writeBoolean(o != null);
+ if (o == null) {
+ return 0;
+ }
+
+ if (o.isMarshallAware()) {
+ // MarshallAware ma = (MarshallAware)o;
+ Buffer sequence = null;
+ // sequence=ma.getCachedMarshalledForm(this);
+ bs.writeBoolean(sequence != null);
+ if (sequence != null) {
+ return 1 + sequence.getLength();
+ }
+ }
+
+ byte type = o.getDataStructureType();
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+ if (dsm == null) {
+ throw new IOException("Unknown data type: " + type);
+ }
+ return 1 + dsm.tightMarshal1(this, o, bs);
+ }
+
+ public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs) throws IOException {
+ if (!bs.readBoolean()) {
+ return;
+ }
+
+ byte type = o.getDataStructureType();
+ ds.writeByte(type);
+
+ if (o.isMarshallAware() && bs.readBoolean()) {
+ // We should not be doing any caching
+ throw new IOException("Corrupted stream");
+ } else {
+
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+ if (dsm == null) {
+ throw new IOException("Unknown data type: " + type);
+ }
+ dsm.tightMarshal2(this, o, ds, bs);
+
+ }
+ }
+
+ public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
+ if (bs.readBoolean()) {
+
+ byte dataType = dis.readByte();
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
+ if (dsm == null) {
+ throw new IOException("Unknown data type: " + dataType);
+ }
+ DataStructure data = dsm.createObject();
+
+ if (data.isMarshallAware() && bs.readBoolean()) {
+
+ dis.readInt();
+ dis.readByte();
+
+ BooleanStream bs2 = new BooleanStream();
+ bs2.unmarshal(dis);
+ dsm.tightUnmarshal(this, data, dis, bs2);
+
+ // TODO: extract the sequence from the dis and associate it.
+ // MarshallAware ma = (MarshallAware)data
+ // ma.setCachedMarshalledForm(this, sequence);
+
+ } else {
+ dsm.tightUnmarshal(this, data, dis, bs);
+ }
+
+ return data;
+ } else {
+ return null;
+ }
+ }
+
+ public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
+ if (dis.readBoolean()) {
+
+ byte dataType = dis.readByte();
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
+ if (dsm == null) {
+ throw new IOException("Unknown data type: " + dataType);
+ }
+ DataStructure data = dsm.createObject();
+ dsm.looseUnmarshal(this, data, dis);
+ return data;
+
+ } else {
+ return null;
+ }
+ }
+
+ public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
+ dataOut.writeBoolean(o != null);
+ if (o != null) {
+ byte type = o.getDataStructureType();
+ dataOut.writeByte(type);
+ DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
+ if (dsm == null) {
+ throw new IOException("Unknown data type: " + type);
+ }
+ dsm.looseMarshal(this, o, dataOut);
+ }
+ }
+
+ public void runMarshallCacheEvictionSweep() {
+ // Do we need to start evicting??
+ while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) {
+
+ marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
+ marshallCache[nextMarshallCacheEvictionIndex] = null;
+
+ nextMarshallCacheEvictionIndex++;
+ if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
+ nextMarshallCacheEvictionIndex = 0;
+ }
+
+ }
+ }
+
+ public Short getMarshallCacheIndex(DataStructure o) {
+ return marshallCacheMap.get(o);
+ }
+
+ public Short addToMarshallCache(DataStructure o) {
+ short i = nextMarshallCacheIndex++;
+ if (nextMarshallCacheIndex >= marshallCache.length) {
+ nextMarshallCacheIndex = 0;
+ }
+
+ // We can only cache that item if there is space left.
+ if (marshallCacheMap.size() < marshallCache.length) {
+ marshallCache[i] = o;
+ Short index = new Short(i);
+ marshallCacheMap.put(o, index);
+ return index;
+ } else {
+ // Use -1 to indicate that the value was not cached due to cache
+ // being full.
+ return new Short((short) -1);
+ }
+ }
+
+ public void setInUnmarshallCache(short index, DataStructure o) {
+
+ // There was no space left in the cache, so we can't
+ // put this in the cache.
+ if (index == -1) {
+ return;
+ }
+
+ unmarshallCache[index] = o;
+ }
+
+ public DataStructure getFromUnmarshallCache(short index) {
+ return unmarshallCache[index];
+ }
+
+ public void setStackTraceEnabled(boolean b) {
+ stackTraceEnabled = b;
+ }
+
+ public boolean isStackTraceEnabled() {
+ return stackTraceEnabled;
+ }
+
+ public boolean isTcpNoDelayEnabled() {
+ return tcpNoDelayEnabled;
+ }
+
+ public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
+ this.tcpNoDelayEnabled = tcpNoDelayEnabled;
+ }
+
+ public boolean isCacheEnabled() {
+ return cacheEnabled;
+ }
+
+ public void setCacheEnabled(boolean cacheEnabled) {
+ this.cacheEnabled = cacheEnabled;
+ }
+
+ public boolean isTightEncodingEnabled() {
+ return tightEncodingEnabled;
+ }
+
+ public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
+ this.tightEncodingEnabled = tightEncodingEnabled;
+ }
+
+ public boolean isSizePrefixDisabled() {
+ return sizePrefixDisabled;
+ }
+
+ public void setSizePrefixDisabled(boolean prefixPacketSize) {
+ this.sizePrefixDisabled = prefixPacketSize;
+ }
+
+
+ public void renegotiateWireFormat(WireFormatInfo info, WireFormatInfo preferedWireFormatInfo) throws IOException {
+
+ if (preferedWireFormatInfo == null || info==null ) {
+ throw new IllegalStateException("Wireformat cannot not be renegotiated.");
+ }
+
+ this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
+ info.setVersion(this.getVersion());
+
+ this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
+ info.setStackTraceEnabled(this.stackTraceEnabled);
+
+ this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
+ info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
+
+ this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
+ info.setCacheEnabled(this.cacheEnabled);
+
+ this.tightEncodingEnabled = info.isTightEncodingEnabled() && preferedWireFormatInfo.isTightEncodingEnabled();
+ info.setTightEncodingEnabled(this.tightEncodingEnabled);
+
+ this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled();
+ info.setSizePrefixDisabled(this.sizePrefixDisabled);
+
+ if (cacheEnabled) {
+
+ int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
+ info.setCacheSize(size);
+
+ if (size == 0) {
+ size = MARSHAL_CACHE_SIZE;
+ }
+
+ marshallCache = new DataStructure[size];
+ unmarshallCache = new DataStructure[size];
+ nextMarshallCacheIndex = 0;
+ nextMarshallCacheEvictionIndex = 0;
+ marshallCacheMap = new HashMap<DataStructure, Short>();
+ } else {
+ marshallCache = null;
+ unmarshallCache = null;
+ nextMarshallCacheIndex = 0;
+ nextMarshallCacheEvictionIndex = 0;
+ marshallCacheMap = null;
+ }
+
+ }
+
+ protected int min(int version1, int version2) {
+ if (version1 < version2 && version1 > 0 || version2 <= 0) {
+ return version1;
+ }
+ return version2;
+ }
+
+}