You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/20 21:51:43 UTC

svn commit: r1363917 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-broker/src/test/scala/org/apache/activemq/apo...

Author: chirino
Date: Fri Jul 20 19:51:42 2012
New Revision: 1363917

URL: http://svn.apache.org/viewvc?rev=1363917&view=rev
Log:
Fix for APLO-201: Warning on reaching nack limit(DLQ): java.lang.AssertionError: assertion failed: uow.have_locators

These are Christian Posta's patches.  Many thanks!

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala?rev=1363917&r1=1363916&r2=1363917&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala Fri Jul 20 19:51:42 2012
@@ -17,12 +17,12 @@
 package org.apache.activemq.apollo.broker.store.bdb
 
 import dto.BDBStoreDTO
-import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport}
+import org.apache.activemq.apollo.broker.store.{StoreTests, Store, StoreFunSuiteSupport}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class BDBStoreTest extends StoreFunSuiteSupport {
+class BDBStoreTest extends StoreTests {
 
   def create_store(flushDelay:Long):Store = {
     val rc = new BDBStore({

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1363917&r1=1363916&r2=1363917&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Fri Jul 20 19:51:42 2012
@@ -252,16 +252,28 @@ trait DelayingStoreSupport extends Store
     def have_locators:Boolean = {
       actions.values.foreach{ a =>
         // There must either be a dequeue or a message record for a enqueue request.
-        if( !a.enqueues.isEmpty && ( a.message_record==null && a.dequeues.isEmpty ) ) {
-          return false 
-        }
-        if( locator_based && a.message_record==null && !a.dequeues.isEmpty ) {
-          a.dequeues.foreach { d =>
-            if ( d.message_locator.get() == null ) {
-              return false
+        // Without a message record, a locator-based store needs a message locator on every dequeue (or, if there are none, on every enqueue).
+
+        if( locator_based && a.message_record==null) {
+          if(!a.dequeues.isEmpty ){
+            a.dequeues.foreach { d =>
+              if ( d.message_locator.get() == null ) {
+                return false
+              }
+            }
+          }
+          else if (!a.enqueues.isEmpty){
+            a.enqueues.foreach { e =>
+              if ( e.message_locator.get() == null ) {
+                return false
+              }
             }
           }
         }
+        else if( !a.enqueues.isEmpty && ( a.message_record==null && a.dequeues.isEmpty ) ) {
+          return false
+        }
+
       }
       true  
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1363917&r1=1363916&r2=1363917&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Fri Jul 20 19:51:42 2012
@@ -37,6 +37,7 @@ abstract class StoreFunSuiteSupport exte
   var store:Store = null
 
   def create_store(flushDelay:Long):Store
+  protected def get_flush_delay(): Long = 5*1000
 
   /**
    * Handy helper to call an async method on the store and wait for
@@ -49,7 +50,7 @@ abstract class StoreFunSuiteSupport exte
   override protected def beforeAll() = {
     super.beforeAll()
     data_directory.recursive_delete
-    store = create_store(5*1000)
+    store = create_store(get_flush_delay())
     val tracker = new LoggingTracker("store startup")
     tracker.start(store)
     tracker.await
@@ -133,131 +134,4 @@ abstract class StoreFunSuiteSupport exte
     msg_keys
   }
 
-  test("add and list queues") {
-    val A = add_queue("A")
-    val B = add_queue("B")
-    val C = add_queue("C")
-
-    val seq:Seq[Long] = List(A,B,C).toSeq
-    expectCB(seq) { cb=>
-      store.list_queues(cb)
-    }
-  }
-
-  test("export and import") {
-    val A = add_queue("A")
-    val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
-
-    val rc:Option[MessageRecord] = sync_cb( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb) )
-    expect(ascii("message 1").buffer) {
-      rc.get.buffer
-    }
-
-    val file = test_data_dir / "export.tgz"
-    file.getParentFile.mkdirs()
-    using( new BufferedOutputStream(new FileOutputStream(file))) { os =>
-      // Export the data...
-      expect(None) {
-        sync_cb[Option[String]] { cb =>
-          store.export_data(os, cb)
-        }
-      }
-    }
-
-    // purge the data..
-    purge
-
-    // There should ne no queues..
-    expectCB(Seq[Long]()) { cb=>
-      store.list_queues(cb)
-    }
-
-    // Import the data..
-    using(new BufferedInputStream(new FileInputStream(file))) { is =>
-      expect(None) {
-        sync_cb[Option[String]] { cb =>
-          store.import_data(is, cb)
-        }
-      }
-    }
-
-    // The data should be there now again..
-    val queues:Seq[Long] = sync_cb(store.list_queues(_))
-    expect(1)(queues.size)
-    val entries:Seq[QueueEntryRecord] = sync_cb(cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb))
-    expect(3) ( entries.size  )
-
-  }
-
-  test("load stored message") {
-    val A = add_queue("A")
-    val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
-
-    val rc:Option[MessageRecord] = sync_cb( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb) )
-    expect(ascii("message 1").buffer) {
-      rc.get.buffer
-    }
-  }
-
-  test("get queue status") {
-    val A = add_queue("my queue name")
-    populate(A, "message 1"::"message 2"::"message 3"::Nil)
-
-    val rc:Option[QueueRecord] = sync_cb( cb=> store.get_queue(A)(cb) )
-    expect(ascii("my queue name")) {
-      rc.get.binding_data.ascii
-    }
-  }
-
-  test("list queue entries") {
-    val A = add_queue("A")
-    val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
-
-    val rc:Seq[QueueEntryRecord] = sync_cb( cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb) )
-    expect(msg_keys.toSeq.map(_._3)) {
-      rc.map( _.entry_seq )
-    }
-  }
-
-  test("batch completes after a delay") {x}
-  def x = {
-    val A = add_queue("A")
-    var batch = store.create_uow
-
-    val m1 = add_message(batch, "message 1")
-    batch.enqueue(entry(A, 1, m1))
-
-    val tracker = new TaskTracker()
-    val task = tracker.task("uow complete")
-    batch.on_complete(task.run)
-    batch.release
-
-    expect(false) {
-      tracker.await(3, TimeUnit.SECONDS)
-    }
-    expect(true) {
-      tracker.await(5, TimeUnit.SECONDS)
-    }
-  }
-
-  test("flush cancels the delay") {
-    val A = add_queue("A")
-    var batch = store.create_uow
-
-    val m1 = add_message(batch, "message 1")
-    batch.enqueue(entry(A, 1, m1))
-
-    val tracker = new TaskTracker()
-    val task = tracker.task("uow complete")
-    batch.on_complete(task.run)
-    batch.release
-
-    store.flush_message(m1._1) {}
-
-    expect(true) {
-      tracker.await(1, TimeUnit.SECONDS)
-    }
-  }
-
-
 }

Added: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala?rev=1363917&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala Fri Jul 20 19:51:42 2012
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store
+
+import org.apache.activemq.apollo.util._
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.apache.activemq.apollo.util.FileSupport._
+import java.io.{FileInputStream, BufferedInputStream, FileOutputStream, BufferedOutputStream}
+import org.fusesource.hawtdispatch.TaskTracker
+import java.util.concurrent.TimeUnit
+
+
+/**
+ *
+ * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
+ */
+abstract class StoreTests extends StoreFunSuiteSupport {
+
+  test("add and list queues") {
+    val A = add_queue("A")
+    val B = add_queue("B")
+    val C = add_queue("C")
+
+    val seq:Seq[Long] = List(A,B,C).toSeq
+    expectCB(seq) { cb=>
+      store.list_queues(cb)
+    }
+  }
+
+  test("export and import") {
+    val A = add_queue("A")
+    val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+    val rc:Option[MessageRecord] = sync_cb( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb) )
+    expect(ascii("message 1").buffer) {
+      rc.get.buffer
+    }
+
+    val file = test_data_dir / "export.tgz"
+    file.getParentFile.mkdirs()
+    using( new BufferedOutputStream(new FileOutputStream(file))) { os =>
+    // Export the data...
+      expect(None) {
+        sync_cb[Option[String]] { cb =>
+          store.export_data(os, cb)
+        }
+      }
+    }
+
+    // purge the data..
+    purge
+
+    // There should be no queues..
+    expectCB(Seq[Long]()) { cb=>
+      store.list_queues(cb)
+    }
+
+    // Import the data..
+    using(new BufferedInputStream(new FileInputStream(file))) { is =>
+      expect(None) {
+        sync_cb[Option[String]] { cb =>
+          store.import_data(is, cb)
+        }
+      }
+    }
+
+    // The data should be there now again..
+    val queues:Seq[Long] = sync_cb(store.list_queues(_))
+    expect(1)(queues.size)
+    val entries:Seq[QueueEntryRecord] = sync_cb(cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb))
+    expect(3) ( entries.size  )
+
+  }
+
+  test("load stored message") {
+    val A = add_queue("A")
+    val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+    val rc:Option[MessageRecord] = sync_cb( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb) )
+    expect(ascii("message 1").buffer) {
+      rc.get.buffer
+    }
+  }
+
+  test("get queue status") {
+    val A = add_queue("my queue name")
+    populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+    val rc:Option[QueueRecord] = sync_cb( cb=> store.get_queue(A)(cb) )
+    expect(ascii("my queue name")) {
+      rc.get.binding_data.ascii
+    }
+  }
+
+  test("list queue entries") {
+    val A = add_queue("A")
+    val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+    val rc:Seq[QueueEntryRecord] = sync_cb( cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb) )
+    expect(msg_keys.toSeq.map(_._3)) {
+      rc.map( _.entry_seq )
+    }
+  }
+
+  test("batch completes after a delay") {x}
+  def x = {
+    val A = add_queue("A")
+    var batch = store.create_uow
+
+    val m1 = add_message(batch, "message 1")
+    batch.enqueue(entry(A, 1, m1))
+
+    val tracker = new TaskTracker()
+    val task = tracker.task("uow complete")
+    batch.on_complete(task.run)
+    batch.release
+
+    expect(false) {
+      tracker.await(3, TimeUnit.SECONDS)
+    }
+    expect(true) {
+      tracker.await(5, TimeUnit.SECONDS)
+    }
+  }
+
+  test("flush cancels the delay") {
+    val A = add_queue("A")
+    var batch = store.create_uow
+
+    val m1 = add_message(batch, "message 1")
+    batch.enqueue(entry(A, 1, m1))
+
+    val tracker = new TaskTracker()
+    val task = tracker.task("uow complete")
+    batch.on_complete(task.run)
+    batch.release
+
+    store.flush_message(m1._1) {}
+
+    expect(true) {
+      tracker.await(1, TimeUnit.SECONDS)
+    }
+  }
+}

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1363917&r1=1363916&r2=1363917&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Fri Jul 20 19:51:42 2012
@@ -874,6 +874,9 @@ class LevelDBClient(store: LevelDBStore)
                       var locator_buffer: Buffer = null
                       action.enqueues.foreach {
                         entry =>
+                          if (locator == null) {
+                            locator = entry.message_locator.get().asInstanceOf[(Long, Int)]
+                          }
                           assert(locator != null)
                           val (pos, len) = locator
                           if (locator_buffer == null) {

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala?rev=1363917&r1=1363916&r2=1363917&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala Fri Jul 20 19:51:42 2012
@@ -18,12 +18,12 @@ package org.apache.activemq.apollo.broke
  */
 
 import dto.LevelDBStoreDTO
-import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport}
+import org.apache.activemq.apollo.broker.store.{StoreTests, Store, StoreFunSuiteSupport}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class LevelDBStoreTest extends StoreFunSuiteSupport {
+class LevelDBStoreTest extends StoreTests {
 
   def create_store(flushDelay: Long): Store = {
     new LevelDBStore({

Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala?rev=1363917&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala Fri Jul 20 19:51:42 2012
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb
+
+import dto.LevelDBStoreDTO
+import org.apache.activemq.apollo.broker.store.{QueueEntryRecord, Store, StoreFunSuiteSupport}
+import org.fusesource.hawtdispatch.TaskTracker
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
+
+/**
+ * <p>Tests specifically for APLO-201</p>
+ *
+ * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
+ */
+class UowHaveLocatorsTest extends StoreFunSuiteSupport {
+
+
+  override protected def get_flush_delay() = 500
+
+
+  def create_store(flushDelay: Long): Store = {
+    new LevelDBStore({
+      val rc = new LevelDBStoreDTO
+      rc.directory = data_directory
+      rc.flush_delay = flushDelay
+      rc
+    })
+  }
+
+  test("APLO-201: Persistent Store: UOW with message locator and no message (previously flushed)"){
+    val queue = add_queue("A")
+    val batch = store.create_uow
+    val m1 = add_message(batch, "Hello!")
+    val queueEntryRecord: QueueEntryRecord =  entry(queue, 1, m1)
+    batch.enqueue(queueEntryRecord)
+
+    var tracker = new TaskTracker()
+    var task = tracker.task("uow complete")
+    batch.on_complete(task.run)
+    batch.release
+
+    assert(queueEntryRecord.message_locator.get() == null)
+
+    expect(true) {
+      tracker.await(2, TimeUnit.SECONDS)
+    }
+    assert(queueEntryRecord.message_locator.get() != null)
+
+    val batch2 = store.create_uow
+    batch2.enqueue(queueEntryRecord)
+
+    tracker = new TaskTracker()
+    task = tracker.task("uow complete")
+    batch2.on_complete(task.run)
+    batch2.release
+
+    expect(true) {
+      tracker.await(2, TimeUnit.SECONDS)
+    }
+  }
+
+  // needed to get access to the DelayableUOW class
+  // note, this will be "locator_based" since we're using levelDB
+  class LocatorBasedStore(val configDto: LevelDBStoreDTO = new LevelDBStoreDTO) extends LevelDBStore(configDto){
+    def create_uow_delayable() = new DelayableUOW
+  }
+
+  test("Have message locators for locator-based store"){
+
+    val uow = (new LocatorBasedStore).create_uow_delayable()
+    val queueEntry = new QueueEntryRecord
+    queueEntry.message_key = 1L
+    queueEntry.message_locator = new AtomicReference[Object]
+
+    uow.enqueue(queueEntry)
+    assert(uow.have_locators == false)
+
+    uow.dequeue(queueEntry)
+    assert(uow.have_locators == false)
+
+    queueEntry.message_locator.set("test")
+    assert(uow.have_locators == true)
+
+  }
+
+  test("Have message locators for enqueues"){
+    val uow = (new LocatorBasedStore).create_uow_delayable()
+    val queueEntry = new QueueEntryRecord
+    queueEntry.message_key = 1L
+    queueEntry.message_locator = new AtomicReference[Object]
+    queueEntry.message_locator.set("test")
+
+    uow.enqueue(queueEntry)
+    assert(uow.have_locators == true)
+
+  }
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1363917&r1=1363916&r2=1363917&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Fri Jul 20 19:51:42 2012
@@ -2536,6 +2536,19 @@ class StompNackTest extends StompTestSup
 
 class StompNackTestOnLevelDBTest extends StompNackTest {
   override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+
+  test("NACKing without DLQ consumer (persistent)"){
+    connect("1.1")
+    sync_send("/queue/nacker.b", "this msg is persistent", "persistent:true\n")
+
+    subscribe("0", "/queue/nacker.b", "client", false, "", false)
+
+    var ack = assert_received("this msg is persistent", "0")
+    ack(false)
+    ack = assert_received("this msg is persistent", "0")
+    ack(false)
+    Thread.sleep(1000)
+  }
 }
 
 class StompDropPolicyTest extends StompTestSupport {