You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/28 19:54:03 UTC
svn commit: r1391573 - in
/activemq/activemq-apollo/trunk/apollo-openwire/src:
main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
Author: chirino
Date: Fri Sep 28 17:54:02 2012
New Revision: 1391573
URL: http://svn.apache.org/viewvc?rev=1391573&view=rev
Log:
Fixes: APLO-237 Producer transaction using open wire causes hawtdispatch queue assertion error.
Many thanks to Christian Posta for his contributed test case.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1391573&r1=1391572&r2=1391573&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri Sep 28 17:54:02 2012
@@ -1210,7 +1210,7 @@ class OpenwireProtocolHandler extends Pr
}
if( uow!=null ) {
- uow.on_complete(onComplete)
+ uow.on_complete(dispatchQueue{ onComplete })
uow.release
} else {
onComplete
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala?rev=1391573&r1=1391572&r2=1391573&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala Fri Sep 28 17:54:02 2012
@@ -24,7 +24,7 @@ import javax.jms.{TextMessage, Message,
*/
class TransactionTest extends OpenwireTestSupport {
- test("Simple JMS Transaction Test") {
+ test("Simple JMS Consumer Transaction Test") {
connect()
val dest = queue(next_id("example"))
@@ -59,4 +59,45 @@ class TransactionTest extends OpenwireTe
consumer_session.commit
}
-}
\ No newline at end of file
+ test("Simple JMS Producer Transaction Test"){
+ connect()
+ val dest = queue(next_id("example"))
+
+ val producer_session = default_connection.createSession(true, Session.SESSION_TRANSACTED)
+ val producer = producer_session.createProducer(dest)
+
+ val messages = List(producer_session.createTextMessage("one"), producer_session.createTextMessage("two"), producer_session.createTextMessage("three"))
+
+ producer.send(messages(0))
+
+ val consumer_session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val consumer = consumer_session.createConsumer(dest)
+
+ // should not have anything here
+ var m = consumer.receive(1000).asInstanceOf[TextMessage]
+ m should be (null)
+
+ // commit so consumer can see it
+ producer_session.commit()
+
+ m = consumer.receive(1000).asInstanceOf[TextMessage]
+ m should not be (null)
+ m.getText should equal(messages(0).getText)
+
+ producer.send(messages(1))
+ producer_session.rollback()
+ producer.send(messages(2))
+ producer_session.commit()
+
+ val m3 = consumer.receive(1000).asInstanceOf[TextMessage]
+ m3 should not be (null)
+ m3.getText should equal(messages(2).getText)
+
+ }
+
+}
+
+class OpenwireLevelDBTransactionTest extends TransactionTest {
+ override def broker_config_uri = "xml:classpath:apollo-openwire-leveldb.xml"
+
+}