You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/02 17:58:50 UTC
svn commit: r1209582 -
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala
Author: chirino
Date: Fri Dec 2 16:58:49 2011
New Revision: 1209582
URL: http://svn.apache.org/viewvc?rev=1209582&view=rev
Log:
Fixes APLO-97 : NPE in HeartBeatMonitor when using an initial check delay
Patch provided by Stan Lewis. Thanks! Also reduce a bit of redundancy.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala?rev=1209582&r1=1209581&r2=1209582&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala Fri Dec 2 16:58:49 2011
@@ -38,29 +38,48 @@ class HeartBeatMonitor() {
var on_dead = ()=>{}
var session = 0
+
+ def schedule(interval:Long, func:() => Unit) = {
+ if ( this.session==session ) {
+ transport.getDispatchQueue.after(interval, TimeUnit.MILLISECONDS) {
+ if ( this.session==session ) {
+ func()
+ }
+ }
+ }
+ }
def schedual_check_writes(session:Int):Unit = {
- val last_write_counter = transport.getProtocolCodec.getWriteCounter()
- transport.getDispatchQueue.after(write_interval/2, TimeUnit.MILLISECONDS) {
- if( this.session == session ) {
- if( last_write_counter==transport.getProtocolCodec.getWriteCounter ) {
+ var func = () => {
+ schedual_check_writes(session)
+ }
+
+ Option(transport.getProtocolCodec).foreach { codec=>
+ val last_write_counter = codec.getWriteCounter()
+ func = () => {
+ if( last_write_counter==codec.getWriteCounter ) {
on_keep_alive()
}
schedual_check_writes(session)
}
}
+ schedule(write_interval/2, func)
}
def schedual_check_reads(session:Int):Unit = {
- val last_read_counter = transport.getProtocolCodec.getReadCounter()
- transport.getDispatchQueue.after(read_interval, TimeUnit.MILLISECONDS) {
- if( this.session == session ) {
- if( last_read_counter==transport.getProtocolCodec.getReadCounter ) {
+ var func = () => {
+ schedual_check_reads(session)
+ }
+ Option(transport.getProtocolCodec).foreach { codec=>
+ val last_read_counter = codec.getReadCounter
+ func = () => {
+ if( last_read_counter==codec.getReadCounter ) {
on_dead()
}
schedual_check_reads(session)
}
}
+ schedule(read_interval, func)
}
def start = {