You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/06/03 21:50:52 UTC

svn commit: r1489130 - in /activemq/activemq-apollo/trunk: apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/

Author: chirino
Date: Mon Jun  3 19:50:52 2013
New Revision: 1489130

URL: http://svn.apache.org/r1489130
Log:
Port a possible fix from ActiveMQ

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala?rev=1489130&r1=1489129&r2=1489130&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala Mon Jun  3 19:50:52 2013
@@ -233,6 +233,21 @@ case class RecordLog(directory: File, lo
 
     def check_read_flush(end_offset: Long) = {}
 
+    private def read_buffer(data:Buffer, offset:Long) = {
+      var bb = data.toByteBuffer
+      var position = offset
+      while( bb.hasRemaining  ) {
+        var count = channel.read(bb, position)
+        if( count == 0 ) {
+          throw new IOException("zero read at file '%s' offset: %d".format(file, position))
+        }
+        if( count < 0 ) {
+          throw new EOFException("File '%s' offset: %d".format(file, position))
+        }
+        position += count
+      }
+    }
+
     def read(record_position: Long, length: Int) = {
       val offset = record_position - position
       assert(offset >= 0)
@@ -242,9 +257,7 @@ case class RecordLog(directory: File, lo
       if (verify_checksums) {
 
         val record = new Buffer(LOG_HEADER_SIZE + length)
-        if (channel.read(record.toByteBuffer, offset) != record.length) {
-          throw new IOException("short record at position: " + record_position + " in file: " + file + ", offset: " + offset)
-        }
+        read_buffer(record, offset)
 
         def record_is_not_changing = {
           using(open) {
@@ -282,9 +295,7 @@ case class RecordLog(directory: File, lo
         data
       } else {
         val record = new Buffer(length)
-        if (channel.read(record.toByteBuffer, offset+LOG_HEADER_SIZE) != record.length) {
-          throw new IOException("short record at position: " + record_position + " in file: " + file + ", offset: " + offset)
-        }
+        read_buffer(record, offset+LOG_HEADER_SIZE)
         record
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1489130&r1=1489129&r2=1489130&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Mon Jun  3 19:50:52 2013
@@ -1003,7 +1003,7 @@ class OpenwireProtocolHandler extends Pr
       producer.dispatch_queue.assertExecuting()
       retain
 
-      override def toString = "openwire consumer session:"+info.getConsumerId+", remote address: "+security_context.remote_address+", downstream: "+downstream
+      override def toString = "openwire consumer session:"+info.getConsumerId+", connection: "+OpenwireProtocolHandler.this.connection.id+", "+downstream
 
       val downstream = session_manager.open(producer.dispatch_queue)
       var closed = false

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1489130&r1=1489129&r2=1489130&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Jun  3 19:50:52 2013
@@ -299,7 +299,6 @@ class StompProtocolHandler extends Proto
       def close = { closed  = true}
 
       def track(session:Session[Delivery], msgid: AsciiBuffer, size:Int, ack:(DeliveryResult, StoreUOW)=>Unit) = {
-        session_manager.delivered(session, size)
         if( closed ) {
           if( ack!=null ) {
             ack(Undelivered, null)
@@ -350,10 +349,10 @@ class StompProtocolHandler extends Proto
             // register on the connection since 1.0 acks may not include the subscription id
             connection_ack_handlers += ( msgid -> this )
           }
-          if( initial_credit_window.auto_credit ) {
+          if( initial_credit_window.auto_credit) {
             consumer_acks += msgid -> new TrackedAck(Some((session, size)), ack )
           } else {
-            session_manager.delivered(session, size)
+//            session_manager.delivered(session, size)
           }
         }
       }
@@ -379,7 +378,7 @@ class StompProtocolHandler extends Proto
 
           for( (id, delivery) <- acked ) {
             for( credit <- delivery.credit ) {
-              session_manager.delivered(credit._1, credit._2)
+//              session_manager.delivered(credit._1, credit._2)
               credit_window_source.merge((1, credit._2))
               delivery.credit = None
             }
@@ -459,7 +458,7 @@ class StompProtocolHandler extends Proto
           if( initial_credit_window.auto_credit ) {
             consumer_acks += msgid -> new TrackedAck(Some((session, size)), ack)
           } else {
-            session_manager.delivered(session, size)
+//            session_manager.delivered(session, size)
           }
         }
       }
@@ -469,7 +468,7 @@ class StompProtocolHandler extends Proto
         if( initial_credit_window.auto_credit ) {
           for( delivery <- consumer_acks.get(msgid)) {
             for( credit <- delivery.credit ) {
-              session_manager.delivered(credit._1, credit._2)
+//              session_manager.delivered(credit._1, credit._2)
               credit_window_source.merge((1, credit._2))
               delivery.credit = None
             }
@@ -547,6 +546,7 @@ class StompProtocolHandler extends Proto
           ack_id
         }
 
+        session_manager.delivered(session, delivery.size)
         ack_handler.track(session, ack_id, delivery.size, delivery.ack)
 
         if( subscription_id != None ) {
@@ -603,7 +603,7 @@ class StompProtocolHandler extends Proto
         l += match_from_tail
       }
       if( selector!=null ) {
-        l += match_selector 
+        l += match_selector
       }
       l.toArray
     }
@@ -625,10 +625,10 @@ class StompProtocolHandler extends Proto
       val downstream = session_manager.open(producer.dispatch_queue)
 
       override def toString = {
-        "StompConsumerSession("+
-          "connection to: "+StompProtocolHandler.this.connection.transport.getRemoteAddress+", "
-          "closed: "+closed+", "
-          "downstream: "+downstream
+        "stomp consumer session("+
+          "connection: "+StompProtocolHandler.this.connection.id+", "+
+          "closed: "+closed+", "+
+          downstream+
         ")"
       }
 
@@ -1525,8 +1525,8 @@ class StompProtocolHandler extends Proto
     var exclusive = !browser && get(headers, EXCLUSIVE).map( _ == TRUE ).getOrElse(false)
     var include_seq = get(headers, INCLUDE_SEQ)
     val from_seq_opt = get(headers, FROM_SEQ)
-    
-    
+
+
     def is_multi_destination = if( addresses.length > 1 ) {
       true
     } else {
@@ -1644,7 +1644,7 @@ class StompProtocolHandler extends Proto
         } else {
           die("The subscription '%s' not found.".format(id))
         }
-        
+
       case Some(consumer)=>
         // consumer gets disposed after all producer stop sending to it...
         consumers -= id