You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/06/03 21:50:52 UTC
svn commit: r1489130 - in /activemq/activemq-apollo/trunk:
apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/
apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/
apollo-stomp/src/main/scala/org/apache/activem...
Author: chirino
Date: Mon Jun 3 19:50:52 2013
New Revision: 1489130
URL: http://svn.apache.org/r1489130
Log:
Port a possible fix from ActiveMQ
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala?rev=1489130&r1=1489129&r2=1489130&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala Mon Jun 3 19:50:52 2013
@@ -233,6 +233,21 @@ case class RecordLog(directory: File, lo
def check_read_flush(end_offset: Long) = {}
+ private def read_buffer(data:Buffer, offset:Long) = {
+ var bb = data.toByteBuffer
+ var position = offset
+ while( bb.hasRemaining ) {
+ var count = channel.read(bb, position)
+ if( count == 0 ) {
+ throw new IOException("zero read at file '%s' offset: %d".format(file, position))
+ }
+ if( count < 0 ) {
+ throw new EOFException("File '%s' offset: %d".format(file, position))
+ }
+ position += count
+ }
+ }
+
def read(record_position: Long, length: Int) = {
val offset = record_position - position
assert(offset >= 0)
@@ -242,9 +257,7 @@ case class RecordLog(directory: File, lo
if (verify_checksums) {
val record = new Buffer(LOG_HEADER_SIZE + length)
- if (channel.read(record.toByteBuffer, offset) != record.length) {
- throw new IOException("short record at position: " + record_position + " in file: " + file + ", offset: " + offset)
- }
+ read_buffer(record, offset)
def record_is_not_changing = {
using(open) {
@@ -282,9 +295,7 @@ case class RecordLog(directory: File, lo
data
} else {
val record = new Buffer(length)
- if (channel.read(record.toByteBuffer, offset+LOG_HEADER_SIZE) != record.length) {
- throw new IOException("short record at position: " + record_position + " in file: " + file + ", offset: " + offset)
- }
+ read_buffer(record, offset+LOG_HEADER_SIZE)
record
}
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1489130&r1=1489129&r2=1489130&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Mon Jun 3 19:50:52 2013
@@ -1003,7 +1003,7 @@ class OpenwireProtocolHandler extends Pr
producer.dispatch_queue.assertExecuting()
retain
- override def toString = "openwire consumer session:"+info.getConsumerId+", remote address: "+security_context.remote_address+", downstream: "+downstream
+ override def toString = "openwire consumer session:"+info.getConsumerId+", connection: "+OpenwireProtocolHandler.this.connection.id+", "+downstream
val downstream = session_manager.open(producer.dispatch_queue)
var closed = false
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1489130&r1=1489129&r2=1489130&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Jun 3 19:50:52 2013
@@ -299,7 +299,6 @@ class StompProtocolHandler extends Proto
def close = { closed = true}
def track(session:Session[Delivery], msgid: AsciiBuffer, size:Int, ack:(DeliveryResult, StoreUOW)=>Unit) = {
- session_manager.delivered(session, size)
if( closed ) {
if( ack!=null ) {
ack(Undelivered, null)
@@ -350,10 +349,10 @@ class StompProtocolHandler extends Proto
// register on the connection since 1.0 acks may not include the subscription id
connection_ack_handlers += ( msgid -> this )
}
- if( initial_credit_window.auto_credit ) {
+ if( initial_credit_window.auto_credit) {
consumer_acks += msgid -> new TrackedAck(Some((session, size)), ack )
} else {
- session_manager.delivered(session, size)
+// session_manager.delivered(session, size)
}
}
}
@@ -379,7 +378,7 @@ class StompProtocolHandler extends Proto
for( (id, delivery) <- acked ) {
for( credit <- delivery.credit ) {
- session_manager.delivered(credit._1, credit._2)
+// session_manager.delivered(credit._1, credit._2)
credit_window_source.merge((1, credit._2))
delivery.credit = None
}
@@ -459,7 +458,7 @@ class StompProtocolHandler extends Proto
if( initial_credit_window.auto_credit ) {
consumer_acks += msgid -> new TrackedAck(Some((session, size)), ack)
} else {
- session_manager.delivered(session, size)
+// session_manager.delivered(session, size)
}
}
}
@@ -469,7 +468,7 @@ class StompProtocolHandler extends Proto
if( initial_credit_window.auto_credit ) {
for( delivery <- consumer_acks.get(msgid)) {
for( credit <- delivery.credit ) {
- session_manager.delivered(credit._1, credit._2)
+// session_manager.delivered(credit._1, credit._2)
credit_window_source.merge((1, credit._2))
delivery.credit = None
}
@@ -547,6 +546,7 @@ class StompProtocolHandler extends Proto
ack_id
}
+ session_manager.delivered(session, delivery.size)
ack_handler.track(session, ack_id, delivery.size, delivery.ack)
if( subscription_id != None ) {
@@ -603,7 +603,7 @@ class StompProtocolHandler extends Proto
l += match_from_tail
}
if( selector!=null ) {
- l += match_selector
+ l += match_selector
}
l.toArray
}
@@ -625,10 +625,10 @@ class StompProtocolHandler extends Proto
val downstream = session_manager.open(producer.dispatch_queue)
override def toString = {
- "StompConsumerSession("+
- "connection to: "+StompProtocolHandler.this.connection.transport.getRemoteAddress+", "
- "closed: "+closed+", "
- "downstream: "+downstream
+ "stomp consumer session("+
+ "connection: "+StompProtocolHandler.this.connection.id+", "+
+ "closed: "+closed+", "+
+ downstream+
")"
}
@@ -1525,8 +1525,8 @@ class StompProtocolHandler extends Proto
var exclusive = !browser && get(headers, EXCLUSIVE).map( _ == TRUE ).getOrElse(false)
var include_seq = get(headers, INCLUDE_SEQ)
val from_seq_opt = get(headers, FROM_SEQ)
-
-
+
+
def is_multi_destination = if( addresses.length > 1 ) {
true
} else {
@@ -1644,7 +1644,7 @@ class StompProtocolHandler extends Proto
} else {
die("The subscription '%s' not found.".format(id))
}
-
+
case Some(consumer)=>
// consumer gets disposed after all producer stop sending to it...
consumers -= id