You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/20 21:51:43 UTC
svn commit: r1363917 - in /activemq/activemq-apollo/trunk:
apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/
apollo-broker/src/test/scala/org/apache/activemq/apo...
Author: chirino
Date: Fri Jul 20 19:51:42 2012
New Revision: 1363917
URL: http://svn.apache.org/viewvc?rev=1363917&view=rev
Log:
Fix for APLO-201: Warning on reaching nack limit(DLQ): java.lang.AssertionError: assertion failed: uow.have_locators
These are Christian Posta's patches. Many thanks!
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala?rev=1363917&r1=1363916&r2=1363917&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala Fri Jul 20 19:51:42 2012
@@ -17,12 +17,12 @@
package org.apache.activemq.apollo.broker.store.bdb
import dto.BDBStoreDTO
-import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport}
+import org.apache.activemq.apollo.broker.store.{StoreTests, Store, StoreFunSuiteSupport}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class BDBStoreTest extends StoreFunSuiteSupport {
+class BDBStoreTest extends StoreTests {
def create_store(flushDelay:Long):Store = {
val rc = new BDBStore({
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1363917&r1=1363916&r2=1363917&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Fri Jul 20 19:51:42 2012
@@ -252,16 +252,28 @@ trait DelayingStoreSupport extends Store
def have_locators:Boolean = {
actions.values.foreach{ a =>
// There must either be a dequeue or a message record for a enqueue request.
- if( !a.enqueues.isEmpty && ( a.message_record==null && a.dequeues.isEmpty ) ) {
- return false
- }
- if( locator_based && a.message_record==null && !a.dequeues.isEmpty ) {
- a.dequeues.foreach { d =>
- if ( d.message_locator.get() == null ) {
- return false
+ // if not, then there should be a message locator
+
+ if( locator_based && a.message_record==null) {
+ if(!a.dequeues.isEmpty ){
+ a.dequeues.foreach { d =>
+ if ( d.message_locator.get() == null ) {
+ return false
+ }
+ }
+ }
+ else if (!a.enqueues.isEmpty){
+ a.enqueues.foreach { e =>
+ if ( e.message_locator.get() == null ) {
+ return false
+ }
}
}
}
+ else if( !a.enqueues.isEmpty && ( a.message_record==null && a.dequeues.isEmpty ) ) {
+ return false
+ }
+
}
true
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1363917&r1=1363916&r2=1363917&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Fri Jul 20 19:51:42 2012
@@ -37,6 +37,7 @@ abstract class StoreFunSuiteSupport exte
var store:Store = null
def create_store(flushDelay:Long):Store
+ protected def get_flush_delay(): Long = 5*1000
/**
* Handy helper to call an async method on the store and wait for
@@ -49,7 +50,7 @@ abstract class StoreFunSuiteSupport exte
override protected def beforeAll() = {
super.beforeAll()
data_directory.recursive_delete
- store = create_store(5*1000)
+ store = create_store(get_flush_delay())
val tracker = new LoggingTracker("store startup")
tracker.start(store)
tracker.await
@@ -133,131 +134,4 @@ abstract class StoreFunSuiteSupport exte
msg_keys
}
- test("add and list queues") {
- val A = add_queue("A")
- val B = add_queue("B")
- val C = add_queue("C")
-
- val seq:Seq[Long] = List(A,B,C).toSeq
- expectCB(seq) { cb=>
- store.list_queues(cb)
- }
- }
-
- test("export and import") {
- val A = add_queue("A")
- val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
-
- val rc:Option[MessageRecord] = sync_cb( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb) )
- expect(ascii("message 1").buffer) {
- rc.get.buffer
- }
-
- val file = test_data_dir / "export.tgz"
- file.getParentFile.mkdirs()
- using( new BufferedOutputStream(new FileOutputStream(file))) { os =>
- // Export the data...
- expect(None) {
- sync_cb[Option[String]] { cb =>
- store.export_data(os, cb)
- }
- }
- }
-
- // purge the data..
- purge
-
- // There should ne no queues..
- expectCB(Seq[Long]()) { cb=>
- store.list_queues(cb)
- }
-
- // Import the data..
- using(new BufferedInputStream(new FileInputStream(file))) { is =>
- expect(None) {
- sync_cb[Option[String]] { cb =>
- store.import_data(is, cb)
- }
- }
- }
-
- // The data should be there now again..
- val queues:Seq[Long] = sync_cb(store.list_queues(_))
- expect(1)(queues.size)
- val entries:Seq[QueueEntryRecord] = sync_cb(cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb))
- expect(3) ( entries.size )
-
- }
-
- test("load stored message") {
- val A = add_queue("A")
- val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
-
- val rc:Option[MessageRecord] = sync_cb( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb) )
- expect(ascii("message 1").buffer) {
- rc.get.buffer
- }
- }
-
- test("get queue status") {
- val A = add_queue("my queue name")
- populate(A, "message 1"::"message 2"::"message 3"::Nil)
-
- val rc:Option[QueueRecord] = sync_cb( cb=> store.get_queue(A)(cb) )
- expect(ascii("my queue name")) {
- rc.get.binding_data.ascii
- }
- }
-
- test("list queue entries") {
- val A = add_queue("A")
- val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
-
- val rc:Seq[QueueEntryRecord] = sync_cb( cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb) )
- expect(msg_keys.toSeq.map(_._3)) {
- rc.map( _.entry_seq )
- }
- }
-
- test("batch completes after a delay") {x}
- def x = {
- val A = add_queue("A")
- var batch = store.create_uow
-
- val m1 = add_message(batch, "message 1")
- batch.enqueue(entry(A, 1, m1))
-
- val tracker = new TaskTracker()
- val task = tracker.task("uow complete")
- batch.on_complete(task.run)
- batch.release
-
- expect(false) {
- tracker.await(3, TimeUnit.SECONDS)
- }
- expect(true) {
- tracker.await(5, TimeUnit.SECONDS)
- }
- }
-
- test("flush cancels the delay") {
- val A = add_queue("A")
- var batch = store.create_uow
-
- val m1 = add_message(batch, "message 1")
- batch.enqueue(entry(A, 1, m1))
-
- val tracker = new TaskTracker()
- val task = tracker.task("uow complete")
- batch.on_complete(task.run)
- batch.release
-
- store.flush_message(m1._1) {}
-
- expect(true) {
- tracker.await(1, TimeUnit.SECONDS)
- }
- }
-
-
}
Added: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala?rev=1363917&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala Fri Jul 20 19:51:42 2012
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store
+
+import org.apache.activemq.apollo.util._
+import org.fusesource.hawtbuf.AsciiBuffer._
+import org.apache.activemq.apollo.util.FileSupport._
+import java.io.{FileInputStream, BufferedInputStream, FileOutputStream, BufferedOutputStream}
+import org.fusesource.hawtdispatch.TaskTracker
+import java.util.concurrent.TimeUnit
+
+
+/**
+ *
+ * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
+ */
+abstract class StoreTests extends StoreFunSuiteSupport {
+
+ test("add and list queues") {
+ val A = add_queue("A")
+ val B = add_queue("B")
+ val C = add_queue("C")
+
+ val seq:Seq[Long] = List(A,B,C).toSeq
+ expectCB(seq) { cb=>
+ store.list_queues(cb)
+ }
+ }
+
+ test("export and import") {
+ val A = add_queue("A")
+ val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+ val rc:Option[MessageRecord] = sync_cb( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb) )
+ expect(ascii("message 1").buffer) {
+ rc.get.buffer
+ }
+
+ val file = test_data_dir / "export.tgz"
+ file.getParentFile.mkdirs()
+ using( new BufferedOutputStream(new FileOutputStream(file))) { os =>
+ // Export the data...
+ expect(None) {
+ sync_cb[Option[String]] { cb =>
+ store.export_data(os, cb)
+ }
+ }
+ }
+
+ // purge the data..
+ purge
+
+ // There should be no queues..
+ expectCB(Seq[Long]()) { cb=>
+ store.list_queues(cb)
+ }
+
+ // Import the data..
+ using(new BufferedInputStream(new FileInputStream(file))) { is =>
+ expect(None) {
+ sync_cb[Option[String]] { cb =>
+ store.import_data(is, cb)
+ }
+ }
+ }
+
+ // The data should be there now again..
+ val queues:Seq[Long] = sync_cb(store.list_queues(_))
+ expect(1)(queues.size)
+ val entries:Seq[QueueEntryRecord] = sync_cb(cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb))
+ expect(3) ( entries.size )
+
+ }
+
+ test("load stored message") {
+ val A = add_queue("A")
+ val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+ val rc:Option[MessageRecord] = sync_cb( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb) )
+ expect(ascii("message 1").buffer) {
+ rc.get.buffer
+ }
+ }
+
+ test("get queue status") {
+ val A = add_queue("my queue name")
+ populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+ val rc:Option[QueueRecord] = sync_cb( cb=> store.get_queue(A)(cb) )
+ expect(ascii("my queue name")) {
+ rc.get.binding_data.ascii
+ }
+ }
+
+ test("list queue entries") {
+ val A = add_queue("A")
+ val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+ val rc:Seq[QueueEntryRecord] = sync_cb( cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb) )
+ expect(msg_keys.toSeq.map(_._3)) {
+ rc.map( _.entry_seq )
+ }
+ }
+
+ test("batch completes after a delay") {x}
+ def x = {
+ val A = add_queue("A")
+ var batch = store.create_uow
+
+ val m1 = add_message(batch, "message 1")
+ batch.enqueue(entry(A, 1, m1))
+
+ val tracker = new TaskTracker()
+ val task = tracker.task("uow complete")
+ batch.on_complete(task.run)
+ batch.release
+
+ expect(false) {
+ tracker.await(3, TimeUnit.SECONDS)
+ }
+ expect(true) {
+ tracker.await(5, TimeUnit.SECONDS)
+ }
+ }
+
+ test("flush cancels the delay") {
+ val A = add_queue("A")
+ var batch = store.create_uow
+
+ val m1 = add_message(batch, "message 1")
+ batch.enqueue(entry(A, 1, m1))
+
+ val tracker = new TaskTracker()
+ val task = tracker.task("uow complete")
+ batch.on_complete(task.run)
+ batch.release
+
+ store.flush_message(m1._1) {}
+
+ expect(true) {
+ tracker.await(1, TimeUnit.SECONDS)
+ }
+ }
+}
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1363917&r1=1363916&r2=1363917&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Fri Jul 20 19:51:42 2012
@@ -874,6 +874,9 @@ class LevelDBClient(store: LevelDBStore)
var locator_buffer: Buffer = null
action.enqueues.foreach {
entry =>
+ if (locator == null) {
+ locator = entry.message_locator.get().asInstanceOf[(Long, Int)]
+ }
assert(locator != null)
val (pos, len) = locator
if (locator_buffer == null) {
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala?rev=1363917&r1=1363916&r2=1363917&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala Fri Jul 20 19:51:42 2012
@@ -18,12 +18,12 @@ package org.apache.activemq.apollo.broke
*/
import dto.LevelDBStoreDTO
-import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport}
+import org.apache.activemq.apollo.broker.store.{StoreTests, Store, StoreFunSuiteSupport}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class LevelDBStoreTest extends StoreFunSuiteSupport {
+class LevelDBStoreTest extends StoreTests {
def create_store(flushDelay: Long): Store = {
new LevelDBStore({
Added: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala?rev=1363917&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala Fri Jul 20 19:51:42 2012
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.leveldb
+
+import dto.LevelDBStoreDTO
+import org.apache.activemq.apollo.broker.store.{QueueEntryRecord, Store, StoreFunSuiteSupport}
+import org.fusesource.hawtdispatch.TaskTracker
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
+
+/**
+ * <p>Tests specifically for APLO-201</p>
+ *
+ * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
+ */
+class UowHaveLocatorsTest extends StoreFunSuiteSupport {
+
+
+ override protected def get_flush_delay() = 500
+
+
+ def create_store(flushDelay: Long): Store = {
+ new LevelDBStore({
+ val rc = new LevelDBStoreDTO
+ rc.directory = data_directory
+ rc.flush_delay = flushDelay
+ rc
+ })
+ }
+
+ test("APLO-201: Persistent Store: UOW with message locator and no message (previously flushed)"){
+ val queue = add_queue("A")
+ val batch = store.create_uow
+ val m1 = add_message(batch, "Hello!")
+ val queueEntryRecord: QueueEntryRecord = entry(queue, 1, m1)
+ batch.enqueue(queueEntryRecord)
+
+ var tracker = new TaskTracker()
+ var task = tracker.task("uow complete")
+ batch.on_complete(task.run)
+ batch.release
+
+ assert(queueEntryRecord.message_locator.get() == null)
+
+ expect(true) {
+ tracker.await(2, TimeUnit.SECONDS)
+ }
+ assert(queueEntryRecord.message_locator.get() != null)
+
+ val batch2 = store.create_uow
+ batch2.enqueue(queueEntryRecord)
+
+ tracker = new TaskTracker()
+ task = tracker.task("uow complete")
+ batch2.on_complete(task.run)
+ batch2.release
+
+ expect(true) {
+ tracker.await(2, TimeUnit.SECONDS)
+ }
+ }
+
+ // needed to get access to the DelayableUOW class
+ // note, this will be "locator_based" since we're using LevelDB
+ class LocatorBasedStore(val configDto: LevelDBStoreDTO = new LevelDBStoreDTO) extends LevelDBStore(configDto){
+ def create_uow_delayable() = new DelayableUOW
+ }
+
+ test("Have message locators for locator-based store"){
+
+ val uow = (new LocatorBasedStore).create_uow_delayable()
+ val queueEntry = new QueueEntryRecord
+ queueEntry.message_key = 1L
+ queueEntry.message_locator = new AtomicReference[Object]
+
+ uow.enqueue(queueEntry)
+ assert(uow.have_locators == false)
+
+ uow.dequeue(queueEntry)
+ assert(uow.have_locators == false)
+
+ queueEntry.message_locator.set("test")
+ assert(uow.have_locators == true)
+
+ }
+
+ test("Have message locators for enqueues"){
+ val uow = (new LocatorBasedStore).create_uow_delayable()
+ val queueEntry = new QueueEntryRecord
+ queueEntry.message_key = 1L
+ queueEntry.message_locator = new AtomicReference[Object]
+ queueEntry.message_locator.set("test")
+
+ uow.enqueue(queueEntry)
+ assert(uow.have_locators == true)
+
+ }
+
+}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1363917&r1=1363916&r2=1363917&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Fri Jul 20 19:51:42 2012
@@ -2536,6 +2536,19 @@ class StompNackTest extends StompTestSup
class StompNackTestOnLevelDBTest extends StompNackTest {
override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+
+ test("NACKing without DLQ consumer (persistent)"){
+ connect("1.1")
+ sync_send("/queue/nacker.b", "this msg is persistent", "persistent:true\n")
+
+ subscribe("0", "/queue/nacker.b", "client", false, "", false)
+
+ var ack = assert_received("this msg is persistent", "0")
+ ack(false)
+ ack = assert_received("this msg is persistent", "0")
+ ack(false)
+ Thread.sleep(1000)
+ }
}
class StompDropPolicyTest extends StompTestSupport {