You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/22 18:37:52 UTC

svn commit: r1052005 [2/2] - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/resources/META-INF/ser...

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/SubscriptionRecord.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/SubscriptionRecord.java?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/SubscriptionRecord.java (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/SubscriptionRecord.java Wed Dec 22 17:37:50 2010
@@ -27,7 +27,7 @@ public class SubscriptionRecord {
     public AsciiBuffer name;
     public AsciiBuffer selector;
     public AsciiBuffer destination;
-    public boolean isDurable;
+    public boolean durable;
     public long expiration = -1;
     public Buffer attachment;
 

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala (from r1052004, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala&r1=1052004&r2=1052005&rev=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/VMTransport.scala Wed Dec 22 17:37:50 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.transport.vm
+package org.apache.activemq.apollo.broker.transport
 
 import _root_.java.io.IOException
 import _root_.java.net.URI

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/package.html (from r1052004, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/package.html?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/package.html&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html&r1=1052004&r2=1052005&rev=1052005&view=diff
==============================================================================
    (empty)

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/ConfigStoreTest.scala (from r1052004, activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/web/FileConfigStoreTest.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/ConfigStoreTest.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/ConfigStoreTest.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/web/FileConfigStoreTest.scala&r1=1052004&r2=1052005&rev=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/web/FileConfigStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/ConfigStoreTest.scala Wed Dec 22 17:37:50 2010
@@ -1,4 +1,4 @@
-package org.apache.activemq.apollo.web
+package org.apache.activemq.apollo.broker
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -18,7 +18,6 @@ package org.apache.activemq.apollo.web
  */
 import java.io.File
 import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.broker.FileConfigStore
 import org.fusesource.hawtdispatch._
 
 /**
@@ -27,7 +26,7 @@ import org.fusesource.hawtdispatch._
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class FileConfigStoreTest extends FunSuiteSupport {
+class ConfigStoreTest extends FunSuiteSupport {
   test("file config store") {
 
     val store = new FileConfigStore

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala Wed Dec 22 17:37:50 2010
@@ -26,6 +26,9 @@ import java.io.IOException
 import org.apache.activemq.apollo.transport.TransportFactory
 
 abstract class RemoteConnection extends Connection {
+
+  import Connection._
+
   var uri: String = null
   var name: String = null
 
@@ -50,7 +53,7 @@ abstract class RemoteConnection extends 
     super._start(^ {})
   }
 
-  override def onTransportConnected() = {
+  override def on_transport_connected() = {
     onConnected()
     transport.resumeRead
     callbackWhenConnected.run
@@ -59,15 +62,15 @@ abstract class RemoteConnection extends 
 
   protected def onConnected()
 
-  override def onTransportFailure(error: IOException) = {
+  override def on_transport_failure(error: IOException) = {
     if (!stopped) {
       if (stopping.get()) {
         transport.stop
       } else {
-        onFailure(error)
+        on_failure(error)
         if (callbackWhenConnected != null) {
           warn("connect attempt failed. will retry connection..")
-          dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^ {
+          dispatch_queue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^ {
             if (stopping.get()) {
               callbackWhenConnected.run
             } else {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala Wed Dec 22 17:37:50 2010
@@ -34,7 +34,7 @@ abstract class StoreBenchmarkSupport ext
 
   var store:Store = null
 
-  def createStore(flushDelay:Long):Store
+  def create_store(flushDelay:Long):Store
 
   /**
    * Handy helper to call an async method on the store and wait for
@@ -57,7 +57,7 @@ abstract class StoreBenchmarkSupport ext
 
 
   override protected def beforeAll() = {
-    store = createStore(5*1000)
+    store = create_store(5*1000)
     val tracker = new LoggingTracker("store startup")
     tracker.start(store)
     tracker.await
@@ -84,12 +84,12 @@ abstract class StoreBenchmarkSupport ext
 
   val queue_key_counter = new LongCounter
 
-  def addQueue(name:String):Long = {
+  def add_queue(name:String):Long = {
     var queueA = new QueueRecord
     queueA.key = queue_key_counter.incrementAndGet
     queueA.binding_kind = ascii("test")
     queueA.binding_data = ascii(name)
-    val rc:Boolean = CB( cb=> store.addQueue(queueA)(cb) )
+    val rc:Boolean = CB( cb=> store.add_queue(queueA)(cb) )
     expect(true)(rc)
     queueA.key
   }
@@ -103,11 +103,11 @@ abstract class StoreBenchmarkSupport ext
   }
 
 
-  def entry(queueKey:Long, queueSeq:Long, messageKey:Long=0) = {
+  def entry(queue_key:Long, entry_seq:Long, message_key:Long=0) = {
     var queueEntry = new QueueEntryRecord
-    queueEntry.queueKey = queueKey
-    queueEntry.queueSeq = queueSeq
-    queueEntry.messageKey = messageKey
+    queueEntry.queue_key = queue_key
+    queueEntry.entry_seq = entry_seq
+    queueEntry.message_key = message_key
     queueEntry
   }
 
@@ -126,51 +126,51 @@ abstract class StoreBenchmarkSupport ext
     }
   }
 
-  def populate(queueKey:Long, messages:List[String], firstSeq:Long=1) = {
-    var batch = store.createStoreUOW
+  def populate(queue_key:Long, messages:List[String], firstSeq:Long=1) = {
+    var batch = store.create_uow
     var msgKeys = ListBuffer[Long]()
     var nextSeq = firstSeq
 
     messages.foreach { message=>
       val msgKey = addMessage(batch, message)
       msgKeys += msgKey
-      batch.enqueue(entry(queueKey, nextSeq, msgKey))
+      batch.enqueue(entry(queue_key, nextSeq, msgKey))
       nextSeq += 1
     }
 
     val tracker = new TaskTracker()
     tracker.release(batch)
-    msgKeys.foreach { msgKey =>
-      store.flushMessage(msgKey) {}
+    msgKeys.foreach { msg_key =>
+      store.flush_message(msg_key) {}
     }
     tracker.await
     msgKeys
   }
 
   test("store enqueue and load latencey") {
-    val A = addQueue("A")
-    var messageKeys = storeMessages(A)
-    loadMessages(A, messageKeys)
+    val A = add_queue("A")
+    var message_keys = storeMessages(A)
+    loadMessages(A, message_keys)
   }
 
   def storeMessages(queue:Long) = {
 
     var seq = 0L
-    var messageKeys = ListBuffer[Long]()
+    var message_keys = ListBuffer[Long]()
 
     val content = payload("message\n", 1024)
     var metric = benchmarkCount(100000) {
       seq += 1
 
-      var batch = store.createStoreUOW
+      var batch = store.create_uow
       val message = addMessage(batch, content)
-      messageKeys += message
+      message_keys += message
       batch.enqueue(entry(queue, seq, message))
 
       val latch = new CountDownLatch(1)
       batch.setDisposer(^{latch.countDown} )
       batch.release
-      store.flushMessage(message) {}
+      store.flush_message(message) {}
 
       latch.await
 
@@ -178,15 +178,15 @@ abstract class StoreBenchmarkSupport ext
     println("enqueue metrics: "+metric)
     println("enqueue latency is: "+metric.latency(TimeUnit.MILLISECONDS)+" ms")
     println("enqueue rate is: "+metric.rate(TimeUnit.SECONDS)+" enqueues/s")
-    messageKeys.toList
+    message_keys.toList
   }
 
-  def loadMessages(queue:Long, messageKeys: List[Long]) = {
+  def loadMessages(queue:Long, message_keys: List[Long]) = {
 
-    var keys = messageKeys.toList
+    var keys = message_keys.toList
     val metric = benchmarkCount(keys.size) {
       val latch = new CountDownLatch(1)
-      store.loadMessage(keys.head) { msg=>
+      store.load_message(keys.head) { msg=>
         assert(msg.isDefined, "message key not found: "+keys.head)
         latch.countDown
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Wed Dec 22 17:37:50 2010
@@ -33,7 +33,7 @@ abstract class StoreFunSuiteSupport exte
 
   var store:Store = null
 
-  def createStore(flushDelay:Long):Store
+  def create_store(flushDelay:Long):Store
 
   /**
    * Handy helper to call an async method on the store and wait for
@@ -56,7 +56,7 @@ abstract class StoreFunSuiteSupport exte
 
 
   override protected def beforeAll() = {
-    store = createStore(5*1000)
+    store = create_store(5*1000)
     val tracker = new LoggingTracker("store startup")
     tracker.start(store)
     tracker.await
@@ -83,17 +83,17 @@ abstract class StoreFunSuiteSupport exte
 
   val queue_key_counter = new LongCounter
 
-  def addQueue(name:String):Long = {
-    var queueA = new QueueRecord
-    queueA.key = queue_key_counter.incrementAndGet
-    queueA.binding_kind = ascii("test")
-    queueA.binding_data = ascii(name)
-    val rc:Boolean = CB( cb=> store.addQueue(queueA)(cb) )
+  def add_queue(name:String):Long = {
+    var queue_a = new QueueRecord
+    queue_a.key = queue_key_counter.incrementAndGet
+    queue_a.binding_kind = ascii("test")
+    queue_a.binding_data = ascii(name)
+    val rc:Boolean = CB( cb=> store.add_queue(queue_a)(cb) )
     expect(true)(rc)
-    queueA.key
+    queue_a.key
   }
 
-  def addMessage(batch:StoreUOW, content:String):Long = {
+  def add_message(batch:StoreUOW, content:String):Long = {
     var message = new MessageRecord
     message.protocol = ascii("test-protocol")
     message.buffer = ascii(content).buffer
@@ -102,81 +102,81 @@ abstract class StoreFunSuiteSupport exte
   }
 
 
-  def entry(queueKey:Long, queueSeq:Long, messageKey:Long=0) = {
+  def entry(queue_key:Long, entry_seq:Long, message_key:Long=0) = {
     var queueEntry = new QueueEntryRecord
-    queueEntry.queueKey = queueKey
-    queueEntry.queueSeq = queueSeq
-    queueEntry.messageKey = messageKey
+    queueEntry.queue_key = queue_key
+    queueEntry.entry_seq = entry_seq
+    queueEntry.message_key = message_key
     queueEntry
   }
 
-  def populate(queueKey:Long, messages:List[String], firstSeq:Long=1) = {
-    var batch = store.createStoreUOW
-    var msgKeys = ListBuffer[Long]()
-    var nextSeq = firstSeq
+  def populate(queue_key:Long, messages:List[String], first_seq:Long=1) = {
+    var batch = store.create_uow
+    var msg_keys = ListBuffer[Long]()
+    var next_seq = first_seq
 
     messages.foreach { message=>
-      val msgKey = addMessage(batch, message)
-      msgKeys += msgKey
-      batch.enqueue(entry(queueKey, nextSeq, msgKey))
-      nextSeq += 1
+      val msgKey = add_message(batch, message)
+      msg_keys += msgKey
+      batch.enqueue(entry(queue_key, next_seq, msgKey))
+      next_seq += 1
     }
 
     val tracker = new TaskTracker()
     tracker.release(batch)
-    msgKeys.foreach { msgKey =>
-      store.flushMessage(msgKey) {}
+    msg_keys.foreach { msgKey =>
+      store.flush_message(msgKey) {}
     }
     tracker.await
-    msgKeys
+    msg_keys
   }
 
   test("load stored message") {
-    val A = addQueue("A")
-    val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+    val A = add_queue("A")
+    val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
-    val rc:Option[MessageRecord] = CB( cb=> store.loadMessage(msgKeys.head)(cb) )
+    val rc:Option[MessageRecord] = CB( cb=> store.load_message(msg_keys.head)(cb) )
     expect(ascii("message 1").buffer) {
       rc.get.buffer
     }
   }
 
   test("add and list queues") {
-    val A = addQueue("A")
-    val B = addQueue("B")
-    val C = addQueue("C")
+    val A = add_queue("A")
+    val B = add_queue("B")
+    val C = add_queue("C")
 
     expectCB(List(A,B,C).toSeq) { cb=>
-      store.listQueues(cb)
+      store.list_queues(cb)
     }
   }
 
   test("get queue status") {
-    val A = addQueue("my queue name")
+    val A = add_queue("my queue name")
     populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
-    val rc:Option[QueueRecord] = CB( cb=> store.getQueue(A)(cb) )
+    val rc:Option[QueueRecord] = CB( cb=> store.get_queue(A)(cb) )
     expect(ascii("my queue name")) {
       rc.get.binding_data.ascii
     }
   }
 
   test("list queue entries") {
-    val A = addQueue("A")
-    val msgKeys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+    val A = add_queue("A")
+    val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
-    val rc:Seq[QueueEntryRecord] = CB( cb=> store.listQueueEntries(A,msgKeys.head, msgKeys.last)(cb) )
-    expect(msgKeys.toSeq) {
-      rc.map( _.messageKey )
+    val rc:Seq[QueueEntryRecord] = CB( cb=> store.list_queue_entries(A,msg_keys.head, msg_keys.last)(cb) )
+    expect(msg_keys.toSeq) {
+      rc.map( _.message_key )
     }
   }
 
   test("batch completes after a delay") {x}
   def x = {
-    val A = addQueue("A")
-    var batch = store.createStoreUOW
+    val A = add_queue("A")
+    var batch = store.create_uow
 
-    val m1 = addMessage(batch, "message 1")
+    val m1 = add_message(batch, "message 1")
     batch.enqueue(entry(A, 1, m1))
 
     val tracker = new TaskTracker()
@@ -190,16 +190,16 @@ abstract class StoreFunSuiteSupport exte
   }
 
   test("flush cancels the delay") {
-    val A = addQueue("A")
-    var batch = store.createStoreUOW
+    val A = add_queue("A")
+    var batch = store.create_uow
 
-    val m1 = addMessage(batch, "message 1")
+    val m1 = add_message(batch, "message 1")
     batch.enqueue(entry(A, 1, m1))
 
     val tracker = new TaskTracker()
     tracker.release(batch)
 
-    store.flushMessage(m1) {}
+    store.flush_message(m1) {}
 
     expect(true) {
       tracker.await(1, TimeUnit.SECONDS)

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/transport/VMTransportTest.java (from r1052004, activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/transport/VMTransportTest.java?p2=activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/transport/VMTransportTest.java&p1=activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java&r1=1052004&r2=1052005&rev=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/transport/VMTransportTest.java Wed Dec 22 17:37:50 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.transport.vm;
+package org.apache.activemq.apollo.broker.transport;
 
 import java.io.IOException;
 

Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala Wed Dec 22 17:37:50 2010
@@ -78,7 +78,7 @@ class CassandraClient() {
     import PBQueueEntryRecord._
     val pb = PBQueueEntryRecord.FACTORY.parseUnframed(v)
     val rc = new QueueEntryRecord
-    rc.messageKey = pb.getMessageKey
+    rc.message_key = pb.getMessageKey
     rc.attachment = pb.getAttachment
     rc.size = pb.getSize
     rc.redeliveries = pb.getRedeliveries.toShort
@@ -87,7 +87,7 @@ class CassandraClient() {
 
   implicit def encodeQueueEntryRecord(v: QueueEntryRecord): Array[Byte] = {
     val pb = new PBQueueEntryRecord.Bean
-    pb.setMessageKey(v.messageKey)
+    pb.setMessageKey(v.message_key)
     pb.setAttachment(v.attachment)
     pb.setSize(v.size)
     pb.setRedeliveries(v.redeliveries)
@@ -131,11 +131,11 @@ class CassandraClient() {
     }
   }
 
-  def removeQueue(queueKey: Long):Boolean = {
+  def removeQueue(queue_key: Long):Boolean = {
     withSession {
       session =>
-        session.remove(schema.entries \ queueKey)
-        session.remove(schema.queue_name \ queueKey)
+        session.remove(schema.entries \ queue_key)
+        session.remove(schema.queue_name \ queue_key)
     }
     true
   }
@@ -180,14 +180,14 @@ class CassandraClient() {
                 }
                 action.enqueues.foreach {
                   queueEntry =>
-                    val qid = queueEntry.queueKey
-                    val seq = queueEntry.queueSeq
+                    val qid = queueEntry.queue_key
+                    val seq = queueEntry.entry_seq
                     operations ::= Insert( schema.entries \ qid \ (seq, queueEntry) )
                 }
                 action.dequeues.foreach {
                   queueEntry =>
-                    val qid = queueEntry.queueKey
-                    val seq = queueEntry.queueSeq
+                    val qid = queueEntry.queue_key
+                    val seq = queueEntry.entry_seq
                     operations ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) )
                 }
             }
@@ -210,22 +210,22 @@ class CassandraClient() {
     }
   }
 
-  def listQueueEntryGroups(queueKey: Long, limit: Int): Seq[QueueEntryRange] = {
+  def listQueueEntryGroups(queue_key: Long, limit: Int): Seq[QueueEntryRange] = {
     withSession {
       session =>
         var rc = ListBuffer[QueueEntryRange]()
         var group:QueueEntryRange = null
 
         // TODO: this is going to bring back lots of entries.. not good.
-        session.list(schema.entries \ queueKey).foreach { x=>
+        session.list(schema.entries \ queue_key).foreach { x=>
 
           val record:QueueEntryRecord = x.value
 
           if( group == null ) {
             group = new QueueEntryRange
-            group.firstQueueSeq = record.queueSeq
+            group.first_entry_seq = record.entry_seq
           }
-          group.lastQueueSeq = record.queueSeq
+          group.last_entry_seq = record.entry_seq
           group.count += 1
           group.size += record.size
           if( group.count == limit) {
@@ -241,13 +241,13 @@ class CassandraClient() {
     }
   }
 
-  def getQueueEntries(queueKey: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
+  def getQueueEntries(queue_key: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
     withSession {
       session =>
-        session.list(schema.entries \ queueKey, RangePredicate(firstSeq, lastSeq)).map { x=>
+        session.list(schema.entries \ queue_key, RangePredicate(firstSeq, lastSeq)).map { x=>
           val rc:QueueEntryRecord = x.value
-          rc.queueKey = queueKey
-          rc.queueSeq = x.name
+          rc.queue_key = queue_key
+          rc.entry_seq = x.name
           rc
         }
     }

Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala Wed Dec 22 17:37:50 2010
@@ -79,7 +79,7 @@ class CassandraStore extends DelayingSto
   protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
     blocking {
       client.store(uows)
-      dispatchQueue {
+      dispatch_queue {
         callback
       }
     }
@@ -88,16 +88,16 @@ class CassandraStore extends DelayingSto
   def configure(config: StoreDTO, reporter: Reporter):Unit = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
 
 
-  def storeStatusDTO(callback:(StoreStatusDTO)=>Unit) = dispatchQueue {
+  def get_store_status(callback:(StoreStatusDTO)=>Unit) = dispatch_queue {
     val rc = new SimpleStoreStatusDTO
-    rc.state = serviceState.toString
-    rc.state_since = serviceState.since
+    rc.state = service_state.toString
+    rc.state_since = service_state.since
     callback(rc)
   }
 
   def configure(config: CassandraStoreDTO, reporter: Reporter):Unit = {
     if ( CassandraStore.validate(config, reporter) < ERROR ) {
-      if( serviceState.isStarted ) {
+      if( service_state.is_started ) {
         // TODO: apply changes while he broker is running.
         reporter.report(WARN, "Updating cassandra store configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
       } else {
@@ -106,7 +106,7 @@ class CassandraStore extends DelayingSto
     }
   }
 
-  protected def _start(onCompleted: Runnable) = {
+  protected def _start(on_completed: Runnable) = {
     info("Starting cassandra store at: '%s'", config.hosts.toList.mkString(", "))
     blocking = Executors.newFixedThreadPool(20, new ThreadFactory(){
       def newThread(r: Runnable) = {
@@ -130,10 +130,10 @@ class CassandraStore extends DelayingSto
 
     client.start
     schedualDisplayStats
-    onCompleted.run
+    on_completed.run
   }
 
-  protected def _stop(onCompleted: Runnable) = {
+  protected def _stop(on_completed: Runnable) = {
     info("Stopping cassandra store at: '%s'", config.hosts.toList.mkString(", "))
     blocking.shutdown
     new Thread("casandra client shutdown") {
@@ -142,7 +142,7 @@ class CassandraStore extends DelayingSto
           warn("cassandra thread pool is taking a long time to shutdown.")
         }
         client.stop
-        onCompleted.run
+        on_completed.run
       }
     }.start
   }
@@ -156,13 +156,13 @@ class CassandraStore extends DelayingSto
   val storeLatency = new TimeCounter
   def schedualDisplayStats:Unit = {
     def displayStats = {
-      if( serviceState.isStarted ) {
+      if( service_state.is_started ) {
         val cl = storeLatency.apply(true)
         info("metrics: store latency: %,.3f ms", cl.avgTime(TimeUnit.MILLISECONDS))
         schedualDisplayStats
       }
     }
-    dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^{ displayStats })
+    dispatch_queue.dispatchAfter(5, TimeUnit.SECONDS, ^{ displayStats })
   }
 
   /**
@@ -179,51 +179,51 @@ class CassandraStore extends DelayingSto
   /**
    * Ges the next queue key identifier.
    */
-  def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
+  def get_last_queue_key(callback:(Option[Long])=>Unit):Unit = {
     // TODO:
     callback( Some(1L) )
   }
 
-  def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
+  def add_queue(record: QueueRecord)(callback: (Boolean) => Unit) = {
     blocking {
       client.addQueue(record)
       callback(true)
     }
   }
 
-  def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
+  def remove_queue(queueKey: Long)(callback: (Boolean) => Unit) = {
     blocking {
       callback(client.removeQueue(queueKey))
     }
   }
 
-  def getQueue(id: Long)(callback: (Option[QueueRecord]) => Unit) = {
+  def get_queue(id: Long)(callback: (Option[QueueRecord]) => Unit) = {
     blocking {
       callback( client.getQueue(id) )
     }
   }
 
-  def listQueues(callback: (Seq[Long]) => Unit) = {
+  def list_queues(callback: (Seq[Long]) => Unit) = {
     blocking {
       callback( client.listQueues )
     }
   }
 
-  def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
+  def load_message(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
     blocking {
       callback( client.loadMessage(id) )
     }
   }
 
 
-  def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
+  def list_queue_entry_ranges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
     blocking {
       callback( client.listQueueEntryGroups(queueKey, limit) )
     }
   }
 
 
-  def listQueueEntries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+  def list_queue_entries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
     blocking {
       callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
     }

Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/test/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/test/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStoreTest.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/test/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/test/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStoreTest.scala Wed Dec 22 17:37:50 2010
@@ -23,7 +23,7 @@ import org.apache.activemq.apollo.broker
  */
 class CassandraStoreTest extends StoreFunSuiteSupport with CassandraServerMixin {
 
-  def createStore(flushDelay:Long):Store = {
+  def create_store(flushDelay:Long):Store = {
     val rc = new CassandraStore
     rc.config.flush_delay = flushDelay
     rc

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala Wed Dec 22 17:37:50 2010
@@ -60,7 +60,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
   override def log: Log = HawtDBClient
 
-  def dispatchQueue = hawtDBStore.dispatchQueue
+  def dispatchQueue = hawtDBStore.dispatch_queue
 
 
   private val indexFileFactory = new TxPageFileFactory()
@@ -247,8 +247,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
             }
             action.dequeues.foreach {
               queueEntry =>
-                val queueKey = queueEntry.queueKey
-                val queueSeq = queueEntry.queueSeq
+                val queueKey = queueEntry.queue_key
+                val queueSeq = queueEntry.entry_seq
                 batch += new RemoveQueueEntry.Bean().setQueueKey(queueKey).setQueueSeq(queueSeq)
             }
         }
@@ -308,9 +308,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
           entryIndex.iterator.foreach { entry =>
             if( group == null ) {
               group = new QueueEntryRange
-              group.firstQueueSeq = entry.getKey.longValue
+              group.first_entry_seq = entry.getKey.longValue
             }
-            group.lastQueueSeq = entry.getKey.longValue
+            group.last_entry_seq = entry.getKey.longValue
             group.count += 1
             group.size += entry.getValue.getSize
             if( group.count == limit) {

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala Wed Dec 22 17:37:50 2010
@@ -71,7 +71,7 @@ class HawtDBStore extends DelayingStoreS
   var config:HawtDBStoreDTO = defaultConfig
   val client = new HawtDBClient(this)
 
-  val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
+  val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatch_queue)
   load_source.setEventHandler(^{drain_loads});
 
   override def toString = "hawtdb store"
@@ -83,7 +83,7 @@ class HawtDBStore extends DelayingStoreS
   protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
     executor_pool {
       client.store(uows, ^{
-        dispatchQueue {
+        dispatch_queue {
           callback
         }
       })
@@ -94,7 +94,7 @@ class HawtDBStore extends DelayingStoreS
 
   def configure(config: HawtDBStoreDTO, reporter: Reporter) = {
     if ( HawtDBStore.validate(config, reporter) < ERROR ) {
-      if( serviceState.isStarted ) {
+      if( service_state.is_started ) {
         // TODO: apply changes while he broker is running.
         reporter.report(WARN, "Updating hawtdb store configuration at runtime is not yet supported.  You must restart the broker for the change to take effect.")
       } else {
@@ -103,7 +103,7 @@ class HawtDBStore extends DelayingStoreS
     }
   }
 
-  protected def _start(onCompleted: Runnable) = {
+  protected def _start(on_completed: Runnable) = {
     info("Starting hawtdb store at: '%s'", config.directory)
     executor_pool = Executors.newFixedThreadPool(1, new ThreadFactory(){
       def newThread(r: Runnable) = {
@@ -122,7 +122,7 @@ class HawtDBStore extends DelayingStoreS
         scheduleCleanup(v)
         scheduleFlush(v)
         load_source.resume
-        onCompleted.run
+        on_completed.run
       })
     }
   }
@@ -136,7 +136,7 @@ class HawtDBStore extends DelayingStoreS
         }
       }
     }
-    dispatchQueue.dispatchAfter(client.index_flush_interval, TimeUnit.MILLISECONDS, ^ {try_flush})
+    dispatch_queue.dispatchAfter(client.index_flush_interval, TimeUnit.MILLISECONDS, ^ {try_flush})
   }
 
   def scheduleCleanup(version:Int): Unit = {
@@ -148,10 +148,10 @@ class HawtDBStore extends DelayingStoreS
         }
       }
     }
-    dispatchQueue.dispatchAfter(client.cleanup_interval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
+    dispatch_queue.dispatchAfter(client.cleanup_interval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
   }
 
-  protected def _stop(onCompleted: Runnable) = {
+  protected def _stop(on_completed: Runnable) = {
     info("Stopping hawtdb store at: '%s'", config.directory)
     schedule_version.incrementAndGet
     new Thread() {
@@ -161,7 +161,7 @@ class HawtDBStore extends DelayingStoreS
         executor_pool.awaitTermination(86400, TimeUnit.SECONDS)
         executor_pool = null
         client.stop
-        onCompleted.run
+        on_completed.run
       }
     }.start
   }
@@ -189,37 +189,37 @@ class HawtDBStore extends DelayingStoreS
   /**
    * Ges the last queue key identifier stored.
    */
-  def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
+  def get_last_queue_key(callback:(Option[Long])=>Unit):Unit = {
     executor_pool {
       callback(Some(client.rootBuffer.getLastQueueKey.longValue))
     }
   }
 
-  def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
+  def add_queue(record: QueueRecord)(callback: (Boolean) => Unit) = {
     executor_pool {
      client.addQueue(record, ^{ callback(true) })
     }
   }
 
-  def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
+  def remove_queue(queueKey: Long)(callback: (Boolean) => Unit) = {
     executor_pool {
       client.removeQueue(queueKey,^{ callback(true) })
     }
   }
 
-  def getQueue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
+  def get_queue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
     executor_pool {
       callback( client.getQueue(queueKey) )
     }
   }
 
-  def listQueues(callback: (Seq[Long]) => Unit) = {
+  def list_queues(callback: (Seq[Long]) => Unit) = {
     executor_pool {
       callback( client.listQueues )
     }
   }
 
-  def loadMessage(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
+  def load_message(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, { (result)=>
         end()
@@ -236,13 +236,13 @@ class HawtDBStore extends DelayingStoreS
     }
   }
 
-  def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
+  def list_queue_entry_ranges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
     executor_pool ^{
       callback( client.listQueueEntryGroups(queueKey, limit) )
     }
   }
 
-  def listQueueEntries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
+  def list_queue_entries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
     executor_pool ^{
       callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
     }
@@ -250,7 +250,7 @@ class HawtDBStore extends DelayingStoreS
 
   def poll_stats:Unit = {
     def displayStats = {
-      if( serviceState.isStarted ) {
+      if( service_state.is_started ) {
 
         flush_latency = flush_latency_counter(true)
         message_load_latency = message_load_latency_counter(true)
@@ -263,14 +263,14 @@ class HawtDBStore extends DelayingStoreS
       }
     }
 
-    dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
+    dispatch_queue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
   }
 
-  def storeStatusDTO(callback:(StoreStatusDTO)=>Unit) = dispatchQueue {
+  def get_store_status(callback:(StoreStatusDTO)=>Unit) = dispatch_queue {
     val rc = new HawtDBStoreStatusDTO
 
-    rc.state = serviceState.toString
-    rc.state_since = serviceState.since
+    rc.state = service_state.toString
+    rc.state_since = service_state.since
 
     rc.flush_latency = flush_latency
     rc.message_load_latency = message_load_latency

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala Wed Dec 22 17:37:50 2010
@@ -56,9 +56,9 @@ object Helpers {
 
   implicit def toQueueEntryRecord(pb: AddQueueEntry.Getter): QueueEntryRecord = {
     val rc = new QueueEntryRecord
-    rc.queueKey = pb.getQueueKey
-    rc.queueSeq = pb.getQueueSeq
-    rc.messageKey = pb.getMessageKey
+    rc.queue_key = pb.getQueueKey
+    rc.entry_seq = pb.getQueueSeq
+    rc.message_key = pb.getMessageKey
     rc.attachment = pb.getAttachment
     rc.size = pb.getSize
     rc.redeliveries = pb.getRedeliveries.toShort
@@ -67,9 +67,9 @@ object Helpers {
 
   implicit def fromQueueEntryRecord(v: QueueEntryRecord): AddQueueEntry.Bean = {
     val pb = new AddQueueEntry.Bean
-    pb.setQueueKey(v.queueKey)
-    pb.setQueueSeq(v.queueSeq)
-    pb.setMessageKey(v.messageKey)
+    pb.setQueueKey(v.queue_key)
+    pb.setQueueSeq(v.entry_seq)
+    pb.setMessageKey(v.message_key)
     pb.setAttachment(v.attachment)
     pb.setSize(v.size)
     pb.setRedeliveries(v.redeliveries)

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreBenchmark.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreBenchmark.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreBenchmark.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreBenchmark.scala Wed Dec 22 17:37:50 2010
@@ -25,7 +25,7 @@ import org.apache.activemq.apollo.broker
  */
 class HawtDBStoreBenchmark extends StoreBenchmarkSupport {
 
-  def createStore(flushDelay:Long):Store = {
+  def create_store(flushDelay:Long):Store = {
     val rc = new HawtDBStore
     rc.config.flush_delay = flushDelay
     rc

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreTest.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/test/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStoreTest.scala Wed Dec 22 17:37:50 2010
@@ -23,7 +23,7 @@ import org.apache.activemq.apollo.broker
  */
 class HawtDBStoreTest extends StoreFunSuiteSupport {
 
-  def createStore(flushDelay:Long):Store = {
+  def create_store(flushDelay:Long):Store = {
     val rc = new HawtDBStore
     rc.config.flush_delay = flushDelay
     rc

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Dec 22 17:37:50 2010
@@ -102,7 +102,7 @@ class StompProtocolHandler extends Proto
 
   override protected def log = StompProtocolHandler
 
-  protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
+  protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
 
   trait AckHandler {
     def track(delivery:Delivery):Unit
@@ -208,7 +208,7 @@ class StompProtocolHandler extends Proto
     override val exclusive:Boolean
   ) extends BaseRetained with DeliveryConsumer {
 
-    val dispatchQueue = StompProtocolHandler.this.dispatchQueue
+    val dispatch_queue = StompProtocolHandler.this.dispatchQueue
 
     override def connection = Some(StompProtocolHandler.this.connection)
 
@@ -237,10 +237,10 @@ class StompProtocolHandler extends Proto
       def consumer = StompConsumer.this
       var closed = false
 
-      val session = session_manager.open(producer.dispatchQueue)
+      val session = session_manager.open(producer.dispatch_queue)
 
       def close = {
-        assert(getCurrentQueue == producer.dispatchQueue)
+        assert(getCurrentQueue == producer.dispatch_queue)
         if( !closed ) {
           closed = true
           if( browser ) {
@@ -310,7 +310,7 @@ class StompProtocolHandler extends Proto
 
   var host:VirtualHost = null
 
-  private def queue = connection.dispatchQueue
+  private def queue = connection.dispatch_queue
 
   // uses by STOMP 1.0 clients
   var connection_ack_handlers = HashMap[AsciiBuffer, AckHandler]()
@@ -323,8 +323,8 @@ class StompProtocolHandler extends Proto
   var waiting_on:String = "client request"
   var config:StompDTO = _
 
-  override def setConnection(connection: BrokerConnection) = {
-    super.setConnection(connection)
+  override def set_connection(connection: BrokerConnection) = {
+    super.set_connection(connection)
     import collection.JavaConversions._
     config = connection.connector.config.protocols.find( _.isInstanceOf[StompDTO]).map(_.asInstanceOf[StompDTO]).getOrElse(new StompDTO)
   }
@@ -377,9 +377,9 @@ class StompProtocolHandler extends Proto
     throw new Break()
   }
 
-  override def onTransportConnected() = {
+  override def on_transport_connected() = {
 
-    session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){x=>
+    session_manager = new SinkMux[StompFrame]( MapSink(connection.transport_sink){x=>
       trace("sending frame: %s", x)
       x
     }, dispatchQueue, StompFrame)
@@ -388,7 +388,7 @@ class StompProtocolHandler extends Proto
     resumeRead
   }
 
-  override def onTransportDisconnected() = {
+  override def on_transport_disconnected() = {
     if( !closed ) {
       heart_beat_monitor.stop
       closed=true;
@@ -416,7 +416,7 @@ class StompProtocolHandler extends Proto
   }
 
 
-  override def onTransportCommand(command:Any):Unit = {
+  override def on_transport_command(command:AnyRef):Unit = {
     if( dead ) {
       // We stop processing client commands once we are dead
       return;
@@ -656,7 +656,7 @@ class StompProtocolHandler extends Proto
         val producer = new DeliveryProducer() {
           override def connection = Some(StompProtocolHandler.this.connection)
 
-          override def dispatchQueue = queue
+          override def dispatch_queue = queue
         }
 
         // don't process frames until producer is connected...
@@ -952,11 +952,11 @@ class StompProtocolHandler extends Proto
   }
 
 
-  override def onTransportFailure(error: IOException) = {
+  override def on_transport_failure(error: IOException) = {
     if( !connection.stopped ) {
       suspendRead("shutdown")
       debug(error, "Shutting connection down due to: %s", error)
-      super.onTransportFailure(error);
+      super.on_transport_failure(error);
     }
   }
 
@@ -1002,22 +1002,22 @@ class StompProtocolHandler extends Proto
       queue += proc
     }
 
-    def commit(onComplete: => Unit) = {
+    def commit(on_complete: => Unit) = {
 
       val uow = if( host.store!=null ) {
-        host.store.createStoreUOW
+        host.store.create_uow
       } else {
         null
       }
 
       queue.foreach{ _(uow) }
       if( uow!=null ) {
-        uow.onComplete(^{
-          onComplete
+        uow.on_complete(^{
+          on_complete
         })
         uow.release
       } else {
-        onComplete
+        on_complete
       }
 
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala Wed Dec 22 17:37:50 2010
@@ -35,7 +35,7 @@ class StompRemoteConsumer extends Remote
   var outboundSink: OverflowSink[StompFrame] = null
 
   def onConnected() = {
-    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink) {x => x})
+    outboundSink = new OverflowSink[StompFrame](MapSink(transport_sink) {x => x})
     outboundSink.refiller = ^ {}
 
     val stompDestination = if (destination.domain == Router.QUEUE_DOMAIN) {
@@ -59,7 +59,7 @@ class StompRemoteConsumer extends Remote
     outboundSink.offer(frame);
   }
 
-  override def onTransportCommand(command: Object) = {
+  override def on_transport_command(command: Object) = {
     var frame = command.asInstanceOf[StompFrame]
     frame match {
       case StompFrame(CONNECTED, headers, _, _) =>
@@ -73,16 +73,16 @@ class StompRemoteConsumer extends Remote
           }
 
       case StompFrame(ERROR, headers, content, _) =>
-        onFailure(new Exception("Server reported an error: " + frame.content));
+        on_failure(new Exception("Server reported an error: " + frame.content));
       case _ =>
-        onFailure(new Exception("Unexpected stomp command: " + frame.action));
+        on_failure(new Exception("Unexpected stomp command: " + frame.action));
     }
   }
 
   protected def messageReceived() {
       if (thinkTime > 0) {
         transport.suspendRead
-        dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
+        dispatch_queue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
           rate.increment();
           if (!stopped) {
             transport.resumeRead
@@ -136,9 +136,9 @@ class StompRemoteProducer extends Remote
           // if we are not going to wait for an ack back from the server,
           // then jut send the next one...
           if (thinkTime > 0) {
-            dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
+            dispatch_queue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
           } else {
-            dispatchQueue << task
+            dispatch_queue << task
           }
         }
       }
@@ -146,7 +146,7 @@ class StompRemoteProducer extends Remote
   }
 
   override def onConnected() = {
-    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink) {x => x})
+    outboundSink = new OverflowSink[StompFrame](MapSink(transport_sink) {x => x})
     outboundSink.refiller = ^ {drain}
 
     if (destination.domain == Router.QUEUE_DOMAIN) {
@@ -158,7 +158,7 @@ class StompRemoteProducer extends Remote
     send_next
   }
 
-  override def onTransportCommand(command: Object) = {
+  override def on_transport_command(command: Object) = {
     var frame = command.asInstanceOf[StompFrame]
     frame match {
       case StompFrame(RECEIPT, headers, _, _) =>
@@ -168,19 +168,24 @@ class StompRemoteProducer extends Remote
 
       case StompFrame(CONNECTED, headers, _, _) =>
       case StompFrame(ERROR, headers, content, _) =>
-        onFailure(new Exception("Server reported an error: " + frame.content.utf8));
+        on_failure(new Exception("Server reported an error: " + frame.content.utf8));
       case _ =>
-        onFailure(new Exception("Unexpected stomp command: " + frame.action));
+        on_failure(new Exception("Unexpected stomp command: " + frame.action));
     }
   }
 }
 
+object Watchog extends Log
+
 trait Watchog extends RemoteConsumer {
+
+  import Watchog._
+
   var messageCount = 0
 
   def watchdog(lastMessageCount: Int): Unit = {
     val seconds = 10
-    dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
+    dispatch_queue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
       if (messageCount == lastMessageCount) {
         warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
         stop

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala Wed Dec 22 17:37:50 2010
@@ -28,21 +28,21 @@ object BaseService extends Log
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait BaseService extends Service with Logging {
+trait BaseService extends Service {
 
-  override protected def log:Log = BaseService
+  import BaseService._
 
   sealed class State {
 
     val since = System.currentTimeMillis
 
     override def toString = getClass.getSimpleName
-    def isCreated = false
-    def isStarting = false
-    def isStarted = false
-    def isStopping = false
-    def isStopped= false
-    def isFailed= false
+    def is_created = false
+    def is_starting = false
+    def is_started = false
+    def is_stopping = false
+    def is_stopped= false
+    def is_failed= false
   }
 
   trait CallbackSupport {
@@ -51,35 +51,35 @@ trait BaseService extends Service with L
     def done = { callbacks.foreach(_.run); callbacks=Nil }
   }
 
-  protected class CREATED extends State { override def isCreated = true  }
-  protected class STARTING extends State with CallbackSupport { override def isStarting = true  }
-  protected class FAILED extends State { override def isFailed = true  }
-  protected class STARTED extends State { override def isStarted = true  }
-  protected class STOPPING extends State with CallbackSupport { override def isStopping = true  }
-  protected class STOPPED extends State { override def isStopped = true  }
+  protected class CREATED extends State { override def is_created = true  }
+  protected class STARTING extends State with CallbackSupport { override def is_starting = true  }
+  protected class FAILED extends State { override def is_failed = true  }
+  protected class STARTED extends State { override def is_started = true  }
+  protected class STOPPING extends State with CallbackSupport { override def is_stopping = true  }
+  protected class STOPPED extends State { override def is_stopped = true  }
 
-  protected val dispatchQueue:DispatchQueue
+  protected val dispatch_queue:DispatchQueue
 
   final def start() = start(null)
   final def stop() = stop(null)
 
   @volatile
-  protected var _serviceState:State = new CREATED
+  protected var _service_state:State = new CREATED
 
-  def serviceState = _serviceState
+  def service_state = _service_state
 
   @volatile
   protected var _serviceFailure:Exception = null
   def serviceFailure = _serviceFailure
 
-  final def start(onCompleted:Runnable) = ^{
+  final def start(on_completed:Runnable) = ^{
     def do_start = {
       val state = new STARTING()
-      state << onCompleted
-      _serviceState = state
+      state << on_completed
+      _service_state = state
       try {
         _start(^ {
-          _serviceState = new STARTED
+          _service_state = new STARTED
           state.done
         })
       }
@@ -87,45 +87,45 @@ trait BaseService extends Service with L
         case e:Exception =>
           error(e, "Start failed due to %s", e)
           _serviceFailure = e
-          _serviceState = new FAILED
+          _service_state = new FAILED
           state.done
       }
     }
     def done = {
-      if( onCompleted!=null ) {
-        onCompleted.run
+      if( on_completed!=null ) {
+        on_completed.run
       }
     }
-    _serviceState match {
+    _service_state match {
       case state:CREATED =>
         do_start
       case state:STOPPED =>
         do_start
       case state:STARTING =>
-        state << onCompleted
+        state << on_completed
       case state:STARTED =>
         done
       case state =>
         done
         error("Start should not be called from state: %s", state);
     }
-  } |>>: dispatchQueue
+  } |>>: dispatch_queue
 
-  final def stop(onCompleted:Runnable) = {
+  final def stop(on_completed:Runnable) = {
     def stop_task = {
       def done = {
-        if( onCompleted!=null ) {
-          onCompleted.run
+        if( on_completed!=null ) {
+          on_completed.run
         }
       }
-      _serviceState match {
+      _service_state match {
         case state:STARTED =>
           val state = new STOPPING
-          state << onCompleted
-          _serviceState = state
+          state << on_completed
+          _service_state = state
           try {
             _stop(^ {
-              _serviceState = new STOPPED
+              _service_state = new STOPPED
               state.done
             })
           }
@@ -133,11 +133,11 @@ trait BaseService extends Service with L
             case e:Exception =>
               error(e, "Stop failed due to: %s", e)
               _serviceFailure = e
-              _serviceState = new FAILED
+              _service_state = new FAILED
               state.done
           }
         case state:STOPPING =>
-          state << onCompleted
+          state << on_completed
         case state:STOPPED =>
           done
         case state =>
@@ -145,10 +145,10 @@ trait BaseService extends Service with L
           error("Stop should not be called from state: %s", state);
       }
     }
-    ^{ stop_task } |>>: dispatchQueue
+    ^{ stop_task } |>>: dispatch_queue
   }
 
-  protected def _start(onCompleted:Runnable)
-  protected def _stop(onCompleted:Runnable)
+  protected def _start(on_completed:Runnable)
+  protected def _stop(on_completed:Runnable)
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Reporter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Reporter.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Reporter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Reporter.scala Wed Dec 22 17:37:50 2010
@@ -59,7 +59,9 @@ trait Reporter {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait LoggingReporter extends Logging with Reporter {
+case class LoggingReporter(log:Log) extends Reporter {
+  import log._
+
   override def report(level:ReporterLevel, message:String) = {
     level match {
       case INFO=>

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1052005&r1=1052004&r2=1052005&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Wed Dec 22 17:37:50 2010
@@ -45,7 +45,7 @@ case class RuntimeResource(parent:Broker
       result(NOT_FOUND)
     } else {
       Future[Option[T]] { cb=>
-        broker.dispatchQueue {
+        broker.dispatch_queue {
           func(broker, cb)
         }
       }.getOrElse(result(NOT_FOUND))
@@ -56,7 +56,7 @@ case class RuntimeResource(parent:Broker
     with_broker { case (broker, cb) =>
       broker.virtual_hosts.valuesIterator.find( _.id == id) match {
         case Some(virtualHost)=>
-          virtualHost.dispatchQueue {
+          virtualHost.dispatch_queue {
             func(virtualHost, cb)
           }
         case None=> cb(None)
@@ -72,8 +72,8 @@ case class RuntimeResource(parent:Broker
 
       result.id = broker.id
       result.current_time = System.currentTimeMillis
-      result.state = broker.serviceState.toString
-      result.state_since = broker.serviceState.since
+      result.state = broker.service_state.toString
+      result.state_since = broker.service_state.since
       result.config = broker.config
 
       broker.virtual_hosts.values.foreach{ host=>
@@ -109,8 +109,8 @@ case class RuntimeResource(parent:Broker
     with_virtual_host(id) { case (virtualHost,cb) =>
       val result = new VirtualHostStatusDTO
       result.id = virtualHost.id
-      result.state = virtualHost.serviceState.toString
-      result.state_since = virtualHost.serviceState.since
+      result.state = virtualHost.service_state.toString
+      result.state_since = virtualHost.service_state.since
       result.config = virtualHost.config
 
       virtualHost.router.routing_nodes.foreach { node=>
@@ -118,7 +118,7 @@ case class RuntimeResource(parent:Broker
       }
 
       if( virtualHost.store != null ) {
-        virtualHost.store.storeStatusDTO { x=>
+        virtualHost.store.get_store_status { x=>
           result.store = x
           cb(Some(result))
         }
@@ -206,7 +206,7 @@ case class RuntimeResource(parent:Broker
     cb(None)
   } else {
     val q = qo.get
-    q.dispatchQueue {
+    q.dispatch_queue {
       val rc = new QueueStatusDTO
       rc.id = q.id
       rc.binding = q.binding.binding_dto
@@ -296,8 +296,8 @@ case class RuntimeResource(parent:Broker
 
           val result = new ConnectorStatusDTO
           result.id = connector.id
-          result.state = connector.serviceState.toString
-          result.state_since = connector.serviceState.since
+          result.state = connector.service_state.toString
+          result.state_since = connector.service_state.since
           result.config = connector.config
 
           result.accepted = connector.accept_counter.get
@@ -334,7 +334,7 @@ case class RuntimeResource(parent:Broker
       broker.connectors.flatMap{ _.connections.get(id) }.headOption match {
         case None => cb(None)
         case Some(connection:BrokerConnection) =>
-          connection.dispatchQueue {
+          connection.dispatch_queue {
             cb(Some(connection.get_connection_status))
           }
       }