You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/28 19:54:03 UTC

svn commit: r1391573 - in /activemq/activemq-apollo/trunk/apollo-openwire/src: main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala

Author: chirino
Date: Fri Sep 28 17:54:02 2012
New Revision: 1391573

URL: http://svn.apache.org/viewvc?rev=1391573&view=rev
Log:
Fixes: APLO-237 Producer transaction using OpenWire causes HawtDispatch queue assertion error.

Many thanks to Christian Posta for his contributed test case.

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1391573&r1=1391572&r2=1391573&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri Sep 28 17:54:02 2012
@@ -1210,7 +1210,7 @@ class OpenwireProtocolHandler extends Pr
       }
 
       if( uow!=null ) {
-        uow.on_complete(onComplete)
+        uow.on_complete(dispatchQueue{ onComplete })
         uow.release
       } else {
         onComplete

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala?rev=1391573&r1=1391572&r2=1391573&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala Fri Sep 28 17:54:02 2012
@@ -24,7 +24,7 @@ import javax.jms.{TextMessage, Message, 
  */
 class TransactionTest extends OpenwireTestSupport {
 
-  test("Simple JMS Transaction Test") {
+  test("Simple JMS Consumer Transaction Test") {
     connect()
     val dest = queue(next_id("example"))
 
@@ -59,4 +59,45 @@ class TransactionTest extends OpenwireTe
     consumer_session.commit
   }
 
-}
\ No newline at end of file
+  test("Simple JMS Producer Transaction Test"){
+    connect()
+    val dest = queue(next_id("example"))
+
+    val producer_session = default_connection.createSession(true, Session.SESSION_TRANSACTED)
+    val producer = producer_session.createProducer(dest)
+
+    val messages = List(producer_session.createTextMessage("one"), producer_session.createTextMessage("two"), producer_session.createTextMessage("three"))
+
+    producer.send(messages(0))
+
+    val consumer_session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val consumer = consumer_session.createConsumer(dest)
+
+    // should not have anything here
+    var m = consumer.receive(1000).asInstanceOf[TextMessage]
+    m should be (null)
+
+    // commit so consumer can see it
+    producer_session.commit()
+
+    m = consumer.receive(1000).asInstanceOf[TextMessage]
+    m should not be (null)
+    m.getText should equal(messages(0).getText)
+
+    producer.send(messages(1))
+    producer_session.rollback()
+    producer.send(messages(2))
+    producer_session.commit()
+
+    val m3 = consumer.receive(1000).asInstanceOf[TextMessage]
+    m3 should not be (null)
+    m3.getText should equal(messages(2).getText)
+
+  }
+
+}
+
+class OpenwireLevelDBTransactionTest extends TransactionTest {
+  override def broker_config_uri = "xml:classpath:apollo-openwire-leveldb.xml"
+
+}