You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/26 17:15:16 UTC
svn commit: r1390551 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
Author: chirino
Date: Wed Sep 26 15:15:15 2012
New Revision: 1390551
URL: http://svn.apache.org/viewvc?rev=1390551&view=rev
Log:
Errors could occur in the store due to an invalid double dequeue from the store.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1390551&r1=1390550&r2=1390551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Wed Sep 26 15:15:15 2012
@@ -197,23 +197,26 @@ class QueueEntry(val queue:Queue, val se
def load(space:MemorySpace) = state.swap_in(space)
def remove = state.remove
- def dequeue(uow: StoreUOW) = {
+ var queued = true
- if (messageKey != -1) {
- val storeBatch = if( uow == null ) {
- queue.virtual_host.store.create_uow
- } else {
- uow
- }
- storeBatch.dequeue(toQueueEntryRecord)
- if( uow == null ) {
- storeBatch.release
+ def dequeue(uow: StoreUOW) = {
+ if ( queued ) {
+ if (messageKey != -1) {
+ val storeBatch = if( uow == null ) {
+ queue.virtual_host.store.create_uow
+ } else {
+ uow
+ }
+ storeBatch.dequeue(toQueueEntryRecord)
+ if( uow == null ) {
+ storeBatch.release
+ }
}
+ queue.dequeue_item_counter += 1
+ queue.dequeue_size_counter += size
+ queue.dequeue_ts = queue.now
+ queued = false
}
-
- queue.dequeue_item_counter += 1
- queue.dequeue_size_counter += size
- queue.dequeue_ts = queue.now
}
@@ -668,11 +671,6 @@ class QueueEntry(val queue:Queue, val se
// We can drop after dispatch in some cases.
if( queue.is_topic_queue && parked.isEmpty && getPrevious.is_head ) {
- if (messageKey != -1) {
- val storeBatch = queue.virtual_host.store.create_uow
- storeBatch.dequeue(toQueueEntryRecord)
- storeBatch.release
- }
dequeue(null)
remove
}
Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala?rev=1390551&r1=1390550&r2=1390551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala Wed Sep 26 15:15:15 2012
@@ -53,7 +53,7 @@ class MqttLoadTest extends MqttTestSuppo
}
}
-// This test is failing with: java.lang.AssertionError: assertion failed: locator_based.unary_$bang.$bar$bar(uow.have_locators)
-//class MqttLoadLevelDBTest extends MqttLoadTest {
-// override def broker_config_uri = "xml:classpath:apollo-mqtt-leveldb.xml"
-//}
+//This test is failing with: java.lang.AssertionError: assertion failed: locator_based.unary_$bang.$bar$bar(uow.have_locators)
+class MqttLoadLevelDBTest extends MqttLoadTest {
+ override def broker_config_uri = "xml:classpath:apollo-mqtt-leveldb.xml"
+}