You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/20 00:01:46 UTC

svn commit: r1159793 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: VirtualHost.scala store/PersistentLongCounter.scala

Author: chirino
Date: Fri Aug 19 22:01:45 2011
New Revision: 1159793

URL: http://svn.apache.org/viewvc?rev=1159793&view=rev
Log:
Persist the session counter across restarts so that client session ids are always unique.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1159793&r1=1159792&r2=1159793&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Fri Aug 19 22:01:45 2011
@@ -30,8 +30,8 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.activemq.apollo.util.OptionSupport._
 import org.apache.activemq.apollo.util.path.{Path, PathParser}
 import security.{AclAuthorizer, JaasAuthenticator, Authenticator, Authorizer}
-import org.apache.activemq.apollo.broker.store.{ZeroCopyBufferAllocator, Store, StoreFactory}
 import org.apache.activemq.apollo.dto._
+import store.{PersistentLongCounter, ZeroCopyBufferAllocator, Store, StoreFactory}
 
 /**
  * <p>
@@ -100,7 +100,7 @@ class VirtualHost(val broker: Broker, va
   var store:Store = null
   val queue_id_counter = new LongCounter()
 
-  val session_counter = new AtomicLong(0)
+  val session_counter = new PersistentLongCounter("session_counter")
 
   var authenticator:Authenticator = _
   var authorizer:Authorizer = _
@@ -199,7 +199,21 @@ class VirtualHost(val broker: Broker, va
     }
 
     tracker.callback {
+
       val tracker = new LoggingTracker("virtual host startup", console_log)
+
+      // The default host handles persisting the connection id counter.
+      if(store!=null) {
+        if(session_counter.get == 0) {
+          val task = tracker.task("load session counter")
+          session_counter.init(store) {
+            task.run()
+          }
+        } else {
+          session_counter.connect(store)
+        }
+      }
+
       tracker.start(router)
       tracker.callback(on_completed)
     }
@@ -212,7 +226,11 @@ class VirtualHost(val broker: Broker, va
     val tracker = new LoggingTracker("virtual host shutdown", console_log)
     tracker.stop(router);
     if( store!=null ) {
-      tracker.stop(store);
+      val task = tracker.task("store session counter")
+      session_counter.disconnect{
+        tracker.stop(store);
+        task.run()
+      }
     }
     tracker.callback(on_completed)
   }

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala?rev=1159793&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala Fri Aug 19 22:01:45 2011
@@ -0,0 +1,80 @@
+package org.apache.activemq.apollo.broker.store
+
+import java.util.concurrent.atomic.AtomicLong
+import org.fusesource.hawtbuf.{DataByteArrayInputStream, AbstractVarIntSupport, DataByteArrayOutputStream, Buffer}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class PersistentLongCounter(name:String, increment:Long=1000) {
+
+  def encode(a1:Long):Buffer = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarLongSize(a1)
+    )
+    out.writeVarLong(a1)
+    out.toBuffer
+  }
+
+  def decode(bytes:Buffer):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readVarLong()
+  }
+
+  @transient
+  var store:Store = _
+  val counter = new AtomicLong(0)
+  val limit = new AtomicLong(0)
+
+  val key = Buffer.utf8("long-counter:"+name);
+
+  def init(store:Store)(on_complete: =>Unit):Unit = {
+    connect(store)
+    store.get(key) { value =>
+      val c = value.map(decode(_)).getOrElse(0L)
+      counter.set(c)
+      limit.set(c+increment)
+      update(c+increment)(on_complete)
+    }
+  }
+
+  def disconnect(on_complete: =>Unit):Unit = {
+    update(get)(on_complete)
+    this.store = null
+  }
+
+  def connect(store:Store) = {
+    this.store = store
+  }
+
+  def get = counter.get
+
+  def incrementAndGet() = {
+    val rc = counter.incrementAndGet()
+    var done = false
+    while( !done ) {
+      val l = limit.get
+      if ( rc < l ) {
+        done = true
+      } else if ( limit.compareAndSet(l, l+increment) ) {
+        update(l + increment)()
+      }
+    }
+    rc
+  }
+
+  def update(value: Long)(on_complete: =>Unit) {
+    val s = store
+    if (s!=null) {
+      val uow = s.create_uow()
+      uow.put(key, encode(value))
+      uow.complete_asap()
+      uow.on_complete(on_complete)
+      uow.release()
+    }
+  }
+
+}
\ No newline at end of file