You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/22 18:37:52 UTC
svn commit: r1052005 [2/2] - in /activemq/activemq-apollo/trunk:
apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/
apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/
apollo-broker/src/main/resources/META-INF/ser...
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/SubscriptionRecord.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/SubscriptionRecord.java?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/SubscriptionRecord.java (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/SubscriptionRecord.java Wed Dec 22 17:37:50 2010
@@ -27,7 +27,7 @@ public class SubscriptionRecord {
public AsciiBuffer name;
public AsciiBuffer selector;
public AsciiBuffer destination;
- public boolean isDurable;
+ public boolean durable;
public long expiration = -1;
public Buffer attachment;
Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala (from r1052004, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala&r1=1052004&r2=1052005&rev=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala Wed Dec 22 17:37:50 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.transport.vm
+package org.apache.activemq.apollo.broker.transport
import _root_.java.io.IOException
import _root_.java.net.URI
Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/package.html (from r1052004, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/package.html?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/package.html&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html&r1=1052004&r2=1052005&rev=1052005&view=diff
==============================================================================
(empty)
Copied: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/ConfigStoreTest.scala (from r1052004, activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/web/FileConfigStoreTest.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/ConfigStoreTest.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/ConfigStoreTest.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/web/FileConfigStoreTest.scala&r1=1052004&r2=1052005&rev=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/web/FileConfigStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/ConfigStoreTest.scala Wed Dec 22 17:37:50 2010
@@ -1,4 +1,4 @@
-package org.apache.activemq.apollo.web
+package org.apache.activemq.apollo.broker
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -18,7 +18,6 @@ package org.apache.activemq.apollo.web
*/
import java.io.File
import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.broker.FileConfigStore
import org.fusesource.hawtdispatch._
/**
@@ -27,7 +26,7 @@ import org.fusesource.hawtdispatch._
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class FileConfigStoreTest extends FunSuiteSupport {
+class ConfigStoreTest extends FunSuiteSupport {
test("file config store") {
val store = new FileConfigStore
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala Wed Dec 22 17:37:50 2010
@@ -26,6 +26,9 @@ import java.io.IOException
import org.apache.activemq.apollo.transport.TransportFactory
abstract class RemoteConnection extends Connection {
+
+ import Connection._
+
var uri: String = null
var name: String = null
@@ -50,7 +53,7 @@ abstract class RemoteConnection extends
super._start(^ {})
}
- override def onTransportConnected() = {
+ override def on_transport_connected() = {
onConnected()
transport.resumeRead
callbackWhenConnected.run
@@ -59,15 +62,15 @@ abstract class RemoteConnection extends
protected def onConnected()
- override def onTransportFailure(error: IOException) = {
+ override def on_transport_failure(error: IOException) = {
if (!stopped) {
if (stopping.get()) {
transport.stop
} else {
- onFailure(error)
+ on_failure(error)
if (callbackWhenConnected != null) {
warn("connect attempt failed. will retry connection..")
- dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^ {
+ dispatch_queue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^ {
if (stopping.get()) {
callbackWhenConnected.run
} else {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala Wed Dec 22 17:37:50 2010
@@ -34,7 +34,7 @@ abstract class StoreBenchmarkSupport ext
var store:Store = null
- def createStore(flushDelay:Long):Store
+ def create_store(flushDelay:Long):Store
/**
* Handy helper to call an async method on the store and wait for
@@ -57,7 +57,7 @@ abstract class StoreBenchmarkSupport ext
override protected def beforeAll() = {
- store = createStore(5*1000)
+ store = create_store(5*1000)
val tracker = new LoggingTracker("store startup")
tracker.start(store)
tracker.await
@@ -84,12 +84,12 @@ abstract class StoreBenchmarkSupport ext
val queue_key_counter = new LongCounter
- def addQueue(name:String):Long = {
+ def add_queue(name:String):Long = {
var queueA = new QueueRecord
queueA.key = queue_key_counter.incrementAndGet
queueA.binding_kind = ascii("test")
queueA.binding_data = ascii(name)
- val rc:Boolean = CB( cb=> store.addQueue(queueA)(cb) )
+ val rc:Boolean = CB( cb=> store.add_queue(queueA)(cb) )
expect(true)(rc)
queueA.key
}
@@ -103,11 +103,11 @@ abstract class StoreBenchmarkSupport ext
}
- def entry(queueKey:Long, queueSeq:Long, messageKey:Long=0) = {
+ def entry(queue_key:Long, entry_seq:Long, message_key:Long=0) = {
var queueEntry = new QueueEntryRecord
- queueEntry.queueKey = queueKey
- queueEntry.queueSeq = queueSeq
- queueEntry.messageKey = messageKey
+ queueEntry.queue_key = queue_key
+ queueEntry.entry_seq = entry_seq
+ queueEntry.message_key = message_key
queueEntry
}
@@ -126,51 +126,51 @@ abstract class StoreBenchmarkSupport ext
}
}
- def populate(queueKey:Long, messages:List[String], firstSeq:Long=1) = {
- var batch = store.createStoreUOW
+ def populate(queue_key:Long, messages:List[String], firstSeq:Long=1) = {
+ var batch = store.create_uow
var msgKeys = ListBuffer[Long]()
var nextSeq = firstSeq
messages.foreach { message=>
val msgKey = addMessage(batch, message)
msgKeys += msgKey
- batch.enqueue(entry(queueKey, nextSeq, msgKey))
+ batch.enqueue(entry(queue_key, nextSeq, msgKey))
nextSeq += 1
}
val tracker = new TaskTracker()
tracker.release(batch)
- msgKeys.foreach { msgKey =>
- store.flushMessage(msgKey) {}
+ msgKeys.foreach { msg_key =>
+ store.flush_message(msg_key) {}
}
tracker.await
msgKeys
}
test("store enqueue and load latencey") {
- val A = addQueue("A")
- var messageKeys = storeMessages(A)
- loadMessages(A, messageKeys)
+ val A = add_queue("A")
+ var message_keys = storeMessages(A)
+ loadMessages(A, message_keys)
}
def storeMessages(queue:Long) = {
var seq = 0L
- var messageKeys = ListBuffer[Long]()
+ var message_keys = ListBuffer[Long]()
val content = payload("message\n", 1024)
var metric = benchmarkCount(100000) {
seq += 1
- var batch = store.createStoreUOW
+ var batch = store.create_uow
val message = addMessage(batch, content)
- messageKeys += message
+ message_keys += message
batch.enqueue(entry(queue, seq, message))
val latch = new CountDownLatch(1)
batch.setDisposer(^{latch.countDown} )
batch.release
- store.flushMessage(message) {}
+ store.flush_message(message) {}
latch.await
@@ -178,15 +178,15 @@ abstract class StoreBenchmarkSupport ext
println("enqueue metrics: "+metric)
println("enqueue latency is: "+metric.latency(TimeUnit.MILLISECONDS)+" ms")
println("enqueue rate is: "+metric.rate(TimeUnit.SECONDS)+" enqueues/s")
- messageKeys.toList
+ message_keys.toList
}
- def loadMessages(queue:Long, messageKeys: List[Long]) = {
+ def loadMessages(queue:Long, message_keys: List[Long]) = {
- var keys = messageKeys.toList
+ var keys = message_keys.toList
val metric = benchmarkCount(keys.size) {
val latch = new CountDownLatch(1)
- store.loadMessage(keys.head) { msg=>
+ store.load_message(keys.head) { msg=>
assert(msg.isDefined, "message key not found: "+keys.head)
latch.countDown
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Wed Dec 22 17:37:50 2010
@@ -33,7 +33,7 @@ abstract class StoreFunSuiteSupport exte
var store:Store = null
- def createStore(flushDelay:Long):Store
+ def create_store(flushDelay:Long):Store
/**
* Handy helper to call an async method on the store and wait for
@@ -56,7 +56,7 @@ abstract class StoreFunSuiteSupport exte
override protected def beforeAll() = {
- store = createStore(5*1000)
+ store = create_store(5*1000)
val tracker = new LoggingTracker("store startup")
tracker.start(store)
tracker.await
@@ -83,17 +83,17 @@ abstract class StoreFunSuiteSupport exte
val queue_key_counter = new LongCounter
- def addQueue(name:String):Long = {
- var queueA = new QueueRecord
- queueA.key = queue_key_counter.incrementAndGet
- queueA.binding_kind = ascii("test")
- queueA.binding_data = ascii(name)
- val rc:Boolean = CB( cb=> store.addQueue(queueA)(cb) )
+ def add_queue(name:String):Long = {
+ var queue_a = new QueueRecord
+ queue_a.key = queue_key_counter.incrementAndGet
+ queue_a.binding_kind = ascii("test")
+ queue_a.binding_data = ascii(name)
+ val rc:Boolean = CB( cb=> store.add_queue(queue_a)(cb) )
expect(true)(rc)
- queueA.key
+ queue_a.key
}
- def addMessage(batch:StoreUOW, content:String):Long = {
+ def add_message(batch:StoreUOW, content:String):Long = {
var message = new MessageRecord
message.protocol = ascii("test-protocol")
message.buffer = ascii(content).buffer
@@ -102,81 +102,81 @@ abstract class StoreFunSuiteSupport exte
}
- def entry(queueKey:Long, queueSeq:Long, messageKey:Long=0) = {
+ def entry(queue_key:Long, entry_seq:Long, message_key:Long=0) = {
var queueEntry = new QueueEntryRecord
- queueEntry.queueKey = queueKey
- queueEntry.queueSeq = queueSeq
- queueEntry.messageKey = messageKey
+ queueEntry.queue_key = queue_key
+ queueEntry.entry_seq = entry_seq
+ queueEntry.message_key = message_key
queueEntry
}
- def populate(queueKey:Long, messages:List[String], firstSeq:Long=1) = {
- var batch = store.createStoreUOW
- var msgKeys = ListBuffer[Long]()
- var nextSeq = firstSeq
+ def populate(queue_key:Long, messages:List[String], first_seq:Long=1) = {
+ var batch = store.create_uow
+ var msg_keys = ListBuffer[Long]()
+ var next_seq = first_seq
messages.foreach { message=>
- val msgKey = addMessage(batch, message)
- msgKeys += msgKey
- batch.enqueue(entry(queueKey, nextSeq, msgKey))
- nextSeq += 1
+ val msgKey = add_message(batch, message)
+ msg_keys += msgKey
+ batch.enqueue(entry(queue_key, next_seq, msgKey))
+ next_seq += 1
}
val tracker = new TaskTracker()
tracker.release(batch)
- msgKeys.foreach { msgKey =>
- store.flushMessage(msgKey) {}
+ msg_keys.foreach { msgKey =>
+ store.flush_message(msgKey) {}
}
tracker.await
- msgKeys
+ msg_keys
}
test("load stored message") {
- val A = addQueue("A")
- val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+ val A = add_queue("A")
+ val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
- val rc:Option[MessageRecord] = CB( cb=> store.loadMessage(msgKeys.head)(cb) )
+ val rc:Option[MessageRecord] = CB( cb=> store.load_message(msg_keys.head)(cb) )
expect(ascii("message 1").buffer) {
rc.get.buffer
}
}
test("add and list queues") {
- val A = addQueue("A")
- val B = addQueue("B")
- val C = addQueue("C")
+ val A = add_queue("A")
+ val B = add_queue("B")
+ val C = add_queue("C")
expectCB(List(A,B,C).toSeq) { cb=>
- store.listQueues(cb)
+ store.list_queues(cb)
}
}
test("get queue status") {
- val A = addQueue("my queue name")
+ val A = add_queue("my queue name")
populate(A, "message 1"::"message 2"::"message 3"::Nil)
- val rc:Option[QueueRecord] = CB( cb=> store.getQueue(A)(cb) )
+ val rc:Option[QueueRecord] = CB( cb=> store.get_queue(A)(cb) )
expect(ascii("my queue name")) {
rc.get.binding_data.ascii
}
}
test("list queue entries") {
- val A = addQueue("A")
- val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+ val A = add_queue("A")
+ val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
- val rc:Seq[QueueEntryRecord] = CB( cb=> store.listQueueEntries(A,msgKeys.head, msgKeys.last)(cb) )
- expect(msgKeys.toSeq) {
- rc.map( _.messageKey )
+ val rc:Seq[QueueEntryRecord] = CB( cb=> store.list_queue_entries(A,msg_keys.head, msg_keys.last)(cb) )
+ expect(msg_keys.toSeq) {
+ rc.map( _.message_key )
}
}
test("batch completes after a delay") {x}
def x = {
- val A = addQueue("A")
- var batch = store.createStoreUOW
+ val A = add_queue("A")
+ var batch = store.create_uow
- val m1 = addMessage(batch, "message 1")
+ val m1 = add_message(batch, "message 1")
batch.enqueue(entry(A, 1, m1))
val tracker = new TaskTracker()
@@ -190,16 +190,16 @@ abstract class StoreFunSuiteSupport exte
}
test("flush cancels the delay") {
- val A = addQueue("A")
- var batch = store.createStoreUOW
+ val A = add_queue("A")
+ var batch = store.create_uow
- val m1 = addMessage(batch, "message 1")
+ val m1 = add_message(batch, "message 1")
batch.enqueue(entry(A, 1, m1))
val tracker = new TaskTracker()
tracker.release(batch)
- store.flushMessage(m1) {}
+ store.flush_message(m1) {}
expect(true) {
tracker.await(1, TimeUnit.SECONDS)
Copied: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/transport/VMTransportTest.java (from r1052004, activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/transport/VMTransportTest.java?p2=activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/transport/VMTransportTest.java&p1=activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java&r1=1052004&r2=1052005&rev=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/transport/VMTransportTest.java Wed Dec 22 17:37:50 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.transport.vm;
+package org.apache.activemq.apollo.broker.transport;
import java.io.IOException;
Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala Wed Dec 22 17:37:50 2010
@@ -78,7 +78,7 @@ class CassandraClient() {
import PBQueueEntryRecord._
val pb = PBQueueEntryRecord.FACTORY.parseUnframed(v)
val rc = new QueueEntryRecord
- rc.messageKey = pb.getMessageKey
+ rc.message_key = pb.getMessageKey
rc.attachment = pb.getAttachment
rc.size = pb.getSize
rc.redeliveries = pb.getRedeliveries.toShort
@@ -87,7 +87,7 @@ class CassandraClient() {
implicit def encodeQueueEntryRecord(v: QueueEntryRecord): Array[Byte] = {
val pb = new PBQueueEntryRecord.Bean
- pb.setMessageKey(v.messageKey)
+ pb.setMessageKey(v.message_key)
pb.setAttachment(v.attachment)
pb.setSize(v.size)
pb.setRedeliveries(v.redeliveries)
@@ -131,11 +131,11 @@ class CassandraClient() {
}
}
- def removeQueue(queueKey: Long):Boolean = {
+ def removeQueue(queue_key: Long):Boolean = {
withSession {
session =>
- session.remove(schema.entries \ queueKey)
- session.remove(schema.queue_name \ queueKey)
+ session.remove(schema.entries \ queue_key)
+ session.remove(schema.queue_name \ queue_key)
}
true
}
@@ -180,14 +180,14 @@ class CassandraClient() {
}
action.enqueues.foreach {
queueEntry =>
- val qid = queueEntry.queueKey
- val seq = queueEntry.queueSeq
+ val qid = queueEntry.queue_key
+ val seq = queueEntry.entry_seq
operations ::= Insert( schema.entries \ qid \ (seq, queueEntry) )
}
action.dequeues.foreach {
queueEntry =>
- val qid = queueEntry.queueKey
- val seq = queueEntry.queueSeq
+ val qid = queueEntry.queue_key
+ val seq = queueEntry.entry_seq
operations ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) )
}
}
@@ -210,22 +210,22 @@ class CassandraClient() {
}
}
- def listQueueEntryGroups(queueKey: Long, limit: Int): Seq[QueueEntryRange] = {
+ def listQueueEntryGroups(queue_key: Long, limit: Int): Seq[QueueEntryRange] = {
withSession {
session =>
var rc = ListBuffer[QueueEntryRange]()
var group:QueueEntryRange = null
// TODO: this is going to bring back lots of entries.. not good.
- session.list(schema.entries \ queueKey).foreach { x=>
+ session.list(schema.entries \ queue_key).foreach { x=>
val record:QueueEntryRecord = x.value
if( group == null ) {
group = new QueueEntryRange
- group.firstQueueSeq = record.queueSeq
+ group.first_entry_seq = record.entry_seq
}
- group.lastQueueSeq = record.queueSeq
+ group.last_entry_seq = record.entry_seq
group.count += 1
group.size += record.size
if( group.count == limit) {
@@ -241,13 +241,13 @@ class CassandraClient() {
}
}
- def getQueueEntries(queueKey: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
+ def getQueueEntries(queue_key: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
withSession {
session =>
- session.list(schema.entries \ queueKey, RangePredicate(firstSeq, lastSeq)).map { x=>
+ session.list(schema.entries \ queue_key, RangePredicate(firstSeq, lastSeq)).map { x=>
val rc:QueueEntryRecord = x.value
- rc.queueKey = queueKey
- rc.queueSeq = x.name
+ rc.queue_key = queue_key
+ rc.entry_seq = x.name
rc
}
}
Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala Wed Dec 22 17:37:50 2010
@@ -79,7 +79,7 @@ class CassandraStore extends DelayingSto
protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
blocking {
client.store(uows)
- dispatchQueue {
+ dispatch_queue {
callback
}
}
@@ -88,16 +88,16 @@ class CassandraStore extends DelayingSto
def configure(config: StoreDTO, reporter: Reporter):Unit = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
- def storeStatusDTO(callback:(StoreStatusDTO)=>Unit) = dispatchQueue {
+ def get_store_status(callback:(StoreStatusDTO)=>Unit) = dispatch_queue {
val rc = new SimpleStoreStatusDTO
- rc.state = serviceState.toString
- rc.state_since = serviceState.since
+ rc.state = service_state.toString
+ rc.state_since = service_state.since
callback(rc)
}
def configure(config: CassandraStoreDTO, reporter: Reporter):Unit = {
if ( CassandraStore.validate(config, reporter) < ERROR ) {
- if( serviceState.isStarted ) {
+ if( service_state.is_started ) {
// TODO: apply changes while he broker is running.
reporter.report(WARN, "Updating cassandra store configuration at runtime is not yet supported. You must restart the broker for the change to take effect.")
} else {
@@ -106,7 +106,7 @@ class CassandraStore extends DelayingSto
}
}
- protected def _start(onCompleted: Runnable) = {
+ protected def _start(on_completed: Runnable) = {
info("Starting cassandra store at: '%s'", config.hosts.toList.mkString(", "))
blocking = Executors.newFixedThreadPool(20, new ThreadFactory(){
def newThread(r: Runnable) = {
@@ -130,10 +130,10 @@ class CassandraStore extends DelayingSto
client.start
schedualDisplayStats
- onCompleted.run
+ on_completed.run
}
- protected def _stop(onCompleted: Runnable) = {
+ protected def _stop(on_completed: Runnable) = {
info("Stopping cassandra store at: '%s'", config.hosts.toList.mkString(", "))
blocking.shutdown
new Thread("casandra client shutdown") {
@@ -142,7 +142,7 @@ class CassandraStore extends DelayingSto
warn("cassandra thread pool is taking a long time to shutdown.")
}
client.stop
- onCompleted.run
+ on_completed.run
}
}.start
}
@@ -156,13 +156,13 @@ class CassandraStore extends DelayingSto
val storeLatency = new TimeCounter
def schedualDisplayStats:Unit = {
def displayStats = {
- if( serviceState.isStarted ) {
+ if( service_state.is_started ) {
val cl = storeLatency.apply(true)
info("metrics: store latency: %,.3f ms", cl.avgTime(TimeUnit.MILLISECONDS))
schedualDisplayStats
}
}
- dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^{ displayStats })
+ dispatch_queue.dispatchAfter(5, TimeUnit.SECONDS, ^{ displayStats })
}
/**
@@ -179,51 +179,51 @@ class CassandraStore extends DelayingSto
/**
* Ges the next queue key identifier.
*/
- def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
+ def get_last_queue_key(callback:(Option[Long])=>Unit):Unit = {
// TODO:
callback( Some(1L) )
}
- def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
+ def add_queue(record: QueueRecord)(callback: (Boolean) => Unit) = {
blocking {
client.addQueue(record)
callback(true)
}
}
- def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
+ def remove_queue(queueKey: Long)(callback: (Boolean) => Unit) = {
blocking {
callback(client.removeQueue(queueKey))
}
}
- def getQueue(id: Long)(callback: (Option[QueueRecord]) => Unit) = {
+ def get_queue(id: Long)(callback: (Option[QueueRecord]) => Unit) = {
blocking {
callback( client.getQueue(id) )
}
}
- def listQueues(callback: (Seq[Long]) => Unit) = {
+ def list_queues(callback: (Seq[Long]) => Unit) = {
blocking {
callback( client.listQueues )
}
}
- def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
+ def load_message(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
blocking {
callback( client.loadMessage(id) )
}
}
- def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
+ def list_queue_entry_ranges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
blocking {
callback( client.listQueueEntryGroups(queueKey, limit) )
}
}
- def listQueueEntries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+ def list_queue_entries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
blocking {
callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
}
Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/test/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/test/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStoreTest.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/test/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/test/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStoreTest.scala Wed Dec 22 17:37:50 2010
@@ -23,7 +23,7 @@ import org.apache.activemq.apollo.broker
*/
class CassandraStoreTest extends StoreFunSuiteSupport with CassandraServerMixin {
- def createStore(flushDelay:Long):Store = {
+ def create_store(flushDelay:Long):Store = {
val rc = new CassandraStore
rc.config.flush_delay = flushDelay
rc
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala Wed Dec 22 17:37:50 2010
@@ -60,7 +60,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
override def log: Log = HawtDBClient
- def dispatchQueue = hawtDBStore.dispatchQueue
+ def dispatchQueue = hawtDBStore.dispatch_queue
private val indexFileFactory = new TxPageFileFactory()
@@ -247,8 +247,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
action.dequeues.foreach {
queueEntry =>
- val queueKey = queueEntry.queueKey
- val queueSeq = queueEntry.queueSeq
+ val queueKey = queueEntry.queue_key
+ val queueSeq = queueEntry.entry_seq
batch += new RemoveQueueEntry.Bean().setQueueKey(queueKey).setQueueSeq(queueSeq)
}
}
@@ -308,9 +308,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
entryIndex.iterator.foreach { entry =>
if( group == null ) {
group = new QueueEntryRange
- group.firstQueueSeq = entry.getKey.longValue
+ group.first_entry_seq = entry.getKey.longValue
}
- group.lastQueueSeq = entry.getKey.longValue
+ group.last_entry_seq = entry.getKey.longValue
group.count += 1
group.size += entry.getValue.getSize
if( group.count == limit) {
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala Wed Dec 22 17:37:50 2010
@@ -71,7 +71,7 @@ class HawtDBStore extends DelayingStoreS
var config:HawtDBStoreDTO = defaultConfig
val client = new HawtDBClient(this)
- val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
+ val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatch_queue)
load_source.setEventHandler(^{drain_loads});
override def toString = "hawtdb store"
@@ -83,7 +83,7 @@ class HawtDBStore extends DelayingStoreS
protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
executor_pool {
client.store(uows, ^{
- dispatchQueue {
+ dispatch_queue {
callback
}
})
@@ -94,7 +94,7 @@ class HawtDBStore extends DelayingStoreS
def configure(config: HawtDBStoreDTO, reporter: Reporter) = {
if ( HawtDBStore.validate(config, reporter) < ERROR ) {
- if( serviceState.isStarted ) {
+ if( service_state.is_started ) {
// TODO: apply changes while he broker is running.
reporter.report(WARN, "Updating hawtdb store configuration at runtime is not yet supported. You must restart the broker for the change to take effect.")
} else {
@@ -103,7 +103,7 @@ class HawtDBStore extends DelayingStoreS
}
}
- protected def _start(onCompleted: Runnable) = {
+ protected def _start(on_completed: Runnable) = {
info("Starting hawtdb store at: '%s'", config.directory)
executor_pool = Executors.newFixedThreadPool(1, new ThreadFactory(){
def newThread(r: Runnable) = {
@@ -122,7 +122,7 @@ class HawtDBStore extends DelayingStoreS
scheduleCleanup(v)
scheduleFlush(v)
load_source.resume
- onCompleted.run
+ on_completed.run
})
}
}
@@ -136,7 +136,7 @@ class HawtDBStore extends DelayingStoreS
}
}
}
- dispatchQueue.dispatchAfter(client.index_flush_interval, TimeUnit.MILLISECONDS, ^ {try_flush})
+ dispatch_queue.dispatchAfter(client.index_flush_interval, TimeUnit.MILLISECONDS, ^ {try_flush})
}
def scheduleCleanup(version:Int): Unit = {
@@ -148,10 +148,10 @@ class HawtDBStore extends DelayingStoreS
}
}
}
- dispatchQueue.dispatchAfter(client.cleanup_interval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
+ dispatch_queue.dispatchAfter(client.cleanup_interval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
}
- protected def _stop(onCompleted: Runnable) = {
+ protected def _stop(on_completed: Runnable) = {
info("Stopping hawtdb store at: '%s'", config.directory)
schedule_version.incrementAndGet
new Thread() {
@@ -161,7 +161,7 @@ class HawtDBStore extends DelayingStoreS
executor_pool.awaitTermination(86400, TimeUnit.SECONDS)
executor_pool = null
client.stop
- onCompleted.run
+ on_completed.run
}
}.start
}
@@ -189,37 +189,37 @@ class HawtDBStore extends DelayingStoreS
/**
* Gets the last queue key identifier stored.
*/
- def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
+ def get_last_queue_key(callback:(Option[Long])=>Unit):Unit = {
executor_pool {
callback(Some(client.rootBuffer.getLastQueueKey.longValue))
}
}
- def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
+ def add_queue(record: QueueRecord)(callback: (Boolean) => Unit) = {
executor_pool {
client.addQueue(record, ^{ callback(true) })
}
}
- def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
+ def remove_queue(queueKey: Long)(callback: (Boolean) => Unit) = {
executor_pool {
client.removeQueue(queueKey,^{ callback(true) })
}
}
- def getQueue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
+ def get_queue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
executor_pool {
callback( client.getQueue(queueKey) )
}
}
- def listQueues(callback: (Seq[Long]) => Unit) = {
+ def list_queues(callback: (Seq[Long]) => Unit) = {
executor_pool {
callback( client.listQueues )
}
}
- def loadMessage(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
+ def load_message(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
message_load_latency_counter.start { end=>
load_source.merge((messageKey, { (result)=>
end()
@@ -236,13 +236,13 @@ class HawtDBStore extends DelayingStoreS
}
}
- def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
+ def list_queue_entry_ranges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
executor_pool ^{
callback( client.listQueueEntryGroups(queueKey, limit) )
}
}
- def listQueueEntries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+ def list_queue_entries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
executor_pool ^{
callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
}
@@ -250,7 +250,7 @@ class HawtDBStore extends DelayingStoreS
def poll_stats:Unit = {
def displayStats = {
- if( serviceState.isStarted ) {
+ if( service_state.is_started ) {
flush_latency = flush_latency_counter(true)
message_load_latency = message_load_latency_counter(true)
@@ -263,14 +263,14 @@ class HawtDBStore extends DelayingStoreS
}
}
- dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
+ dispatch_queue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
}
- def storeStatusDTO(callback:(StoreStatusDTO)=>Unit) = dispatchQueue {
+ def get_store_status(callback:(StoreStatusDTO)=>Unit) = dispatch_queue {
val rc = new HawtDBStoreStatusDTO
- rc.state = serviceState.toString
- rc.state_since = serviceState.since
+ rc.state = service_state.toString
+ rc.state_since = service_state.since
rc.flush_latency = flush_latency
rc.message_load_latency = message_load_latency
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala Wed Dec 22 17:37:50 2010
@@ -56,9 +56,9 @@ object Helpers {
implicit def toQueueEntryRecord(pb: AddQueueEntry.Getter): QueueEntryRecord = {
val rc = new QueueEntryRecord
- rc.queueKey = pb.getQueueKey
- rc.queueSeq = pb.getQueueSeq
- rc.messageKey = pb.getMessageKey
+ rc.queue_key = pb.getQueueKey
+ rc.entry_seq = pb.getQueueSeq
+ rc.message_key = pb.getMessageKey
rc.attachment = pb.getAttachment
rc.size = pb.getSize
rc.redeliveries = pb.getRedeliveries.toShort
@@ -67,9 +67,9 @@ object Helpers {
implicit def fromQueueEntryRecord(v: QueueEntryRecord): AddQueueEntry.Bean = {
val pb = new AddQueueEntry.Bean
- pb.setQueueKey(v.queueKey)
- pb.setQueueSeq(v.queueSeq)
- pb.setMessageKey(v.messageKey)
+ pb.setQueueKey(v.queue_key)
+ pb.setQueueSeq(v.entry_seq)
+ pb.setMessageKey(v.message_key)
pb.setAttachment(v.attachment)
pb.setSize(v.size)
pb.setRedeliveries(v.redeliveries)
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreBenchmark.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreBenchmark.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreBenchmark.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreBenchmark.scala Wed Dec 22 17:37:50 2010
@@ -25,7 +25,7 @@ import org.apache.activemq.apollo.broker
*/
class HawtDBStoreBenchmark extends StoreBenchmarkSupport {
- def createStore(flushDelay:Long):Store = {
+ def create_store(flushDelay:Long):Store = {
val rc = new HawtDBStore
rc.config.flush_delay = flushDelay
rc
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreTest.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreTest.scala Wed Dec 22 17:37:50 2010
@@ -23,7 +23,7 @@ import org.apache.activemq.apollo.broker
*/
class HawtDBStoreTest extends StoreFunSuiteSupport {
- def createStore(flushDelay:Long):Store = {
+ def create_store(flushDelay:Long):Store = {
val rc = new HawtDBStore
rc.config.flush_delay = flushDelay
rc
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Dec 22 17:37:50 2010
@@ -102,7 +102,7 @@ class StompProtocolHandler extends Proto
override protected def log = StompProtocolHandler
- protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
+ protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
trait AckHandler {
def track(delivery:Delivery):Unit
@@ -208,7 +208,7 @@ class StompProtocolHandler extends Proto
override val exclusive:Boolean
) extends BaseRetained with DeliveryConsumer {
- val dispatchQueue = StompProtocolHandler.this.dispatchQueue
+ val dispatch_queue = StompProtocolHandler.this.dispatchQueue
override def connection = Some(StompProtocolHandler.this.connection)
@@ -237,10 +237,10 @@ class StompProtocolHandler extends Proto
def consumer = StompConsumer.this
var closed = false
- val session = session_manager.open(producer.dispatchQueue)
+ val session = session_manager.open(producer.dispatch_queue)
def close = {
- assert(getCurrentQueue == producer.dispatchQueue)
+ assert(getCurrentQueue == producer.dispatch_queue)
if( !closed ) {
closed = true
if( browser ) {
@@ -310,7 +310,7 @@ class StompProtocolHandler extends Proto
var host:VirtualHost = null
- private def queue = connection.dispatchQueue
+ private def queue = connection.dispatch_queue
// used by STOMP 1.0 clients
var connection_ack_handlers = HashMap[AsciiBuffer, AckHandler]()
@@ -323,8 +323,8 @@ class StompProtocolHandler extends Proto
var waiting_on:String = "client request"
var config:StompDTO = _
- override def setConnection(connection: BrokerConnection) = {
- super.setConnection(connection)
+ override def set_connection(connection: BrokerConnection) = {
+ super.set_connection(connection)
import collection.JavaConversions._
config = connection.connector.config.protocols.find( _.isInstanceOf[StompDTO]).map(_.asInstanceOf[StompDTO]).getOrElse(new StompDTO)
}
@@ -377,9 +377,9 @@ class StompProtocolHandler extends Proto
throw new Break()
}
- override def onTransportConnected() = {
+ override def on_transport_connected() = {
- session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){x=>
+ session_manager = new SinkMux[StompFrame]( MapSink(connection.transport_sink){x=>
trace("sending frame: %s", x)
x
}, dispatchQueue, StompFrame)
@@ -388,7 +388,7 @@ class StompProtocolHandler extends Proto
resumeRead
}
- override def onTransportDisconnected() = {
+ override def on_transport_disconnected() = {
if( !closed ) {
heart_beat_monitor.stop
closed=true;
@@ -416,7 +416,7 @@ class StompProtocolHandler extends Proto
}
- override def onTransportCommand(command:Any):Unit = {
+ override def on_transport_command(command:AnyRef):Unit = {
if( dead ) {
// We stop processing client commands once we are dead
return;
@@ -656,7 +656,7 @@ class StompProtocolHandler extends Proto
val producer = new DeliveryProducer() {
override def connection = Some(StompProtocolHandler.this.connection)
- override def dispatchQueue = queue
+ override def dispatch_queue = queue
}
// don't process frames until producer is connected...
@@ -952,11 +952,11 @@ class StompProtocolHandler extends Proto
}
- override def onTransportFailure(error: IOException) = {
+ override def on_transport_failure(error: IOException) = {
if( !connection.stopped ) {
suspendRead("shutdown")
debug(error, "Shutting connection down due to: %s", error)
- super.onTransportFailure(error);
+ super.on_transport_failure(error);
}
}
@@ -1002,22 +1002,22 @@ class StompProtocolHandler extends Proto
queue += proc
}
- def commit(onComplete: => Unit) = {
+ def commit(on_complete: => Unit) = {
val uow = if( host.store!=null ) {
- host.store.createStoreUOW
+ host.store.create_uow
} else {
null
}
queue.foreach{ _(uow) }
if( uow!=null ) {
- uow.onComplete(^{
- onComplete
+ uow.on_complete(^{
+ on_complete
})
uow.release
} else {
- onComplete
+ on_complete
}
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala Wed Dec 22 17:37:50 2010
@@ -35,7 +35,7 @@ class StompRemoteConsumer extends Remote
var outboundSink: OverflowSink[StompFrame] = null
def onConnected() = {
- outboundSink = new OverflowSink[StompFrame](MapSink(transportSink) {x => x})
+ outboundSink = new OverflowSink[StompFrame](MapSink(transport_sink) {x => x})
outboundSink.refiller = ^ {}
val stompDestination = if (destination.domain == Router.QUEUE_DOMAIN) {
@@ -59,7 +59,7 @@ class StompRemoteConsumer extends Remote
outboundSink.offer(frame);
}
- override def onTransportCommand(command: Object) = {
+ override def on_transport_command(command: Object) = {
var frame = command.asInstanceOf[StompFrame]
frame match {
case StompFrame(CONNECTED, headers, _, _) =>
@@ -73,16 +73,16 @@ class StompRemoteConsumer extends Remote
}
case StompFrame(ERROR, headers, content, _) =>
- onFailure(new Exception("Server reported an error: " + frame.content));
+ on_failure(new Exception("Server reported an error: " + frame.content));
case _ =>
- onFailure(new Exception("Unexpected stomp command: " + frame.action));
+ on_failure(new Exception("Unexpected stomp command: " + frame.action));
}
}
protected def messageReceived() {
if (thinkTime > 0) {
transport.suspendRead
- dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
+ dispatch_queue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
rate.increment();
if (!stopped) {
transport.resumeRead
@@ -136,9 +136,9 @@ class StompRemoteProducer extends Remote
// if we are not going to wait for an ack back from the server,
// then just send the next one...
if (thinkTime > 0) {
- dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
+ dispatch_queue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
} else {
- dispatchQueue << task
+ dispatch_queue << task
}
}
}
@@ -146,7 +146,7 @@ class StompRemoteProducer extends Remote
}
override def onConnected() = {
- outboundSink = new OverflowSink[StompFrame](MapSink(transportSink) {x => x})
+ outboundSink = new OverflowSink[StompFrame](MapSink(transport_sink) {x => x})
outboundSink.refiller = ^ {drain}
if (destination.domain == Router.QUEUE_DOMAIN) {
@@ -158,7 +158,7 @@ class StompRemoteProducer extends Remote
send_next
}
- override def onTransportCommand(command: Object) = {
+ override def on_transport_command(command: Object) = {
var frame = command.asInstanceOf[StompFrame]
frame match {
case StompFrame(RECEIPT, headers, _, _) =>
@@ -168,19 +168,24 @@ class StompRemoteProducer extends Remote
case StompFrame(CONNECTED, headers, _, _) =>
case StompFrame(ERROR, headers, content, _) =>
- onFailure(new Exception("Server reported an error: " + frame.content.utf8));
+ on_failure(new Exception("Server reported an error: " + frame.content.utf8));
case _ =>
- onFailure(new Exception("Unexpected stomp command: " + frame.action));
+ on_failure(new Exception("Unexpected stomp command: " + frame.action));
}
}
}
+object Watchog extends Log
+
trait Watchog extends RemoteConsumer {
+
+ import Watchog._
+
var messageCount = 0
def watchdog(lastMessageCount: Int): Unit = {
val seconds = 10
- dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
+ dispatch_queue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
if (messageCount == lastMessageCount) {
warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
stop
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala Wed Dec 22 17:37:50 2010
@@ -28,21 +28,21 @@ object BaseService extends Log
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait BaseService extends Service with Logging {
+trait BaseService extends Service {
- override protected def log:Log = BaseService
+ import BaseService._
sealed class State {
val since = System.currentTimeMillis
override def toString = getClass.getSimpleName
- def isCreated = false
- def isStarting = false
- def isStarted = false
- def isStopping = false
- def isStopped= false
- def isFailed= false
+ def is_created = false
+ def is_starting = false
+ def is_started = false
+ def is_stopping = false
+ def is_stopped= false
+ def is_failed= false
}
trait CallbackSupport {
@@ -51,35 +51,35 @@ trait BaseService extends Service with L
def done = { callbacks.foreach(_.run); callbacks=Nil }
}
- protected class CREATED extends State { override def isCreated = true }
- protected class STARTING extends State with CallbackSupport { override def isStarting = true }
- protected class FAILED extends State { override def isFailed = true }
- protected class STARTED extends State { override def isStarted = true }
- protected class STOPPING extends State with CallbackSupport { override def isStopping = true }
- protected class STOPPED extends State { override def isStopped = true }
+ protected class CREATED extends State { override def is_created = true }
+ protected class STARTING extends State with CallbackSupport { override def is_starting = true }
+ protected class FAILED extends State { override def is_failed = true }
+ protected class STARTED extends State { override def is_started = true }
+ protected class STOPPING extends State with CallbackSupport { override def is_stopping = true }
+ protected class STOPPED extends State { override def is_stopped = true }
- protected val dispatchQueue:DispatchQueue
+ protected val dispatch_queue:DispatchQueue
final def start() = start(null)
final def stop() = stop(null)
@volatile
- protected var _serviceState:State = new CREATED
+ protected var _service_state:State = new CREATED
- def serviceState = _serviceState
+ def service_state = _service_state
@volatile
protected var _serviceFailure:Exception = null
def serviceFailure = _serviceFailure
- final def start(onCompleted:Runnable) = ^{
+ final def start(on_completed:Runnable) = ^{
def do_start = {
val state = new STARTING()
- state << onCompleted
- _serviceState = state
+ state << on_completed
+ _service_state = state
try {
_start(^ {
- _serviceState = new STARTED
+ _service_state = new STARTED
state.done
})
}
@@ -87,45 +87,45 @@ trait BaseService extends Service with L
case e:Exception =>
error(e, "Start failed due to %s", e)
_serviceFailure = e
- _serviceState = new FAILED
+ _service_state = new FAILED
state.done
}
}
def done = {
- if( onCompleted!=null ) {
- onCompleted.run
+ if( on_completed!=null ) {
+ on_completed.run
}
}
- _serviceState match {
+ _service_state match {
case state:CREATED =>
do_start
case state:STOPPED =>
do_start
case state:STARTING =>
- state << onCompleted
+ state << on_completed
case state:STARTED =>
done
case state =>
done
error("Start should not be called from state: %s", state);
}
- } |>>: dispatchQueue
+ } |>>: dispatch_queue
- final def stop(onCompleted:Runnable) = {
+ final def stop(on_completed:Runnable) = {
def stop_task = {
def done = {
- if( onCompleted!=null ) {
- onCompleted.run
+ if( on_completed!=null ) {
+ on_completed.run
}
}
- _serviceState match {
+ _service_state match {
case state:STARTED =>
val state = new STOPPING
- state << onCompleted
- _serviceState = state
+ state << on_completed
+ _service_state = state
try {
_stop(^ {
- _serviceState = new STOPPED
+ _service_state = new STOPPED
state.done
})
}
@@ -133,11 +133,11 @@ trait BaseService extends Service with L
case e:Exception =>
error(e, "Stop failed due to: %s", e)
_serviceFailure = e
- _serviceState = new FAILED
+ _service_state = new FAILED
state.done
}
case state:STOPPING =>
- state << onCompleted
+ state << on_completed
case state:STOPPED =>
done
case state =>
@@ -145,10 +145,10 @@ trait BaseService extends Service with L
error("Stop should not be called from state: %s", state);
}
}
- ^{ stop_task } |>>: dispatchQueue
+ ^{ stop_task } |>>: dispatch_queue
}
- protected def _start(onCompleted:Runnable)
- protected def _stop(onCompleted:Runnable)
+ protected def _start(on_completed:Runnable)
+ protected def _stop(on_completed:Runnable)
}
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Reporter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Reporter.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Reporter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Reporter.scala Wed Dec 22 17:37:50 2010
@@ -59,7 +59,9 @@ trait Reporter {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait LoggingReporter extends Logging with Reporter {
+case class LoggingReporter(log:Log) extends Reporter {
+ import log._
+
override def report(level:ReporterLevel, message:String) = {
level match {
case INFO=>
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Wed Dec 22 17:37:50 2010
@@ -45,7 +45,7 @@ case class RuntimeResource(parent:Broker
result(NOT_FOUND)
} else {
Future[Option[T]] { cb=>
- broker.dispatchQueue {
+ broker.dispatch_queue {
func(broker, cb)
}
}.getOrElse(result(NOT_FOUND))
@@ -56,7 +56,7 @@ case class RuntimeResource(parent:Broker
with_broker { case (broker, cb) =>
broker.virtual_hosts.valuesIterator.find( _.id == id) match {
case Some(virtualHost)=>
- virtualHost.dispatchQueue {
+ virtualHost.dispatch_queue {
func(virtualHost, cb)
}
case None=> cb(None)
@@ -72,8 +72,8 @@ case class RuntimeResource(parent:Broker
result.id = broker.id
result.current_time = System.currentTimeMillis
- result.state = broker.serviceState.toString
- result.state_since = broker.serviceState.since
+ result.state = broker.service_state.toString
+ result.state_since = broker.service_state.since
result.config = broker.config
broker.virtual_hosts.values.foreach{ host=>
@@ -109,8 +109,8 @@ case class RuntimeResource(parent:Broker
with_virtual_host(id) { case (virtualHost,cb) =>
val result = new VirtualHostStatusDTO
result.id = virtualHost.id
- result.state = virtualHost.serviceState.toString
- result.state_since = virtualHost.serviceState.since
+ result.state = virtualHost.service_state.toString
+ result.state_since = virtualHost.service_state.since
result.config = virtualHost.config
virtualHost.router.routing_nodes.foreach { node=>
@@ -118,7 +118,7 @@ case class RuntimeResource(parent:Broker
}
if( virtualHost.store != null ) {
- virtualHost.store.storeStatusDTO { x=>
+ virtualHost.store.get_store_status { x=>
result.store = x
cb(Some(result))
}
@@ -206,7 +206,7 @@ case class RuntimeResource(parent:Broker
cb(None)
} else {
val q = qo.get
- q.dispatchQueue {
+ q.dispatch_queue {
val rc = new QueueStatusDTO
rc.id = q.id
rc.binding = q.binding.binding_dto
@@ -296,8 +296,8 @@ case class RuntimeResource(parent:Broker
val result = new ConnectorStatusDTO
result.id = connector.id
- result.state = connector.serviceState.toString
- result.state_since = connector.serviceState.since
+ result.state = connector.service_state.toString
+ result.state_since = connector.service_state.since
result.config = connector.config
result.accepted = connector.accept_counter.get
@@ -334,7 +334,7 @@ case class RuntimeResource(parent:Broker
broker.connectors.flatMap{ _.connections.get(id) }.headOption match {
case None => cb(None)
case Some(connection:BrokerConnection) =>
- connection.dispatchQueue {
+ connection.dispatch_queue {
cb(Some(connection.get_connection_status))
}
}