You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/03/21 18:07:11 UTC
svn commit: r1459402 - in /activemq/activemq-apollo/trunk: ./ apollo-amqp/
apollo-broker/src/test/scala/ apollo-itests/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
apollo-stomp/src/test/resources/ apollo-stomp/src/test/scala/org/apach...
Author: chirino
Date: Thu Mar 21 17:07:11 2013
New Revision: 1459402
URL: http://svn.apache.org/r1459402
Log:
Add a test to verify that the pending_stores counter goes down to zero once a queue is no longer blocking producers.
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-itests/pom.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/pom.xml?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/pom.xml Thu Mar 21 17:07:11 2013
@@ -215,18 +215,7 @@
</execution>
</executions>
</plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>${maven-surefire-plugin-version}</version>
- <configuration>
- <parallel>classes</parallel>
- <perCoreThreadCount>false</perCoreThreadCount>
- <threadCount>1</threadCount>
- </configuration>
- </plugin>
-
+
</plugins>
</build>
<profiles>
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Thu Mar 21 17:07:11 2013
@@ -24,12 +24,13 @@ import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.dto.{DestMetricsDTO, AggregateDestMetricsDTO, QueueStatusDTO, TopicStatusDTO}
import collection.immutable.HashMap
import java.io.File
-import org.scalatest.{ParallelTestExecution, OneInstancePerTest}
+import org.scalatest.{Tag, FunSuite, ParallelTestExecution, OneInstancePerTest}
import java.util
import com.fasterxml.jackson.databind.ObjectMapper
import org.fusesource.hawtbuf.{ByteArrayOutputStream, Buffer}
import com.fasterxml.jackson.annotation.JsonInclude
import util.concurrent.CountDownLatch
+import java.util.concurrent.locks.{ReentrantReadWriteLock, Lock, ReadWriteLock}
object BrokerTestSupport {
import FutureResult._
@@ -113,9 +114,11 @@ object BrokerTestSupport {
}
-trait BrokerParallelTestExecution extends ParallelTestExecution {
+trait BrokerParallelTestExecution extends FunSuite with ParallelTestExecution {
self: BrokerFunSuiteSupport =>
+ var test_rw_lock = new ReentrantReadWriteLock();
+
override def newInstance = {
val rc = super.newInstance.asInstanceOf[BrokerFunSuiteSupport]
rc.before_and_after_all_object = self
@@ -124,6 +127,27 @@ trait BrokerParallelTestExecution extend
rc
}
+ def run_exclusive(testFun: => Unit):Unit = {
+ test_rw_lock.readLock().unlock()
+ test_rw_lock.writeLock().lock()
+ try {
+ testFun
+ } finally {
+ test_rw_lock.writeLock().unlock()
+ test_rw_lock.readLock().lock()
+ }
+ }
+
+ override protected def test(testName: String, testTags: Tag*)(testFun: => Unit) {
+ super.test(testName, testTags:_*) {
+ test_rw_lock.readLock().lock()
+ try {
+ testFun
+ } finally {
+ test_rw_lock.readLock().unlock()
+ }
+ }
+ }
}
/**
Modified: activemq/activemq-apollo/trunk/apollo-itests/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/pom.xml?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/pom.xml Thu Mar 21 17:07:11 2013
@@ -154,9 +154,6 @@
<exclude>**/JmsQueueTransactionTest.*</exclude>
<exclude>**/JmsTopicTransactionTest.*</exclude>
</excludes>
- <parallel>classes</parallel>
- <perCoreThreadCount>false</perCoreThreadCount>
- <threadCount>1</threadCount>
</configuration>
</plugin>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Mar 21 17:07:11 2013
@@ -805,7 +805,6 @@ class StompProtocolHandler extends Proto
private def die[T](headers:HeaderMap, body:String):T = {
if( !dead ) {
dead = true
- waiting_on = ()=>"shutdown"
connection.transport.resumeRead
if( body.isEmpty ) {
@@ -846,7 +845,7 @@ class StompProtocolHandler extends Proto
override def on_transport_failure(error: IOException) = {
if( !closed ) {
- suspend_read("shutdown")
+ suspend_read(waiting_on())
connection_log.info("Shutting connection '%s' down due to: %s", security_context.remote_address, error)
disconnect(false)
}
@@ -870,12 +869,6 @@ class StompProtocolHandler extends Proto
}
transactions.clear()
- producer_routes.values().foreach{ route=>
- host.dispatch_queue {
- host.router.disconnect(route.addresses, route)
- }
- }
- producer_routes.clear
consumers.foreach { case (_,consumer)=>
val addresses = consumer.addresses
host.dispatch_queue {
@@ -890,8 +883,28 @@ class StompProtocolHandler extends Proto
}
})
trace("stomp protocol resources released")
+
+ waiting_on = ()=> {
+ val routes = producer_routes.values().flatMap { route =>
+ if( route.routing_items > 0 ) {
+ Some(route.dest+"("+route.routing_items+")")
+ } else {
+ None
+ }
+ }.mkString(", ")
+ "Delivery competition to: "+routes
+ }
+
on_routing_empty {
+ producer_routes.values().foreach{ route=>
+ host.dispatch_queue {
+ host.router.disconnect(route.addresses, route)
+ }
+ }
+ producer_routes.clear
+
if( delay ) {
+ waiting_on = ()=>"die delay"
queue.after(die_delay, TimeUnit.MILLISECONDS) {
connection.stop(NOOP)
}
@@ -1171,9 +1184,12 @@ class StompProtocolHandler extends Proto
var routing_items = 0
+ val deliveries_waiting_for_ack = new util.HashSet[Delivery]()
+
override def offer(delivery: Delivery): Boolean = {
if( full )
return false
+ deliveries_waiting_for_ack.add(delivery)
routing_size += delivery.size
routing_items += 1
val original_ack = delivery.ack
@@ -1184,6 +1200,7 @@ class StompProtocolHandler extends Proto
}
routing_items -= 1
routing_size -= delivery.size
+ deliveries_waiting_for_ack.remove(delivery)
if( routing_size==0 && !pending_routing_empty_callbacks.isEmpty) {
val t = pending_routing_empty_callbacks
pending_routing_empty_callbacks = ListBuffer()
@@ -1231,13 +1248,7 @@ class StompProtocolHandler extends Proto
}
}
- var producer_routes = new LRUCache[AsciiBuffer, StompProducerRoute](1) {
- override def onCacheEviction(eldest: Entry[AsciiBuffer, StompProducerRoute]) = {
- host.dispatch_queue {
- host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
- }
- }
- }
+ var producer_routes = new java.util.HashMap[AsciiBuffer, StompProducerRoute]()
def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
val dest = get(frame.headers, DESTINATION).get
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml Thu Mar 21 17:07:11 2013
@@ -37,11 +37,12 @@
<queue id="drop.head.non" full_policy="drop head" tail_buffer="100k" persistent="false"/>
<queue id="drop.tail.non" full_policy="drop tail" tail_buffer="100k" persistent="false"/>
<queue id="noroundrobin.**" round_robin="false"/>
+ <queue id="pending_stores.**" swap="false"/>
<leveldb_store directory="${testdatadir}"/>
</virtual_host>
- <web_admin bind="http://0.0.0.0:61680"/>
+ <!--<web_admin bind="http://0.0.0.0:61680"/>-->
<connector id="tcp" bind="tcp://0.0.0.0:0"/>
<connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
<connector id="stomp-udp" bind="udp://0.0.0.0:0" protocol="stomp-udp"/>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala Thu Mar 21 17:07:11 2013
@@ -84,6 +84,20 @@ class StompClient extends ShouldMatchers
throw new EOFException()
}
+ def receive(timeout:Int): String = {
+ val original = socket.getSoTimeout
+ try {
+ socket.setSoTimeout(timeout)
+ receive()
+ } finally {
+ try {
+ socket.setSoTimeout(original)
+ } catch {
+ case _:Throwable =>
+ }
+ }
+ }
+
def receive(): String = {
var start = true;
val buffer = new BAOS()
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala Thu Mar 21 17:07:11 2013
@@ -19,11 +19,66 @@ package org.apache.activemq.apollo.stomp
import java.lang.String
import java.util.concurrent.TimeUnit._
import org.apache.activemq.apollo.broker._
+import java.net.SocketTimeoutException
+import java.util.concurrent.CountDownLatch
-class StompLevelDBParallelTest extends StompParallelTest with BrokerParallelTestExecution {
+class StompLevelDBParallelTest extends StompParallelTest {
override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+ def skip_if_not_leveldb = skip(!broker_config_uri.endsWith("-leveldb.xml"))
+
+ test("pending_stores stuck") {
+ skip_if_not_leveldb
+
+ def pending_stores = {
+ var rc = 0
+ val done =new CountDownLatch(1);
+ broker.default_virtual_host.store.get_store_status{ status =>
+ rc = status.pending_stores
+ done.countDown()
+ }
+ done.await()
+ rc
+ }
+
+ run_exclusive {
+ connect("1.1")
+ val dest = next_id("pending_stores.n")
+ var data: String = "x" * 1024
+ var done = false
+
+ within(10, SECONDS) {
+ pending_stores should be (0)
+ }
+
+ var sent = 0
+ while(!done) {
+ async_send("/queue/"+dest, data, "persistent:true\nreceipt:x\n")
+ try {
+ wait_for_receipt("x", timeout = 2000)
+ sent += 1
+ } catch {
+ case e:SocketTimeoutException =>
+ done = true
+ }
+ }
+
+ pending_stores should be (1)
+ close()
+ connect("1.1")
+ subscribe("mysub", "/queue/"+dest)
+ for( i <- 0 until sent) {
+ assert_received(data)
+ }
+
+ within(10, SECONDS) {
+ pending_stores should be (0)
+ }
+ }
+ }
+
+
test("(APLO-198) Apollo sometimes does not send all the messages in a queue") {
skip_if_using_store
connect("1.1")
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Thu Mar 21 17:07:11 2013
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.Atomi
class StompParallelTest extends StompTestSupport with BrokerParallelTestExecution {
def skip_if_using_store = skip(broker_config_uri.endsWith("-bdb.xml") || broker_config_uri.endsWith("-leveldb.xml"))
+ def skip_if_not_using_store = skip(!(broker_config_uri.endsWith("-bdb.xml") || broker_config_uri.endsWith("-leveldb.xml")))
test("Stomp 1.0 CONNECT") {
connect("1.0")
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala Thu Mar 21 17:07:11 2013
@@ -206,14 +206,14 @@ class StompTestSupport extends BrokerFun
})
}
- def wait_for_receipt(id: String, c: StompClient = client, discard_others: Boolean = false): Unit = {
+ def wait_for_receipt(id: String, c: StompClient = client, discard_others: Boolean = false, timeout:Int=10000): Unit = {
if (!discard_others) {
- val frame = c.receive()
+ val frame = c.receive(timeout)
frame should startWith("RECEIPT\n")
frame should include("receipt-id:" + id + "\n")
} else {
while (true) {
- val frame = c.receive()
+ val frame = c.receive(timeout)
if (frame.startsWith("RECEIPT\n") && frame.indexOf("receipt-id:" + id + "\n") >= 0) {
return
}
Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Thu Mar 21 17:07:11 2013
@@ -315,9 +315,6 @@
<exclude>**/legacy/**</exclude>
<exclude>**/jaxb/**</exclude>
</excludes>
- <parallel>classes</parallel>
- <perCoreThreadCount>false</perCoreThreadCount>
- <threadCount>1</threadCount>
</configuration>
</plugin>