You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/02 17:58:50 UTC

svn commit: r1209582 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala

Author: chirino
Date: Fri Dec  2 16:58:49 2011
New Revision: 1209582

URL: http://svn.apache.org/viewvc?rev=1209582&view=rev
Log:
Fixes APLO-97 : NPE in HeartBeatMonitor when using an initial check delay

Patch provided by Stan Lewis.  Thanks!  Also reduce a bit of redundancy.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala?rev=1209582&r1=1209581&r2=1209582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala Fri Dec  2 16:58:49 2011
@@ -38,29 +38,48 @@ class HeartBeatMonitor() {
   var on_dead = ()=>{}
 
   var session = 0
+  
+  def schedule(interval:Long, func:() => Unit) = {
+    if ( this.session==session ) {
+      transport.getDispatchQueue.after(interval, TimeUnit.MILLISECONDS) {
+        if ( this.session==session ) {
+          func()
+        }
+      }
+    }
+  }
 
   def schedual_check_writes(session:Int):Unit = {
-    val last_write_counter = transport.getProtocolCodec.getWriteCounter()
-    transport.getDispatchQueue.after(write_interval/2, TimeUnit.MILLISECONDS) {
-      if( this.session == session ) {
-        if( last_write_counter==transport.getProtocolCodec.getWriteCounter ) {
+    var func = () => {
+      schedual_check_writes(session)
+    }
+    
+    Option(transport.getProtocolCodec).foreach { codec=>
+      val last_write_counter = codec.getWriteCounter()
+      func = () => {
+        if( last_write_counter==codec.getWriteCounter ) {
           on_keep_alive()
         }
         schedual_check_writes(session)
       }
     }
+    schedule(write_interval/2, func)
   }
 
   def schedual_check_reads(session:Int):Unit = {
-    val last_read_counter = transport.getProtocolCodec.getReadCounter()
-    transport.getDispatchQueue.after(read_interval, TimeUnit.MILLISECONDS) {
-      if( this.session == session ) {
-        if( last_read_counter==transport.getProtocolCodec.getReadCounter ) {
+    var func = () => {
+      schedual_check_reads(session)
+    }
+    Option(transport.getProtocolCodec).foreach { codec=>
+      val last_read_counter = codec.getReadCounter
+      func = () => {
+        if( last_read_counter==codec.getReadCounter ) {
           on_dead()
         }
         schedual_check_reads(session)
       }
     }
+    schedule(read_interval, func)
   }
 
   def start = {