You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:00:32 UTC
svn commit: r961119 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/
activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/
activemq...
Author: chirino
Date: Wed Jul 7 04:00:31 2010
New Revision: 961119
URL: http://svn.apache.org/viewvc?rev=961119&view=rev
Log:
Added a Protocol class to help marshall/unmarshall protocol messages from storage
Added:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols
- copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols
- copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala
- copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/QueueDescriptor.java
- copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/Store.java
- copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java
- copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/StoreFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/memory/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/memory/MemoryStore.java
- copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java
- copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java
- copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
- copied, changed from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/StoreFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols Wed Jul 7 04:00:31 2010
@@ -1,17 +1,17 @@
-## ---------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements. See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License. You may obtain a copy of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ---------------------------------------------------------------------------
-class=org.apache.activemq.apollo.broker.MultiProtocolHandler
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.broker.protocol.MultiProtocol
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 04:00:31 2010
@@ -27,6 +27,7 @@ import _root_.org.apache.activemq.wirefo
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import java.util.concurrent.atomic.AtomicLong
import org.fusesource.hawtdispatch.Dispatch
+import protocol.{ProtocolFactory, ProtocolHandler}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -95,7 +96,7 @@ class BrokerConnection(val connector: Co
override protected def _start(onCompleted:Runnable) = {
connector.dispatchQueue.retain
- protocolHandler = ProtocolHandlerFactory.createProtocolHandler(protocol)
+ protocolHandler = ProtocolFactory.get(protocol).createProtocolHandler
protocolHandler.setConnection(this);
super._start(onCompleted)
}
@@ -132,70 +133,6 @@ class BrokerConnection(val connector: Co
*/
class ProtocolException(message:String, e:Throwable=null) extends Exception(message, e)
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class MultiProtocolHandler extends ProtocolHandler {
-
- var connected = false
-
- override def onTransportCommand(command:Any) = {
-
- if (!command.isInstanceOf[WireFormat]) {
- throw new ProtocolException("First command should be a WireFormat");
- }
-
- var wireformat:WireFormat = command.asInstanceOf[WireFormat];
- val protocol = wireformat.getName()
- val protocolHandler = try {
- // Create the new protocol handler..
- ProtocolHandlerFactory.createProtocolHandler(protocol);
- } catch {
- case e:Exception=>
- throw new ProtocolException("No protocol handler available for protocol: " + protocol, e);
- }
- protocolHandler.setConnection(connection);
-
- // replace the current handler with the new one.
- connection.protocol = protocol
- connection.protocolHandler = protocolHandler
- connection.transport.suspendRead
- protocolHandler.onTransportConnected
- }
-
- override def onTransportConnected = {
- connection.transport.resumeRead
- }
-
-}
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object ProtocolHandlerFactory {
- val PROTOCOL_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/apollo/broker/protocol/");
-
- def createProtocolHandler(protocol:String) = {
- PROTOCOL_HANDLER_FINDER.newInstance(protocol).asInstanceOf[ProtocolHandler]
- }
-}
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait ProtocolHandler extends DefaultTransportListener {
-
- var connection:BrokerConnection = null;
-
- def setConnection(brokerConnection:BrokerConnection) = {
- this.connection = brokerConnection
- }
-
- override def onTransportFailure(error:IOException) = {
- connection.stop()
- }
-
-}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 04:00:31 2010
@@ -20,6 +20,8 @@ import _root_.org.apache.activemq.filter
import _root_.java.lang.{String}
import _root_.org.fusesource.hawtdispatch._
import org.fusesource.hawtbuf._
+import org.apache.activemq.broker.store.StoreTransaction
+
/**
* A producer which sends Delivery objects to a delivery consumer.
*
@@ -110,7 +112,7 @@ trait Message {
/**
* The protocol encoding of the message.
*/
- def protocol:String
+ def protocol:AsciiBuffer
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:00:31 2010
@@ -23,6 +23,8 @@ import org.fusesource.hawtdispatch.{Scal
import org.apache.activemq.util.TreeMap.TreeEntry
import java.util.{Collections, ArrayList, LinkedList}
import org.apache.activemq.util.list.{LinkedNodeList, LinkedNode}
+import org.apache.activemq.broker.store.{StoredMessage, StoreTransaction}
+import protocol.ProtocolFactory
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -76,7 +78,7 @@ class Queue(val host: VirtualHost, val d
ack_source.setEventHandler(^ {drain_acks});
ack_source.resume
- val store_load_source = createSource(new ListEventAggregator[(QueueEntry, Delivery)](), dispatchQueue)
+ val store_load_source = createSource(new ListEventAggregator[(QueueEntry, StoredMessage)](), dispatchQueue)
store_load_source.setEventHandler(^ {drain_store_loads});
store_load_source.resume
@@ -374,8 +376,16 @@ class Queue(val host: VirtualHost, val d
var ref = loaded.delivery.storeId
if( ref == -1 ) {
val tx = host.database.createStoreTransaction
- tx.store(loaded.delivery)
- tx.enqueue(storeId, entry.seq, loaded.delivery.storeId)
+
+ val message = loaded.delivery.message
+ val sm = new StoredMessage
+ sm.protocol = message.protocol
+ sm.value = ProtocolFactory.get(message.protocol).encode(message)
+ sm.size = loaded.size
+
+ tx.store(sm)
+ loaded.delivery.storeId = sm.id
+ tx.enqueue(storeId, entry.seq, sm.id)
tx.release
}
flushingSize += entry.value.size
@@ -394,7 +404,15 @@ class Queue(val host: VirtualHost, val d
data.foreach { event =>
val entry = event._1
- entry.loaded(event._2)
+ val stored = event._2
+
+ val delivery = new Delivery()
+ delivery.message = ProtocolFactory.get(stored.protocol).decode(stored.value)
+ delivery.size = stored.size
+ delivery.storeId = stored.id
+
+ entry.loaded(delivery)
+
size += entry.value.size
if( entry.hasSubs ) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:00:31 2010
@@ -18,7 +18,6 @@ package org.apache.activemq.apollo.broke
import _root_.java.util.{ArrayList, HashMap}
import _root_.org.apache.activemq.broker.store.memory.MemoryStore
-import _root_.org.apache.activemq.broker.store.{Store}
import _root_.org.apache.activemq.Service
import _root_.java.lang.{String}
import _root_.org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue}
@@ -30,6 +29,9 @@ import org.apache.activemq.apollo.dto.Vi
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import ReporterLevel._
+import org.apache.activemq.apollo.store.memory.MemoryBrokerDatabase
+import org.apache.activemq.broker.store.{StoredQueue, BrokerDatabase, Store}
+import org.fusesource.hawtbuf.proto.WireFormat
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -86,9 +88,11 @@ class VirtualHost(val broker: Broker) ex
this.names = names.toList
}
- var database:BrokerDatabase = new MemoryBrokerDatabase(this)
+ var database:BrokerDatabase = new MemoryBrokerDatabase()
var transactionManager:TransactionManagerX = new TransactionManagerX
+ var protocols = Map[AsciiBuffer, WireFormat]()
+
override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
/**
@@ -117,7 +121,7 @@ class VirtualHost(val broker: Broker) ex
x match {
case Some(info)=>
dispatchQueue ^{
- val dest = DestinationParser.parse(info.record.name, destination_parser_options)
+ val dest = DestinationParser.parse(info.name, destination_parser_options)
if( dest.getDomain == Domain.QUEUE_DOMAIN ) {
val queue = new Queue(this, dest, id)
@@ -126,7 +130,7 @@ class VirtualHost(val broker: Broker) ex
queue.message_seq_counter = info.last+1
queue.count = info.count
- queues.put(info.record.name, queue)
+ queues.put(info.name, queue)
}
}
case _ =>
@@ -180,7 +184,8 @@ class VirtualHost(val broker: Broker) ex
def addQueue(dest:Destination)(cb: (Queue)=>Unit ) = ^{
val name = DestinationParser.toBuffer(dest, destination_parser_options)
- val record = QueueRecord(0, name, null, null)
+ val record = new StoredQueue
+ record.name = name
database.addQueue(record) { rc =>
rc match {
case Some(id) =>
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala?rev=961119&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala Wed Jul 7 04:00:31 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import org.apache.activemq.apollo.broker.{Message, ProtocolException}
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
+import org.apache.activemq.wireformat.{MultiWireFormatFactory, WireFormat}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class MultiProtocol extends Protocol {
+
+ val wff = new MultiWireFormatFactory
+
+ def name = new AsciiBuffer("multi")
+
+ def createWireFormat = wff.createWireFormat
+ def createProtocolHandler = new MultiProtocolHandler
+
+ def encode(message: Message) = throw new UnsupportedOperationException
+ def decode(message: Buffer) = throw new UnsupportedOperationException
+}
+
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class MultiProtocolHandler extends ProtocolHandler {
+
+ var connected = false
+
+ override def onTransportCommand(command:Any) = {
+
+ if (!command.isInstanceOf[WireFormat]) {
+ throw new ProtocolException("First command should be a WireFormat");
+ }
+
+ var wireformat:WireFormat = command.asInstanceOf[WireFormat];
+ val protocol = wireformat.getName()
+ val protocolHandler = try {
+ // Create the new protocol handler..
+ ProtocolFactory.get(protocol).createProtocolHandler
+ } catch {
+ case e:Exception=>
+ throw new ProtocolException("No protocol handler available for protocol: " + protocol, e);
+ }
+ protocolHandler.setConnection(connection);
+
+ // replace the current handler with the new one.
+ connection.protocol = protocol
+ connection.protocolHandler = protocolHandler
+ connection.transport.suspendRead
+ protocolHandler.onTransportConnected
+ }
+
+ override def onTransportConnected = {
+ connection.transport.resumeRead
+ }
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=961119&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala Wed Jul 7 04:00:31 2010
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import java.util.Properties
+import java.net.{URLClassLoader, URL}
+import org.apache.activemq.transport.DefaultTransportListener
+import java.io.{IOException, File, InputStream}
+import org.apache.activemq.apollo.broker.{Message, BrokerConnection}
+import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
+import org.apache.activemq.wireformat.WireFormat
+
+
+/**
+ * <p>
+ * Used to discover classes using the META-INF discovery trick.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class ClassFinder[T](path:String, loaders:Seq[ClassLoader]=Thread.currentThread.getContextClassLoader::Nil) {
+
+ def find(): List[Class[T]] = {
+ var classes = List[Class[T]]()
+ loaders.foreach { loader=>
+
+ val resources = loader.getResources(path)
+ var classNames: List[String] = Nil
+ while(resources.hasMoreElements) {
+ val url = resources.nextElement;
+ val p = loadProperties(url.openStream)
+ val enum = p.keys
+ while (enum.hasMoreElements) {
+ classNames = classNames ::: enum.nextElement.asInstanceOf[String] :: Nil
+ }
+ }
+ classNames = classNames.removeDuplicates
+
+ classes :::= classNames.map { name=>
+ loader.loadClass(name).asInstanceOf[Class[T]]
+ }
+
+ }
+
+ return classes.removeDuplicates
+ }
+
+ private def loadProperties(is:InputStream):Properties = {
+ if( is==null ) {
+ return null;
+ }
+ try {
+ val p = new Properties()
+ p.load(is);
+ return p
+ } catch {
+ case e:Exception =>
+ return null
+ } finally {
+ try {
+ is.close()
+ } catch {
+ case _ =>
+ }
+ }
+ }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ProtocolFactory {
+
+ val finder = ClassFinder[Protocol]("META-INF/services/org.apache.activemq.apollo/protocols")
+ var protocols = Map[AsciiBuffer, Protocol]()
+
+ finder.find.foreach{ clazz =>
+ try {
+
+ val protocol = clazz.newInstance.asInstanceOf[Protocol]
+ protocols += protocol.name -> protocol
+
+ } catch {
+ case e:Throwable =>
+ e.printStackTrace
+ }
+ }
+
+ def get(name:String):Protocol = get(new AsciiBuffer(name))
+
+ def get(name:AsciiBuffer):Protocol = {
+ protocols.get(name) match {
+ case None =>
+ throw new IllegalArgumentException("Protocol not found: "+name)
+ case Some(x) => x
+ }
+ }
+
+}
+
+trait Protocol {
+
+ def name:AsciiBuffer
+
+ def createWireFormat:WireFormat
+ def createProtocolHandler:ProtocolHandler
+
+ def encode(message:Message):Buffer
+ def decode(message:Buffer):Message
+
+}
+
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait ProtocolHandler extends DefaultTransportListener {
+
+ var connection:BrokerConnection = null;
+
+ def setConnection(brokerConnection:BrokerConnection) = {
+ this.connection = brokerConnection
+ }
+
+ override def onTransportFailure(error:IOException) = {
+ connection.stop()
+ }
+
+}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocols Wed Jul 7 04:00:31 2010
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-class=org.apache.activemq.apollo.stomp.StompProtocolHandler
\ No newline at end of file
+org.apache.activemq.apollo.stomp.StompProtocol
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 04:00:31 2010
@@ -24,6 +24,7 @@ import _root_.org.fusesource.hawtdispatc
import AsciiBuffer._
import org.apache.activemq.apollo.broker._
+import protocol.{Protocol, ProtocolHandler}
import Stomp._
import BufferConversions._
import StompFrameConstants._
@@ -49,6 +50,27 @@ object StompConstants {
}
+class StompProtocol extends Protocol {
+ val wff = new StompWireFormatFactory
+
+ def name = new AsciiBuffer("stomp")
+
+ def createWireFormat = wff.createWireFormat
+
+ def createProtocolHandler = new StompProtocolHandler
+
+ def encode(message: Message) = {
+ val sfm = message.asInstanceOf[StompFrameMessage]
+ createWireFormat.marshal(sfm.frame)
+ }
+
+ def decode(message: Buffer) = {
+ val frame = createWireFormat.unmarshal(message).asInstanceOf[StompFrame]
+ StompFrameMessage(frame)
+ }
+
+}
+
import StompConstants._
object StompProtocolHandler extends Log
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul 7 04:00:31 2010
@@ -86,6 +86,14 @@ class StompWireFormat extends WireFormat
os.toBuffer
}
+ def unmarshal(packet:Buffer):AnyRef = {
+ unmarshal(new DataByteArrayInputStream(packet))
+ }
+
+ def unmarshal(in: DataInput):AnyRef = {
+ throw new UnsupportedOperationException
+ }
+
def marshal(frame:StompFrame, os:DataOutput) = {
frame.action.writeTo(os)
os.write(NEWLINE)
@@ -113,13 +121,6 @@ class StompWireFormat extends WireFormat
END_OF_FRAME_BUFFER.writeTo(os)
}
- def unmarshal(packet:Buffer) = {
- throw new UnsupportedOperationException
- }
- def unmarshal(in: DataInput):Object = {
- throw new UnsupportedOperationException
- }
-
def getName() = "stomp"
//
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml?rev=961119&r1=961118&r2=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/pom.xml Wed Jul 7 04:00:31 2010
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-project</artifactId>
+ <artifactId>activemq-scala</artifactId>
<version>6.0-SNAPSHOT</version>
</parent>
@@ -43,9 +43,35 @@
<artifactId>hawtbuf-core</artifactId>
<version>${hawtbuf-version}</version>
</dependency>
+ <dependency>
+ <groupId>org.fusesource.hawtdispatch</groupId>
+ <artifactId>hawtdispatch-scala</artifactId>
+ <version>${hawtdispatch-version}</version>
+ </dependency>
+
+ <!-- Scala Support -->
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>compile</scope>
+ <version>${scala-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala-version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
<!-- Testing Dependencies -->
<dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest</artifactId>
+ <version>${scalatest-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
Added: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala?rev=961119&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala Wed Jul 7 04:00:31 2010
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store
+
+import _root_.java.lang.{String}
+import org.fusesource.hawtbuf._
+import org.apache.activemq.Service
+import org.fusesource.hawtdispatch.{Retained}
+
+class StoredQueue {
+ var id:Long = -1
+ var name:AsciiBuffer = null
+ var parent:AsciiBuffer = null
+ var config:String = null
+ var first:Long = -1
+ var last:Long = -1
+ var count:Int = 0
+}
+
+class StoredMessage {
+ var id:Long = -1
+ var protocol:AsciiBuffer = null
+ var value:Buffer = null
+ var size:Int = 0
+}
+
+/**
+ * A StoreTransaction is used to perform persistent
+ * operations as unit of work.
+ *
+ * The disposer assigned to the store transaction will
+ * be executed once all associated persistent operations
+ * have been persisted.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait StoreTransaction extends Retained {
+
+ /**
+ * Assigns the delivery a store id if it did not already
+ * have one assigned.
+ */
+ def store(delivery:StoredMessage)
+
+ /**
+ * Adds a delivery to a specified queue at a the specified position in the queue.
+ */
+ def enqueue(queue:Long, seq:Long, msg:Long)
+
+ /**
+ * Removes a delivery from a specified queue at a the specified position in the queue.
+ */
+ def dequeue(queue:Long, seq:Long, msg:Long)
+
+}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait BrokerDatabase extends Service {
+
+
+ /**
+ * Stores a queue, calls back with a unquie id for the stored queue.
+ */
+ def addQueue(record:StoredQueue)(cb:(Option[Long])=>Unit):Unit
+
+ /**
+ * Loads the queue information for a given queue id.
+ */
+ def getQueueInfo(id:Long)(cb:(Option[StoredQueue])=>Unit )
+
+ /**
+ * gets a listing of all queues previously added.
+ */
+ def listQueues(cb: (Seq[Long])=>Unit )
+
+ /**
+ * Removes a the delivery associated with the provided from any
+ * internal buffers/caches. The callback is executed once, the message is
+ * no longer buffered.
+ */
+ def flushDelivery(id:Long)(cb: =>Unit)
+
+ /**
+ * Loads a delivery with the associated id from persistent storage.
+ */
+ def loadDelivery(id:Long)(cb:(Option[StoredMessage])=>Unit )
+
+ /**
+ * Creates a StoreTransaction which is used to perform persistent
+ * operations as unit of work.
+ */
+ def createStoreTransaction():StoreTransaction
+
+}
+
Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/memory/MemoryBrokerDatabase.scala Wed Jul 7 04:00:31 2010
@@ -14,7 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.broker
+package org.apache.activemq.apollo.store.memory
+
import _root_.java.lang.{String}
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
@@ -26,81 +27,15 @@ import java.util.{ArrayList, HashSet}
import collection.mutable.HashMap
import org.apache.activemq.Service
import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained, Retained}
-
-case class QueueRecord(val id:Long, val name:AsciiBuffer, val parent:AsciiBuffer, val config:String)
-case class QueueInfo(record:QueueRecord, first:Long, last:Long, count:Int)
+import org.apache.activemq.apollo.util.BaseService
+import org.apache.activemq.broker.store._
/**
- * A StoreTransaction is used to perform persistent
- * operations as unit of work.
- *
- * The disposer assigned to the store transaction will
- * be executed once all associated persistent operations
- * have been persisted.
+ * <p>
+ * </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait StoreTransaction extends Retained {
-
- /**
- * Assigns the delivery a store id if it did not already
- * have one assigned.
- */
- def store(delivery:Delivery)
-
- /**
- * Adds a delivery to a specified queue at a the specified position in the queue.
- */
- def enqueue(queue:Long, seq:Long, msg:Long)
-
- /**
- * Removes a delivery from a specified queue at a the specified position in the queue.
- */
- def dequeue(queue:Long, seq:Long, msg:Long)
-
-}
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait BrokerDatabase extends Service {
-
-
- /**
- * Stores a queue, calls back with a unquie id for the stored queue.
- */
- def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit
-
- /**
- * Loads the queue information for a given queue id.
- */
- def getQueueInfo(id:Long)(cb:(Option[QueueInfo])=>Unit )
-
- /**
- * gets a listing of all queues previously added.
- */
- def listQueues(cb: (Seq[Long])=>Unit )
-
- /**
- * Removes a the delivery associated with the provided from any
- * internal buffers/caches. The callback is executed once, the message is
- * no longer buffered.
- */
- def flushDelivery(id:Long)(cb: =>Unit)
-
- /**
- * Loads a delivery with the associated id from persistent storage.
- */
- def loadDelivery(id:Long)(cb:(Option[Delivery])=>Unit )
-
- /**
- * Creates a StoreTransaction which is used to perform persistent
- * operations as unit of work.
- */
- def createStoreTransaction():StoreTransaction
-
-}
-
class Counter(private var value:Int = 0) {
def get() = value
@@ -126,9 +61,10 @@ class Counter(private var value:Int = 0)
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class MemoryBrokerDatabase(host:VirtualHost) extends BaseService with BrokerDatabase {
+class MemoryBrokerDatabase() extends BaseService with BrokerDatabase {
val dispatchQueue = createQueue("MessagesTable")
+ def getDispatchQueue = dispatchQueue
/////////////////////////////////////////////////////////////////////
//
@@ -136,11 +72,11 @@ class MemoryBrokerDatabase(host:VirtualH
//
/////////////////////////////////////////////////////////////////////
- protected def _stop(onCompleted: Runnable) = {
+ def _stop(onCompleted: Runnable) = {
onCompleted.run
}
- protected def _start(onCompleted: Runnable) = {
+ def _start(onCompleted: Runnable) = {
onCompleted.run
}
@@ -152,7 +88,7 @@ class MemoryBrokerDatabase(host:VirtualH
private val queue_id_generator = new AtomicLong
val queues = new TreeMap[Long, QueueData]
- case class QueueData(val record:QueueRecord) {
+ case class QueueData(val record:StoredQueue) {
var messges = new TreeMap[Long, Long]()
}
@@ -160,22 +96,26 @@ class MemoryBrokerDatabase(host:VirtualH
JavaConversions.asSet(queues.keySet).toSeq
} >>: dispatchQueue
- def getQueueInfo(id:Long)(cb:(Option[QueueInfo])=>Unit ) = reply(cb) {
+ def getQueueInfo(id:Long)(cb:(Option[StoredQueue])=>Unit ) = reply(cb) {
val qd = queues.get(id)
if( qd == null ) {
None
} else {
- Some(
- if( qd.messges.isEmpty ) {
- QueueInfo(qd.record, -1, -1, 0)
- } else {
- QueueInfo(qd.record, qd.messges.firstKey, qd.messges.lastKey, qd.messges.size)
- }
- )
+ val rc = qd.record
+ if( qd.messges.isEmpty ) {
+ rc.count = 0
+ rc.first = -1
+ rc.last = -1
+ } else {
+ rc.count = qd.messges.size
+ rc.first = qd.messges.firstKey
+ rc.last = qd.messges.lastKey
+ }
+ Some(rc)
}
} >>: dispatchQueue
- def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit = reply(cb) {
+ def addQueue(record:StoredQueue)(cb:(Option[Long])=>Unit):Unit = reply(cb) {
val id = queue_id_generator.incrementAndGet
if( queues.containsKey(id) ) {
None
@@ -190,7 +130,7 @@ class MemoryBrokerDatabase(host:VirtualH
// Methods related to message storage
//
/////////////////////////////////////////////////////////////////////
- class MessageData(val delivery:Delivery) {
+ class MessageData(val delivery:StoredMessage) {
val queueRefs = new Counter()
var onFlush = List[()=>Unit]()
}
@@ -207,7 +147,7 @@ class MemoryBrokerDatabase(host:VirtualH
}
} >>: dispatchQueue
- def loadDelivery(ref:Long)(cb:(Option[Delivery])=>Unit ) = reply(cb) {
+ def loadDelivery(ref:Long)(cb:(Option[StoredMessage])=>Unit ) = reply(cb) {
val rc = messages.get(ref)
if( rc == null ) {
None
@@ -235,13 +175,13 @@ class MemoryBrokerDatabase(host:VirtualH
val updated = HashMap[Long, MessageData]()
- def store(delivery:Delivery) = {
- if( delivery.storeId == -1 ) {
- delivery.storeId = msg_id_generator.incrementAndGet
+ def store(sm:StoredMessage) = {
+ if( sm.id == -1 ) {
+ sm.id = msg_id_generator.incrementAndGet
using(this) {
- val md = new MessageData(delivery)
- updated.put(delivery.storeId, md)
- messages.put(delivery.storeId, md)
+ val md = new MessageData(sm)
+ updated.put(sm.id, md)
+ messages.put(sm.id, md)
} >>: dispatchQueue
}
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/QueueDescriptor.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/QueueDescriptor.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/QueueDescriptor.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/QueueDescriptor.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
(empty)
Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/Store.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/Store.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/Store.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
(empty)
Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/StoreFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/StoreFactory.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
(empty)
Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/memory/MemoryStore.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/memory/MemoryStore.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/memory/MemoryStore.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
(empty)
Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
(empty)
Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
(empty)
Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java (from r961118, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java&r1=961118&r2=961119&rev=961119&view=diff
==============================================================================
(empty)