You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/26 17:15:16 UTC

svn commit: r1390551 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala

Author: chirino
Date: Wed Sep 26 15:15:15 2012
New Revision: 1390551

URL: http://svn.apache.org/viewvc?rev=1390551&view=rev
Log:
Errors could occur in the store due to an invalid double dequeue from the store.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1390551&r1=1390550&r2=1390551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Wed Sep 26 15:15:15 2012
@@ -197,23 +197,26 @@ class QueueEntry(val queue:Queue, val se
   def load(space:MemorySpace) = state.swap_in(space)
   def remove = state.remove
 
-  def dequeue(uow: StoreUOW) = {
+  var queued = true
 
-    if (messageKey != -1) {
-      val storeBatch = if( uow == null ) {
-        queue.virtual_host.store.create_uow
-      } else {
-        uow
-      }
-      storeBatch.dequeue(toQueueEntryRecord)
-      if( uow == null ) {
-        storeBatch.release
+  def dequeue(uow: StoreUOW) = {
+    if ( queued ) {
+      if (messageKey != -1) {
+        val storeBatch = if( uow == null ) {
+          queue.virtual_host.store.create_uow
+        } else {
+          uow
+        }
+        storeBatch.dequeue(toQueueEntryRecord)
+        if( uow == null ) {
+          storeBatch.release
+        }
       }
+      queue.dequeue_item_counter += 1
+      queue.dequeue_size_counter += size
+      queue.dequeue_ts = queue.now
+      queued = false
     }
-
-    queue.dequeue_item_counter += 1
-    queue.dequeue_size_counter += size
-    queue.dequeue_ts = queue.now
   }
 
 
@@ -668,11 +671,6 @@ class QueueEntry(val queue:Queue, val se
 
         // We can drop after dispatch in some cases.
         if( queue.is_topic_queue  && parked.isEmpty && getPrevious.is_head ) {
-          if (messageKey != -1) {
-            val storeBatch = queue.virtual_host.store.create_uow
-            storeBatch.dequeue(toQueueEntryRecord)
-            storeBatch.release
-          }
           dequeue(null)
           remove
         }

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala?rev=1390551&r1=1390550&r2=1390551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala Wed Sep 26 15:15:15 2012
@@ -53,7 +53,7 @@ class MqttLoadTest extends MqttTestSuppo
   }
 }
 
-// This test is failing with: java.lang.AssertionError: assertion failed: locator_based.unary_$bang.$bar$bar(uow.have_locators)
-//class MqttLoadLevelDBTest extends MqttLoadTest {
-//  override def broker_config_uri = "xml:classpath:apollo-mqtt-leveldb.xml"
-//}
+//This test is failing with: java.lang.AssertionError: assertion failed: locator_based.unary_$bang.$bar$bar(uow.have_locators)
+class MqttLoadLevelDBTest extends MqttLoadTest {
+  override def broker_config_uri = "xml:classpath:apollo-mqtt-leveldb.xml"
+}