You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/03/21 18:07:11 UTC

svn commit: r1459402 - in /activemq/activemq-apollo/trunk: ./ apollo-amqp/ apollo-broker/src/test/scala/ apollo-itests/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/resources/ apollo-stomp/src/test/scala/org/apach...

Author: chirino
Date: Thu Mar 21 17:07:11 2013
New Revision: 1459402

URL: http://svn.apache.org/r1459402
Log:
Add a test to verify that the pending_stores counter goes down to zero once a queue is no longer blocking producers.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-itests/pom.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/pom.xml?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/pom.xml Thu Mar 21 17:07:11 2013
@@ -215,18 +215,7 @@
           </execution>
         </executions>
       </plugin>
-      
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>${maven-surefire-plugin-version}</version>
-        <configuration>
-          <parallel>classes</parallel>
-          <perCoreThreadCount>false</perCoreThreadCount>
-          <threadCount>1</threadCount> 
-        </configuration>
-      </plugin>
-      
+            
     </plugins>
   </build>
   <profiles>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Thu Mar 21 17:07:11 2013
@@ -24,12 +24,13 @@ import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.dto.{DestMetricsDTO, AggregateDestMetricsDTO, QueueStatusDTO, TopicStatusDTO}
 import collection.immutable.HashMap
 import java.io.File
-import org.scalatest.{ParallelTestExecution, OneInstancePerTest}
+import org.scalatest.{Tag, FunSuite, ParallelTestExecution, OneInstancePerTest}
 import java.util
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.fusesource.hawtbuf.{ByteArrayOutputStream, Buffer}
 import com.fasterxml.jackson.annotation.JsonInclude
 import util.concurrent.CountDownLatch
+import java.util.concurrent.locks.{ReentrantReadWriteLock, Lock, ReadWriteLock}
 
 object BrokerTestSupport {
   import FutureResult._
@@ -113,9 +114,11 @@ object BrokerTestSupport {
 
 }
 
-trait BrokerParallelTestExecution extends ParallelTestExecution {
+trait BrokerParallelTestExecution extends FunSuite with ParallelTestExecution {
   self: BrokerFunSuiteSupport =>
 
+  var test_rw_lock = new ReentrantReadWriteLock();
+
   override def newInstance = {
     val rc = super.newInstance.asInstanceOf[BrokerFunSuiteSupport]
     rc.before_and_after_all_object = self
@@ -124,6 +127,27 @@ trait BrokerParallelTestExecution extend
     rc
   }
 
+  def run_exclusive(testFun: => Unit):Unit = {
+    test_rw_lock.readLock().unlock()
+    test_rw_lock.writeLock().lock()
+    try {
+      testFun
+    } finally {
+      test_rw_lock.writeLock().unlock()
+      test_rw_lock.readLock().lock()
+    }
+  }
+
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Unit) {
+    super.test(testName, testTags:_*) {
+      test_rw_lock.readLock().lock()
+      try {
+        testFun
+      } finally {
+        test_rw_lock.readLock().unlock()
+      }
+    }
+  }
 }
 
 /**

Modified: activemq/activemq-apollo/trunk/apollo-itests/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/pom.xml?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/pom.xml Thu Mar 21 17:07:11 2013
@@ -154,9 +154,6 @@
             <exclude>**/JmsQueueTransactionTest.*</exclude>
             <exclude>**/JmsTopicTransactionTest.*</exclude>
           </excludes>
-          <parallel>classes</parallel>
-          <perCoreThreadCount>false</perCoreThreadCount>
-          <threadCount>1</threadCount> 
         </configuration>
       </plugin>
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Mar 21 17:07:11 2013
@@ -805,7 +805,6 @@ class StompProtocolHandler extends Proto
   private def die[T](headers:HeaderMap, body:String):T = {
     if( !dead ) {
       dead = true
-      waiting_on = ()=>"shutdown"
       connection.transport.resumeRead
 
       if( body.isEmpty ) {
@@ -846,7 +845,7 @@ class StompProtocolHandler extends Proto
 
   override def on_transport_failure(error: IOException) = {
     if( !closed ) {
-      suspend_read("shutdown")
+      suspend_read(waiting_on())
       connection_log.info("Shutting connection '%s'  down due to: %s", security_context.remote_address, error)
       disconnect(false)
     }
@@ -870,12 +869,6 @@ class StompProtocolHandler extends Proto
       }
       transactions.clear()
 
-      producer_routes.values().foreach{ route=>
-        host.dispatch_queue {
-          host.router.disconnect(route.addresses, route)
-        }
-      }
-      producer_routes.clear
       consumers.foreach { case (_,consumer)=>
         val addresses = consumer.addresses
         host.dispatch_queue {
@@ -890,8 +883,28 @@ class StompProtocolHandler extends Proto
         }
       })
       trace("stomp protocol resources released")
+
+      waiting_on = ()=> {
+        val routes = producer_routes.values().flatMap { route =>
+          if( route.routing_items > 0 ) {
+            Some(route.dest+"("+route.routing_items+")")
+          } else {
+            None
+          }
+        }.mkString(", ")
+        "Delivery competition to: "+routes
+      }
+
       on_routing_empty {
+        producer_routes.values().foreach{ route=>
+          host.dispatch_queue {
+            host.router.disconnect(route.addresses, route)
+          }
+        }
+        producer_routes.clear
+
         if( delay ) {
+          waiting_on = ()=>"die delay"
           queue.after(die_delay, TimeUnit.MILLISECONDS) {
             connection.stop(NOOP)
           }
@@ -1171,9 +1184,12 @@ class StompProtocolHandler extends Proto
 
     var routing_items = 0
 
+    val deliveries_waiting_for_ack = new util.HashSet[Delivery]()
+
     override def offer(delivery: Delivery): Boolean = {
       if( full )
         return false
+      deliveries_waiting_for_ack.add(delivery)
       routing_size += delivery.size
       routing_items += 1
       val original_ack = delivery.ack
@@ -1184,6 +1200,7 @@ class StompProtocolHandler extends Proto
         }
         routing_items -= 1
         routing_size -= delivery.size
+        deliveries_waiting_for_ack.remove(delivery)
         if( routing_size==0 && !pending_routing_empty_callbacks.isEmpty) {
           val t = pending_routing_empty_callbacks
           pending_routing_empty_callbacks = ListBuffer()
@@ -1231,13 +1248,7 @@ class StompProtocolHandler extends Proto
     }
   }
 
-  var producer_routes = new LRUCache[AsciiBuffer, StompProducerRoute](1) {
-    override def onCacheEviction(eldest: Entry[AsciiBuffer, StompProducerRoute]) = {
-      host.dispatch_queue {
-        host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
-      }
-    }
-  }
+  var producer_routes = new java.util.HashMap[AsciiBuffer, StompProducerRoute]()
 
   def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
     val dest = get(frame.headers, DESTINATION).get

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml Thu Mar 21 17:07:11 2013
@@ -37,11 +37,12 @@
     <queue id="drop.head.non" full_policy="drop head" tail_buffer="100k" persistent="false"/>
     <queue id="drop.tail.non" full_policy="drop tail" tail_buffer="100k" persistent="false"/>
     <queue id="noroundrobin.**" round_robin="false"/>
+    <queue id="pending_stores.**" swap="false"/>
 
     <leveldb_store directory="${testdatadir}"/>
   </virtual_host>
 
-  <web_admin bind="http://0.0.0.0:61680"/>
+  <!--<web_admin bind="http://0.0.0.0:61680"/>-->
   <connector id="tcp" bind="tcp://0.0.0.0:0"/>
   <connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
   <connector id="stomp-udp" bind="udp://0.0.0.0:0" protocol="stomp-udp"/>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala Thu Mar 21 17:07:11 2013
@@ -84,6 +84,20 @@ class StompClient extends ShouldMatchers
     throw new EOFException()
   }
 
+  def receive(timeout:Int): String = {
+    val original = socket.getSoTimeout
+    try {
+      socket.setSoTimeout(timeout)
+      receive()
+    } finally {
+      try {
+        socket.setSoTimeout(original)
+      } catch {
+        case _:Throwable =>
+      }
+    }
+  }
+
   def receive(): String = {
     var start = true;
     val buffer = new BAOS()

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala Thu Mar 21 17:07:11 2013
@@ -19,11 +19,66 @@ package org.apache.activemq.apollo.stomp
 import java.lang.String
 import java.util.concurrent.TimeUnit._
 import org.apache.activemq.apollo.broker._
+import java.net.SocketTimeoutException
+import java.util.concurrent.CountDownLatch
 
-class StompLevelDBParallelTest extends StompParallelTest with BrokerParallelTestExecution {
+class StompLevelDBParallelTest extends StompParallelTest {
 
   override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
 
+  def skip_if_not_leveldb = skip(!broker_config_uri.endsWith("-leveldb.xml"))
+
+  test("pending_stores stuck") {
+    skip_if_not_leveldb
+
+    def pending_stores = {
+      var rc = 0
+      val done =new CountDownLatch(1);
+      broker.default_virtual_host.store.get_store_status{ status =>
+        rc = status.pending_stores
+        done.countDown()
+      }
+      done.await()
+      rc
+    }
+
+    run_exclusive {
+      connect("1.1")
+      val dest = next_id("pending_stores.n")
+      var data: String = "x" * 1024
+      var done = false
+
+      within(10, SECONDS) {
+        pending_stores should be (0)
+      }
+
+      var sent = 0
+      while(!done) {
+        async_send("/queue/"+dest, data, "persistent:true\nreceipt:x\n")
+        try {
+          wait_for_receipt("x", timeout = 2000)
+          sent += 1
+        } catch {
+          case e:SocketTimeoutException =>
+            done = true
+        }
+      }
+
+      pending_stores should be (1)
+      close()
+      connect("1.1")
+      subscribe("mysub", "/queue/"+dest)
+      for( i <- 0 until sent) {
+        assert_received(data)
+      }
+
+      within(10, SECONDS) {
+        pending_stores should be (0)
+      }
+    }
+  }
+
+
   test("(APLO-198) Apollo sometimes does not send all the messages in a queue") {
     skip_if_using_store
     connect("1.1")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Thu Mar 21 17:07:11 2013
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.Atomi
 class StompParallelTest extends StompTestSupport with BrokerParallelTestExecution {
 
   def skip_if_using_store = skip(broker_config_uri.endsWith("-bdb.xml") || broker_config_uri.endsWith("-leveldb.xml"))
+  def skip_if_not_using_store = skip(!(broker_config_uri.endsWith("-bdb.xml") || broker_config_uri.endsWith("-leveldb.xml")))
 
   test("Stomp 1.0 CONNECT") {
     connect("1.0")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala Thu Mar 21 17:07:11 2013
@@ -206,14 +206,14 @@ class StompTestSupport extends BrokerFun
     })
   }
 
-  def wait_for_receipt(id: String, c: StompClient = client, discard_others: Boolean = false): Unit = {
+  def wait_for_receipt(id: String, c: StompClient = client, discard_others: Boolean = false, timeout:Int=10000): Unit = {
     if (!discard_others) {
-      val frame = c.receive()
+      val frame = c.receive(timeout)
       frame should startWith("RECEIPT\n")
       frame should include("receipt-id:" + id + "\n")
     } else {
       while (true) {
-        val frame = c.receive()
+        val frame = c.receive(timeout)
         if (frame.startsWith("RECEIPT\n") && frame.indexOf("receipt-id:" + id + "\n") >= 0) {
           return
         }

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1459402&r1=1459401&r2=1459402&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Thu Mar 21 17:07:11 2013
@@ -315,9 +315,6 @@
               <exclude>**/legacy/**</exclude>
               <exclude>**/jaxb/**</exclude>
             </excludes>
-            <parallel>classes</parallel>
-            <perCoreThreadCount>false</perCoreThreadCount>
-            <threadCount>1</threadCount> 
           </configuration>
         </plugin>