You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/02/21 19:23:17 UTC
svn commit: r1448756 - in /activemq/activemq-apollo/trunk:
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/...
Author: chirino
Date: Thu Feb 21 18:23:16 2013
New Revision: 1448756
URL: http://svn.apache.org/r1448756
Log:
Remove the extra uow retain/release debugging since those bugs have been stored.
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Thu Feb 21 18:23:16 2013
@@ -1250,14 +1250,14 @@ class AmqpProtocolHandler extends Protoc
def commit(on_complete: => Unit) = {
if( host.store!=null ) {
- val uow = host.store.create_uow(toString)
+ val uow = host.store.create_uow
// println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
uow.on_complete {
// println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
on_complete
}
queue.foreach{ _._1(uow) }
- uow.release(toString)
+ uow.release
} else {
queue.foreach{ _._1(null) }
on_complete
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Feb 21 18:23:16 2013
@@ -591,8 +591,8 @@ class Queue(val router: LocalRouter, val
def is_topic_queue = resource_kind eq TopicQueueKind
- def create_uow(owner:String):StoreUOW = if(virtual_host.store==null) null else virtual_host.store.create_uow(owner)
- def create_uow(owner:String, uow:StoreUOW):StoreUOW = if(uow==null) create_uow(owner) else {uow.retain(owner); uow}
+ def create_uow:StoreUOW = if(virtual_host.store==null) null else virtual_host.store.create_uow
+ def create_uow(uow:StoreUOW):StoreUOW = if(uow==null) create_uow else {uow.retain; uow}
object messages extends Sink[(Session[Delivery], Delivery)] {
def stall_check = {}
@@ -692,7 +692,7 @@ class Queue(val router: LocalRouter, val
}
if( delivery.persistent && tune_persistent ) {
assert(delivery.uow!=null)
- delivery.uow.release(binding.binding_kind+":"+id+":offer")
+ delivery.uow.release
}
return true
}
@@ -746,7 +746,7 @@ class Queue(val router: LocalRouter, val
// release the store batch...
if (uow != null) {
- uow.release(binding.binding_kind+":"+id+":offer")
+ uow.release
}
@@ -844,7 +844,7 @@ class Queue(val router: LocalRouter, val
// remove the expired message if it has not been
// acquired.
if( !state.is_acquired ) {
- val uow = create_uow(binding.binding_kind+":"+id+":swap")
+ val uow = create_uow
entry.dequeue(uow)
expired(uow, entry) {
if( entry.isLinked ) {
@@ -856,7 +856,7 @@ class Queue(val router: LocalRouter, val
// remove the expired message if it has not been
// acquired.
if( !state.is_acquired ) {
- val uow = create_uow(binding.binding_kind+":"+id+":swap")
+ val uow = create_uow
entry.dequeue(uow)
expired(uow, entry) {
if( entry.isLinked ) {
@@ -1088,7 +1088,7 @@ class Queue(val router: LocalRouter, val
delivery.uow = if(delivery.storeKey == -1) {
null
} else {
- create_uow(binding.binding_kind+":"+id+":dlq", original_uow)
+ create_uow(original_uow)
}
delivery.expiration=0
@@ -1107,7 +1107,7 @@ class Queue(val router: LocalRouter, val
val (delivery, callback) = value;
callback(delivery.uow)
if( delivery.uow!=null ) {
- delivery.uow.release(binding.binding_kind+":"+id+":dlq")
+ delivery.uow.release
delivery.uow = null
}
}
@@ -1148,11 +1148,11 @@ class Queue(val router: LocalRouter, val
case Consumed =>
entry.ack(uow)
case Expired=>
- val actual = create_uow(binding.binding_kind+":"+id+":ack", uow)
+ val actual = create_uow(uow)
expired(actual, entry.entry) {
entry.ack(actual)
}
- actual.release(binding.binding_kind+":"+id+":ack")
+ actual.release
case Delivered =>
entry.increment_nack
entry.entry.redelivered
@@ -1172,7 +1172,7 @@ class Queue(val router: LocalRouter, val
}
}
if( uow!=null ) {
- uow.release(binding.binding_kind+":"+id+":ack-merge:"+entry.entry.seq)
+ uow.release
}
}
}
@@ -1220,7 +1220,7 @@ class Queue(val router: LocalRouter, val
delivery.message.retain
}
if( delivery.persistent && tune_persistent ) {
- delivery.uow = create_uow(binding.binding_kind+":"+id+":offer", delivery.uow)
+ delivery.uow = create_uow(delivery.uow)
}
val rc = downstream.offer(delivery)
assert(rc, "session should accept since it was not full")
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Thu Feb 21 18:23:16 2013
@@ -205,9 +205,9 @@ class QueueEntry(val queue:Queue, val se
def dequeue(uow: StoreUOW) = {
if ( queued ) {
if (messageKey != -1) {
- val actual_uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":dequeue", uow)
+ val actual_uow = queue.create_uow(uow)
actual_uow.dequeue(toQueueEntryRecord)
- actual_uow.release(queue.binding.binding_kind+":"+queue.id+":dequeue")
+ actual_uow.release
}
queue.dequeue_item_counter += 1
queue.dequeue_size_counter += size
@@ -465,7 +465,7 @@ class QueueEntry(val queue:Queue, val se
switch_to_swapped
} else {
swapping_out=true
- val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":swap_out")
+ val uow = queue.create_uow
// Are we swapping out a non-persistent message?
val flush = if( delivery.storeKey == -1 ) {
@@ -483,7 +483,7 @@ class QueueEntry(val queue:Queue, val se
uow.complete_asap
}
}
- uow.release(queue.binding.binding_kind+":"+queue.id+":swap_out")
+ uow.release
}
}
}
@@ -570,14 +570,14 @@ class QueueEntry(val queue:Queue, val se
queue.assert_executing
if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
- val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":dispatch-expired")
+ val uow = queue.create_uow
entry.dequeue(uow)
queue.expired(uow, entry) {
if( isLinked ) {
remove
}
}
- uow.release(queue.binding.binding_kind+":"+queue.id+":dispatch-expired")
+ uow.release
return true
}
@@ -669,7 +669,7 @@ class QueueEntry(val queue:Queue, val se
acquiredDelivery.ack = (consumed, uow)=> {
if( uow!=null ) {
- uow.retain(queue.binding.binding_kind+":"+queue.id+":ack-merge:"+seq)
+ uow.retain
}
queue.ack_source.merge((acquiredQueueEntry, consumed, uow))
}
@@ -840,14 +840,14 @@ class QueueEntry(val queue:Queue, val se
queue.assert_executing
if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
- val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":expire")
+ val uow = queue.create_uow
entry.dequeue(uow)
queue.expired(uow, entry) {
if( isLinked ) {
remove
}
}
- uow.release(queue.binding.binding_kind+":"+queue.id+":expire")
+ uow.release
return true
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Thu Feb 21 18:23:16 2013
@@ -294,7 +294,7 @@ abstract class DeliveryProducerRoute(rou
false
} else {
if (delivery.uow != null) {
- delivery.uow.retain("route:"+dispatch_queue.getLabel+":offer")
+ delivery.uow.retain
}
if ( !is_connected ) {
overflow = delivery
@@ -346,7 +346,7 @@ abstract class DeliveryProducerRoute(rou
if ( target.consumer.is_persistent && copy.persistent && store != null) {
if (copy.uow == null) {
- copy.uow = store.create_uow("route:"+dispatch_queue.getLabel+":offer")
+ copy.uow = store.create_uow
}
if( copy.storeKey == -1L ) {
@@ -376,7 +376,7 @@ abstract class DeliveryProducerRoute(rou
private def release(delivery: Delivery): Unit = {
if (delivery.uow != null) {
- delivery.uow.release("route:"+dispatch_queue.getLabel+":offer")
+ delivery.uow.release
}
if( delivery.message!=null ) {
delivery.message.release
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Thu Feb 21 18:23:16 2013
@@ -108,37 +108,12 @@ trait DelayingStoreSupport extends Store
// Implementation of the StoreBatch interface
//
/////////////////////////////////////////////////////////////////////
- def create_uow(owner:String) = {
- val rc = new DelayableUOW
- rc.owners.add(owner)
- rc
- }
+ def create_uow = new DelayableUOW
class DelayableUOW extends BaseRetained with StoreUOW {
override def toString: String = uow_id.toString
- val owners = scala.collection.mutable.HashSet[String]()
-
- def release(owner: String) {
- this.synchronized {
- if( !owners.remove(owner) ) {
- warn("UOW owner already removed! "+owner)
- }
- }
- super.release()
- }
-
- def retain(owner: String) {
- this.synchronized {
- if( !owners.add(owner) ) {
- warn("UOW owner already added! "+owner)
- }
- }
- owners.add(owner)
- super.retain()
- }
-
class MessageAction {
var msg= 0L
@@ -387,7 +362,7 @@ trait DelayingStoreSupport extends Store
out.println("--- Pending Stores Details ---")
out.println("flush_source suspended: "+flush_source.isSuspended)
pending_stores.valuesIterator.foreach{ action =>
- out.println("uow: %d, state:%s, owners:%s".format(action.uow.uow_id, action.uow.state, action.uow.owners))
+ out.println("uow: %d, state:%s".format(action.uow.uow_id, action.uow.state))
}
writer.toString
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala Thu Feb 21 18:23:16 2013
@@ -86,11 +86,11 @@ case class PersistentLongCounter(name:St
def update(value: Long)(on_complete: =>Unit) {
val s = store
if (s!=null) {
- val uow = s.create_uow(toString)
+ val uow = s.create_uow
uow.put(key, encode(value))
uow.complete_asap()
uow.on_complete(on_complete)
- uow.release(toString)
+ uow.release
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Thu Feb 21 18:23:16 2013
@@ -123,7 +123,7 @@ trait Store extends ServiceTrait {
* Creates a store uow which is used to perform persistent
* operations as unit of work.
*/
- def create_uow(owner:String):StoreUOW
+ def create_uow:StoreUOW
/**
* Removes all previously stored data.
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala Thu Feb 21 18:23:16 2013
@@ -35,10 +35,7 @@ import org.fusesource.hawtbuf.Buffer
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait StoreUOW {
-
- def release(owner:String);
- def retain(owner:String);
+trait StoreUOW extends Retained {
/**
* Stores a message. Messages a reference counted, so make sure you also
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Thu Feb 21 18:23:16 2013
@@ -110,7 +110,7 @@ abstract class StoreFunSuiteSupport exte
}
def populate(queue_key:Long, messages:List[String], first_seq:Long=1) = {
- var batch = store.create_uow("")
+ var batch = store.create_uow
var msg_keys = ListBuffer[(Long, AtomicReference[Object], Long)]()
var next_seq = first_seq
@@ -125,7 +125,7 @@ abstract class StoreFunSuiteSupport exte
val task = tracker.task("uow complete")
batch.on_complete(task.run)
- batch.release("")
+ batch.release
msg_keys.foreach { msgKey =>
store.flush_message(msgKey._1) {}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala Thu Feb 21 18:23:16 2013
@@ -119,7 +119,7 @@ abstract class StoreTests extends StoreF
test("batch completes after a delay") {x}
def x = {
val A = add_queue("A")
- var batch = store.create_uow("")
+ var batch = store.create_uow
val m1 = add_message(batch, "message 1")
batch.enqueue(entry(A, 1, m1))
@@ -127,7 +127,7 @@ abstract class StoreTests extends StoreF
val tracker = new TaskTracker("unknown", 0)
val task = tracker.task("uow complete")
batch.on_complete(task.run)
- batch.release("")
+ batch.release
expect(false) {
tracker.await(3, TimeUnit.SECONDS)
@@ -139,7 +139,7 @@ abstract class StoreTests extends StoreF
test("flush cancels the delay") {
val A = add_queue("A")
- var batch = store.create_uow("")
+ var batch = store.create_uow
val m1 = add_message(batch, "message 1")
batch.enqueue(entry(A, 1, m1))
@@ -147,7 +147,7 @@ abstract class StoreTests extends StoreF
val tracker = new TaskTracker("unknown", 0)
val task = tracker.task("uow complete")
batch.on_complete(task.run)
- batch.release("")
+ batch.release
store.flush_message(m1._1) {}
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala Thu Feb 21 18:23:16 2013
@@ -45,7 +45,7 @@ class UowHaveLocatorsTest extends StoreF
test("APLO-201: Persistent Store: UOW with message locator and no message (previously flushed)"){
val queue = add_queue("A")
- val batch = store.create_uow("")
+ val batch = store.create_uow
val m1 = add_message(batch, "Hello!")
val queueEntryRecord: QueueEntryRecord = entry(queue, 1, m1)
batch.enqueue(queueEntryRecord)
@@ -53,7 +53,7 @@ class UowHaveLocatorsTest extends StoreF
var tracker = new TaskTracker("uknown", 0)
var task = tracker.task("uow complete")
batch.on_complete(task.run)
- batch.release("")
+ batch.release
assert(queueEntryRecord.message_locator.get() == null)
@@ -62,13 +62,13 @@ class UowHaveLocatorsTest extends StoreF
}
assert(queueEntryRecord.message_locator.get() != null)
- val batch2 = store.create_uow("")
+ val batch2 = store.create_uow
batch2.enqueue(queueEntryRecord)
tracker = new TaskTracker("uknown", 0)
task = tracker.task("uow complete")
batch2.on_complete(task.run)
- batch2.release("")
+ batch2.release
expect(true) {
tracker.await(2, TimeUnit.SECONDS)
Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala Thu Feb 21 18:23:16 2013
@@ -466,7 +466,7 @@ object MqttSessionManager {
case class StoreStrategy(store:Store, client_id:UTF8Buffer) extends StorageStrategy {
val session_key = new UTF8Buffer("mqtt:"+client_id)
def update(cb: =>Unit) = {
- val uow = store.create_uow(toString)
+ val uow = store.create_uow
val session_pb = new SessionPB.Bean
session_pb.setClientId(client_id)
received_message_ids.foreach(session_pb.addReceivedMessageIds(_))
@@ -485,11 +485,11 @@ object MqttSessionManager {
cb
}
}
- uow.release(toString)
+ uow.release
}
def destroy(cb: =>Unit) {
- val uow = store.create_uow(toString)
+ val uow = store.create_uow
uow.put(session_key, null)
val current = getCurrentQueue
uow.on_complete {
@@ -498,7 +498,7 @@ object MqttSessionManager {
cb
}
}
- uow.release(toString)
+ uow.release
}
def create(store:Store, client_id:UTF8Buffer) = {
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Feb 21 18:23:16 2013
@@ -653,7 +653,7 @@ class OpenwireProtocolHandler extends Pr
val route = OpenwireDeliveryProducerRoute(addresses)
if( uow!=null ) {
- uow.retain(toString)
+ uow.retain
}
// don't process frames until producer is connected...
suspend_read("connecting producer route")
@@ -671,7 +671,7 @@ class OpenwireProtocolHandler extends Pr
}
}
if( uow!=null ) {
- uow.release(toString)
+ uow.release
}
}
}
@@ -1243,7 +1243,7 @@ class OpenwireProtocolHandler extends Pr
def commit(onComplete: => Unit) = {
val uow = if( host.store!=null ) {
- host.store.create_uow(toString)
+ host.store.create_uow
} else {
null
}
@@ -1254,7 +1254,7 @@ class OpenwireProtocolHandler extends Pr
if( uow!=null ) {
uow.on_complete(dispatchQueue{ onComplete })
- uow.release(toString)
+ uow.release
} else {
onComplete
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Feb 21 18:23:16 2013
@@ -1231,7 +1231,7 @@ class StompProtocolHandler extends Proto
}
}
- var producer_routes = new LRUCache[AsciiBuffer, StompProducerRoute](10) {
+ var producer_routes = new LRUCache[AsciiBuffer, StompProducerRoute](1) {
override def onCacheEviction(eldest: Entry[AsciiBuffer, StompProducerRoute]) = {
host.dispatch_queue {
host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
@@ -1249,7 +1249,7 @@ class StompProtocolHandler extends Proto
val route = new StompProducerRoute(trimmed_dest) // don't process frames until producer is connected...
suspend_read("Connecting to destination")
if( uow !=null ) {
- uow.retain(toString+":connecting")
+ uow.retain
}
host.dispatch_queue {
val rc = host.router.connect(route.addresses, route, security_context)
@@ -1266,7 +1266,7 @@ class StompProtocolHandler extends Proto
}
}
if( uow !=null ) {
- uow.release(toString+":connecting")
+ uow.release
}
}
}
@@ -1703,14 +1703,14 @@ class StompProtocolHandler extends Proto
def commit(on_complete: => Unit) = {
if( host.store!=null ) {
- val uow = host.store.create_uow(toString+":commit")
+ val uow = host.store.create_uow
// println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
uow.on_complete {
// println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
on_complete
}
queue.foreach{ _._1(uow) }
- uow.release(toString+":commit")
+ uow.release
} else {
queue.foreach{ _._1(null) }
on_complete
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml Thu Feb 21 18:23:16 2013
@@ -41,7 +41,7 @@
<leveldb_store directory="${testdatadir}"/>
</virtual_host>
- <!--<web_admin bind="http://0.0.0.0:61680"/>-->
+ <web_admin bind="http://0.0.0.0:61680"/>
<connector id="tcp" bind="tcp://0.0.0.0:0"/>
<connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
<connector id="stomp-udp" bind="udp://0.0.0.0:0" protocol="stomp-udp"/>
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js Thu Feb 21 18:23:16 2013
@@ -229,9 +229,24 @@ App.LoginController = Em.Controller.crea
var was_logged_in = this.get('is_logged_in')
var kind = this.get('kind')
App.ajax("GET", "/session/whoami", function(data) {
- App.LoginController.set('content', data);
- if( App.LoginController.get('is_logged_in') ) {
- App.refresh();
+ if( data.length==0 ) {
+ App.ajax("GET", "/broker", function(broker) {
+ data.push({kind:"UserPrincipal", name:"<anonymous>"});
+ App.LoginController.set('content', data);
+ if( App.LoginController.get('is_logged_in') ) {
+ App.refresh();
+ }
+ }, function(error){
+ App.LoginController.set('content', data);
+ if( App.LoginController.get('is_logged_in') ) {
+ App.refresh();
+ }
+ });
+ } else {
+ App.LoginController.set('content', data);
+ if( App.LoginController.get('is_logged_in') ) {
+ App.refresh();
+ }
}
},
function(xhr, status, thrown) {
@@ -778,7 +793,7 @@ App.ConfigurationController = Ember.Cont
App.ConfigurationController.set("files", json);
},
function(xhr, status, thrown) {
- if( xhr.status == 401 ) {
+ if( xhr.status == 401 || xhr.status == 404 ) {
App.ConfigurationController.set("files", null);
} else {
App.default_error_handler(xhr, status, thrown)