You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/20 00:01:46 UTC
svn commit: r1159793 - in
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker:
VirtualHost.scala store/PersistentLongCounter.scala
Author: chirino
Date: Fri Aug 19 22:01:45 2011
New Revision: 1159793
URL: http://svn.apache.org/viewvc?rev=1159793&view=rev
Log:
Persist the session counter across restarts so that client session ids are always unique.
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1159793&r1=1159792&r2=1159793&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Fri Aug 19 22:01:45 2011
@@ -30,8 +30,8 @@ import java.util.concurrent.atomic.Atomi
import org.apache.activemq.apollo.util.OptionSupport._
import org.apache.activemq.apollo.util.path.{Path, PathParser}
import security.{AclAuthorizer, JaasAuthenticator, Authenticator, Authorizer}
-import org.apache.activemq.apollo.broker.store.{ZeroCopyBufferAllocator, Store, StoreFactory}
import org.apache.activemq.apollo.dto._
+import store.{PersistentLongCounter, ZeroCopyBufferAllocator, Store, StoreFactory}
/**
* <p>
@@ -100,7 +100,7 @@ class VirtualHost(val broker: Broker, va
var store:Store = null
val queue_id_counter = new LongCounter()
- val session_counter = new AtomicLong(0)
+ val session_counter = new PersistentLongCounter("session_counter")
var authenticator:Authenticator = _
var authorizer:Authorizer = _
@@ -199,7 +199,21 @@ class VirtualHost(val broker: Broker, va
}
tracker.callback {
+
val tracker = new LoggingTracker("virtual host startup", console_log)
+
+ // The default host handles persisting the connection id counter.
+ if(store!=null) {
+ if(session_counter.get == 0) {
+ val task = tracker.task("load session counter")
+ session_counter.init(store) {
+ task.run()
+ }
+ } else {
+ session_counter.connect(store)
+ }
+ }
+
tracker.start(router)
tracker.callback(on_completed)
}
@@ -212,7 +226,11 @@ class VirtualHost(val broker: Broker, va
val tracker = new LoggingTracker("virtual host shutdown", console_log)
tracker.stop(router);
if( store!=null ) {
- tracker.stop(store);
+ val task = tracker.task("store session counter")
+ session_counter.disconnect{
+ tracker.stop(store);
+ task.run()
+ }
}
tracker.callback(on_completed)
}
Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala?rev=1159793&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala Fri Aug 19 22:01:45 2011
@@ -0,0 +1,80 @@
+package org.apache.activemq.apollo.broker.store
+
+import java.util.concurrent.atomic.AtomicLong
+import org.fusesource.hawtbuf.{DataByteArrayInputStream, AbstractVarIntSupport, DataByteArrayOutputStream, Buffer}
+
+/**
+ * <p>
+ * A long counter whose value can be written to, and restored from, a
+ * [[Store]] under the key <code>"long-counter:" + name</code>.
+ * </p>
+ * <p>
+ * Instead of writing every value, the counter writes an upper limit that is
+ * raised in steps of <code>increment</code> whenever the counter reaches it;
+ * <code>init</code> resumes counting from whatever value was last written.
+ * </p>
+ *
+ * @param name      suffix used to build the store key of this counter
+ * @param increment amount by which the stored limit is raised each time
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class PersistentLongCounter(name:String, increment:Long=1000) {
+
+  /** Encodes `a1` as a var-long into a buffer sized to fit it exactly. */
+  def encode(a1:Long):Buffer = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarLongSize(a1)
+    )
+    out.writeVarLong(a1)
+    out.toBuffer
+  }
+
+  /** Reads back a var-long written by `encode`. */
+  def decode(bytes:Buffer):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readVarLong()
+  }
+
+  // Store currently attached via connect/init; null while detached.
+  @transient
+  var store:Store = _
+  // Last value handed out by incrementAndGet.
+  val counter = new AtomicLong(0)
+  // Highest limit this instance has asked the store to record.
+  val limit = new AtomicLong(0)
+
+  val key = Buffer.utf8("long-counter:"+name)
+
+  /**
+   * Attaches to `store`, loads the previously stored value (0 when the key is
+   * absent), resumes counting from it and writes a new limit of
+   * `loaded + increment`.
+   *
+   * @param on_complete evaluated once that write has completed
+   */
+  def init(store:Store)(on_complete: =>Unit):Unit = {
+    connect(store)
+    store.get(key) { value =>
+      val c = value.map(decode).getOrElse(0L)
+      counter.set(c)
+      limit.set(c+increment)
+      update(c+increment)(on_complete)
+    }
+  }
+
+  /**
+   * Writes the current counter value to the attached store and then detaches
+   * from it.
+   *
+   * @param on_complete evaluated once the write has completed, or immediately
+   *                    when no store is attached
+   */
+  def disconnect(on_complete: =>Unit):Unit = {
+    // update captures the store reference before it is cleared below.
+    update(get)(on_complete)
+    this.store = null
+  }
+
+  /** Attaches `store` so that subsequent updates are written to it. */
+  def connect(store:Store):Unit = {
+    this.store = store
+  }
+
+  /** Current counter value. */
+  def get:Long = counter.get
+
+  /**
+   * Increments the counter and returns the new value, raising the stored
+   * limit by `increment` whenever the new value has reached it.
+   */
+  def incrementAndGet():Long = {
+    val rc = counter.incrementAndGet()
+    var done = false
+    while( !done ) {
+      val l = limit.get
+      if ( rc < l ) {
+        done = true
+      } else if ( limit.compareAndSet(l, l+increment) ) {
+        // This caller won the race to raise the limit, so it writes the new
+        // one; the loop then re-checks rc against the raised limit.
+        update(l + increment)()
+      }
+    }
+    rc
+  }
+
+  /**
+   * Writes `value` under this counter's key in a new unit of work on the
+   * attached store.
+   *
+   * @param on_complete evaluated when the unit of work completes; when no
+   *                    store is attached nothing is written and it is
+   *                    evaluated immediately so that callers waiting on it
+   *                    are not left hanging
+   */
+  def update(value: Long)(on_complete: =>Unit):Unit = {
+    val s = store
+    if (s!=null) {
+      val uow = s.create_uow()
+      uow.put(key, encode(value))
+      uow.complete_asap()
+      uow.on_complete(on_complete)
+      uow.release()
+    } else {
+      on_complete
+    }
+  }
+
+}
\ No newline at end of file