You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/02 02:38:03 UTC

svn commit: r1498748 - in /activemq/activemq-cpp/branches/3.7.x: ./ activemq-cpp/src/main/activemq/core/kernels/ activemq-cpp/src/test-integration/activemq/test/ activemq-cpp/src/test-integration/activemq/test/openwire/ activemq-cpp/src/test/activemq/c...

Author: tabish
Date: Tue Jul  2 00:38:02 2013
New Revision: 1498748

URL: http://svn.apache.org/r1498748
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-494

Modified:
    activemq/activemq-cpp/branches/3.7.x/   (props changed)
    activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
    activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
    activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/TransactionTest.cpp
    activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/TransactionTest.h
    activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTransactionTest.h
    activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp
    activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h

Propchange: activemq/activemq-cpp/branches/3.7.x/
------------------------------------------------------------------------------
  Merged /activemq/activemq-cpp/trunk:r1494715-1498566,1498568-1498611,1498613-1498746

Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1498748&r1=1498747&r2=1498748&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Tue Jul  2 00:38:02 2013
@@ -97,6 +97,7 @@ namespace kernels {
         decaf::util::concurrent::Mutex listenerMutex;
         AtomicBoolean deliveringAcks;
         AtomicBoolean started;
+        AtomicBoolean closeSyncRegistered;
         Pointer<MessageDispatchChannel> unconsumedMessages;
         decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
         long long lastDeliveredSequenceId;
@@ -133,6 +134,7 @@ namespace kernels {
                                          listenerMutex(),
                                          deliveringAcks(),
                                          started(),
+                                         closeSyncRegistered(),
                                          unconsumedMessages(),
                                          dispatchedMessages(),
                                          lastDeliveredSequenceId(0),
@@ -340,7 +342,7 @@ namespace {
     class TransactionSynhcronization : public Synchronization {
     private:
 
-        ActiveMQConsumerKernel* consumer;
+        Pointer<ActiveMQConsumerKernel> consumer;
         ActiveMQConsumerKernelConfig* impl;
 
     private:
@@ -350,7 +352,7 @@ namespace {
 
     public:
 
-        TransactionSynhcronization(ActiveMQConsumerKernel* consumer, ActiveMQConsumerKernelConfig* impl) :
+        TransactionSynhcronization(Pointer<ActiveMQConsumerKernel> consumer, ActiveMQConsumerKernelConfig* impl) :
             Synchronization(), consumer(consumer), impl(impl) {
 
             if (consumer == NULL) {
@@ -377,11 +379,13 @@ namespace {
         virtual void afterCommit() {
             consumer->commit();
             consumer->setSynchronizationRegistered(false);
+            consumer.reset(NULL);
         }
 
         virtual void afterRollback() {
             consumer->rollback();
             consumer->setSynchronizationRegistered(false);
+            consumer.reset(NULL);
         }
     };
 
@@ -393,7 +397,7 @@ namespace {
     class CloseSynhcronization : public Synchronization {
     private:
 
-        ActiveMQConsumerKernel* consumer;
+        Pointer<ActiveMQConsumerKernel> consumer;
 
     private:
 
@@ -402,7 +406,7 @@ namespace {
 
     public:
 
-        CloseSynhcronization(ActiveMQConsumerKernel* consumer) : consumer(consumer) {
+        CloseSynhcronization(Pointer<ActiveMQConsumerKernel> consumer) : consumer(consumer) {
             if (consumer == NULL) {
                 throw NullPointerException(__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
             }
@@ -415,10 +419,12 @@ namespace {
 
         virtual void afterCommit() {
             consumer->doClose();
+            consumer.reset(NULL);
         }
 
         virtual void afterRollback() {
             consumer->doClose();
+            consumer.reset(NULL);
         }
     };
 
@@ -787,9 +793,12 @@ void ActiveMQConsumerKernel::close() {
         if (!this->isClosed()) {
 
             if (this->session->getTransactionContext() != NULL &&
-                this->session->getTransactionContext()->isInTransaction()) {
+                this->session->getTransactionContext()->isInTransaction() &&
+                this->internal->closeSyncRegistered.compareAndSet(false, true)) {
 
-                Pointer<Synchronization> sync(new CloseSynhcronization(this));
+                Pointer<ActiveMQConsumerKernel> self =
+                    this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+                Pointer<Synchronization> sync(new CloseSynhcronization(self));
                 this->session->getTransactionContext()->addSynchronization(sync);
             } else {
                 doClose();
@@ -1134,7 +1143,9 @@ void ActiveMQConsumerKernel::registerSyn
     this->session->doStartTransaction();
     if (!this->internal->synchronizationRegistered) {
         this->internal->synchronizationRegistered = true;
-        Pointer<Synchronization> sync(new TransactionSynhcronization(this, this->internal));
+        Pointer<ActiveMQConsumerKernel> self =
+            this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+        Pointer<Synchronization> sync(new TransactionSynhcronization(self, this->internal));
         this->session->getTransactionContext()->addSynchronization(sync);
     }
 }

Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1498748&r1=1498747&r2=1498748&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h Tue Jul  2 00:38:02 2013
@@ -529,7 +529,7 @@ namespace kernels {
         Pointer<ActiveMQProducerKernel> lookupProducerKernel(Pointer<commands::ProducerId> id);
 
         /**
-         * @returns a Pointer to an ActiveMQProducerKernel using its ProducerId, or NULL.
+         * @returns a Pointer to an ActiveMQConsumerKernel using its ConsumerId, or NULL.
          */
         Pointer<ActiveMQConsumerKernel> lookupConsumerKernel(Pointer<commands::ConsumerId> id);
 

Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/TransactionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/TransactionTest.cpp?rev=1498748&r1=1498747&r2=1498748&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/TransactionTest.cpp (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/TransactionTest.cpp Tue Jul  2 00:38:02 2013
@@ -17,6 +17,7 @@
 
 #include "TransactionTest.h"
 
+#include <activemq/core/ActiveMQConnectionFactory.h>
 #include <activemq/util/CMSListener.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <stdexcept>
@@ -24,11 +25,20 @@
 using namespace std;
 using namespace cms;
 using namespace activemq;
+using namespace activemq::core;
 using namespace activemq::test;
 using namespace activemq::util;
 using namespace activemq::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
+TransactionTest::TransactionTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TransactionTest::~TransactionTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void TransactionTest::testSendReceiveTransactedBatches() {
 
     try {
@@ -248,3 +258,33 @@ void TransactionTest::testWithTTLSet() {
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void TransactionTest::testSessionCommitAfterConsumerClosed() {
+
+    ActiveMQConnectionFactory factory(getBrokerURL());
+    auto_ptr<cms::Connection> connection(factory.createConnection());
+
+    {
+        auto_ptr<cms::Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
+        auto_ptr<cms::Queue> queue(session->createQueue("testSessionCommitAfterConsumerClosed"));
+        auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
+
+        auto_ptr<cms::Message> message(session->createTextMessage("Hello"));
+        producer->send(message.get());
+        producer->close();
+        session->close();
+    }
+
+    auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED));
+    auto_ptr<cms::Queue> queue(session->createQueue("testSessionCommitAfterConsumerClosed"));
+    auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get()));
+
+    connection->start();
+
+    auto_ptr<cms::Message> message(consumer->receive(5000));
+    CPPUNIT_ASSERT(message.get() != NULL);
+
+    consumer->close();
+    session->commit();
+}

Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/TransactionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/TransactionTest.h?rev=1498748&r1=1498747&r2=1498748&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/TransactionTest.h (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/TransactionTest.h Tue Jul  2 00:38:02 2013
@@ -32,8 +32,8 @@ namespace test{
 
     public:
 
-        TransactionTest() {}
-        virtual ~TransactionTest() {}
+        TransactionTest();
+        virtual ~TransactionTest();
 
         virtual void setUp() {
             cmsProvider.reset(
@@ -45,6 +45,7 @@ namespace test{
         void testSendSessionClose();
         void testWithTTLSet();
         void testSendRollbackCommitRollback();
+        void testSessionCommitAfterConsumerClosed();
 
     };
 

Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTransactionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTransactionTest.h?rev=1498748&r1=1498747&r2=1498748&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTransactionTest.h (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTransactionTest.h Tue Jul  2 00:38:02 2013
@@ -31,6 +31,7 @@ namespace openwire{
         CPPUNIT_TEST( testSendRollback );
         CPPUNIT_TEST( testWithTTLSet );
         CPPUNIT_TEST( testSendRollbackCommitRollback );
+        CPPUNIT_TEST( testSessionCommitAfterConsumerClosed );
 //        CPPUNIT_TEST( testSendSessionClose );
         CPPUNIT_TEST_SUITE_END();
 

Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp?rev=1498748&r1=1498747&r2=1498748&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp Tue Jul  2 00:38:02 2013
@@ -42,8 +42,8 @@ using namespace decaf;
 using namespace decaf::lang;
 
 ////////////////////////////////////////////////////////////////////////////////
-namespace activemq{
-namespace core{
+namespace activemq {
+namespace core {
 
     class MyCMSMessageListener : public cms::MessageListener {
     public:
@@ -104,6 +104,14 @@ namespace core{
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
+ActiveMQSessionTest::ActiveMQSessionTest() : connection(), dTransport(), exListener() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQSessionTest::~ActiveMQSessionTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionTest::testCreateManyConsumersAndSetListeners() {
 
     MyCMSMessageListener msgListener1;
@@ -541,7 +549,7 @@ void ActiveMQSessionTest::testTransactio
     CPPUNIT_ASSERT_EQUAL(MSG_COUNT, (int)msgListener1.messages.size());
 
     // This is what we are testing, since there was no commit, the session
-    // will rollback the transaction when this are closed.
+    // will rollback the transaction when this is closed.
     // session->commit();
 
     consumer1->close();
@@ -606,77 +614,118 @@ void ActiveMQSessionTest::testExpiration
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionTest::setUp()
-{
-    try
-    {
+void ActiveMQSessionTest::testTransactionCommitAfterConsumerClosed() {
+
+    static const int MSG_COUNT = 50;
+    MyCMSMessageListener msgListener1;
+
+    CPPUNIT_ASSERT(connection.get() != NULL);
+
+    // Create an Transacted Session
+    std::auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED));
+
+    // Create a Topic
+    std::auto_ptr<cms::Topic> topic1(session->createTopic("TestTopic1"));
+
+    CPPUNIT_ASSERT(topic1.get() != NULL);
+
+    // Create a consumer
+    std::auto_ptr<ActiveMQConsumer> consumer1(
+        dynamic_cast<ActiveMQConsumer*>( session->createConsumer( topic1.get() ) ) );
+
+    CPPUNIT_ASSERT(consumer1.get() != NULL);
+    CPPUNIT_ASSERT(consumer1->getMessageSelector() == "");
+    CPPUNIT_ASSERT(consumer1->receiveNoWait() == NULL);
+    CPPUNIT_ASSERT(consumer1->receive(5) == NULL);
+
+    consumer1->setMessageListener(&msgListener1);
+
+    for (int i = 0; i < MSG_COUNT; ++i) {
+        injectTextMessage("This is a Test 1", *topic1, *(consumer1->getConsumerId()));
+    }
+
+    msgListener1.asyncWaitForMessages(MSG_COUNT);
+    CPPUNIT_ASSERT_EQUAL(MSG_COUNT, (int)msgListener1.messages.size());
+
+    consumer1->close();
+    consumer1.reset();
+    session->commit();
+
+    Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();
+    std::string text1 = msg1->getText();
+
+    CPPUNIT_ASSERT(text1 == "This is a Test 1");
+    msgListener1.clear();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionTest::setUp() {
+
+    try {
         ActiveMQConnectionFactory factory("mock://127.0.0.1:12345?wireFormat=openwire");
 
-        connection.reset( dynamic_cast< ActiveMQConnection*>( factory.createConnection() ) );
+        connection.reset(dynamic_cast<ActiveMQConnection*>(factory.createConnection()));
 
         // Get a pointer to the Mock Transport for Message injection.
         dTransport = dynamic_cast<transport::mock::MockTransport*>(
-            connection->getTransport().narrow( typeid( transport::mock::MockTransport ) ) );
-        CPPUNIT_ASSERT( dTransport != NULL );
+            connection->getTransport().narrow(typeid(transport::mock::MockTransport)));
+        CPPUNIT_ASSERT(dTransport != NULL);
 
-        connection->setExceptionListener( &exListener );
+        connection->setExceptionListener(&exListener);
         connection->start();
-    }
-    catch(...)
-    {
+    } catch (...) {
         bool exceptionThrown = false;
-
-        CPPUNIT_ASSERT( exceptionThrown );
+        CPPUNIT_ASSERT(exceptionThrown);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionTest::tearDown() {
-    connection.reset( NULL );
+    connection.reset(NULL);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionTest::injectTextMessage( const std::string message,
-                                             const cms::Destination& destination,
-                                             const commands::ConsumerId& id,
-                                             const long long timeStamp,
-                                             const long long timeToLive )
-{
-    Pointer<ActiveMQTextMessage> msg( new ActiveMQTextMessage() );
-
-    Pointer<ProducerId> producerId( new ProducerId() );
-    producerId->setConnectionId( id.getConnectionId() );
-    producerId->setSessionId( id.getSessionId() );
-    producerId->setValue( 1 );
-
-    Pointer<MessageId> messageId( new MessageId() );
-    messageId->setProducerId( producerId );
-    messageId->setProducerSequenceId( 2 );
+void ActiveMQSessionTest::injectTextMessage(const std::string message,
+                                            const cms::Destination& destination,
+                                            const commands::ConsumerId& id,
+                                            const long long timeStamp,
+                                            const long long timeToLive) {
+
+    Pointer<ActiveMQTextMessage> msg(new ActiveMQTextMessage());
+
+    Pointer<ProducerId> producerId(new ProducerId());
+    producerId->setConnectionId(id.getConnectionId());
+    producerId->setSessionId(id.getSessionId());
+    producerId->setValue(1);
+
+    Pointer<MessageId> messageId(new MessageId());
+    messageId->setProducerId(producerId);
+    messageId->setProducerSequenceId(2);
 
     // Init Message
-    msg->setText( message.c_str() );
-    msg->setCMSDestination( &destination );
-    msg->setCMSMessageID( "Id: 123456" );
-    msg->setMessageId( messageId );
+    msg->setText(message.c_str());
+    msg->setCMSDestination(&destination);
+    msg->setCMSMessageID("Id: 123456");
+    msg->setMessageId(messageId);
 
     long long expiration = 0LL;
 
-    if( timeStamp != 0 ) {
-        msg->setCMSTimestamp( timeStamp );
+    if (timeStamp != 0) {
+        msg->setCMSTimestamp(timeStamp);
 
-        if( timeToLive > 0LL ) {
+        if (timeToLive > 0LL) {
             expiration = timeToLive + timeStamp;
         }
     }
 
-    msg->setCMSExpiration( expiration );
+    msg->setCMSExpiration(expiration);
 
     // Send the Message
-    CPPUNIT_ASSERT( dTransport != NULL );
+    CPPUNIT_ASSERT(dTransport != NULL);
 
-    Pointer<MessageDispatch> dispatch( new MessageDispatch() );
-    dispatch->setMessage( msg );
-    dispatch->setConsumerId( Pointer<ConsumerId>( id.cloneDataStructure() ) );
+    Pointer<MessageDispatch> dispatch(new MessageDispatch());
+    dispatch->setMessage(msg);
+    dispatch->setConsumerId(Pointer<ConsumerId>(id.cloneDataStructure()));
 
-    dTransport->fireCommand( dispatch );
+    dTransport->fireCommand(dispatch);
 }

Modified: activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h?rev=1498748&r1=1498747&r2=1498748&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h (original)
+++ activemq/activemq-cpp/branches/3.7.x/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h Tue Jul  2 00:38:02 2013
@@ -36,13 +36,14 @@
 namespace activemq{
 namespace core{
 
-    class ActiveMQSessionTest : public CppUnit::TestFixture
-    {
+    class ActiveMQSessionTest : public CppUnit::TestFixture {
+
         CPPUNIT_TEST_SUITE( ActiveMQSessionTest );
         CPPUNIT_TEST( testAutoAcking );
         CPPUNIT_TEST( testClientAck );
         CPPUNIT_TEST( testTransactionCommitOneConsumer );
         CPPUNIT_TEST( testTransactionCommitTwoConsumer );
+        CPPUNIT_TEST( testTransactionCommitAfterConsumerClosed );
         CPPUNIT_TEST( testTransactionRollbackOneConsumer );
         CPPUNIT_TEST( testTransactionRollbackTwoConsumer );
         CPPUNIT_TEST( testTransactionCloseWithoutCommit );
@@ -76,20 +77,21 @@ namespace core{
         ActiveMQSessionTest(const ActiveMQSessionTest&);
         ActiveMQSessionTest& operator= (const ActiveMQSessionTest&);
 
-    public:    // CPPUNIT Method Overrides.
+    public:
+
+        virtual void setUp();
+        virtual void tearDown();
 
-        void setUp();
-        void tearDown();
-        void injectTextMessage( const std::string message,
-                                const cms::Destination& destination,
-                                const commands::ConsumerId& id,
-                                const long long timeStamp = -1,
-                                const long long timeToLive = -1 );
+        void injectTextMessage(const std::string message,
+                               const cms::Destination& destination,
+                               const commands::ConsumerId& id,
+                               const long long timeStamp = -1,
+                               const long long timeToLive = -1);
 
     public:
 
-        ActiveMQSessionTest() : connection(), dTransport(), exListener() {}
-        virtual ~ActiveMQSessionTest() {}
+        ActiveMQSessionTest();
+        virtual ~ActiveMQSessionTest();
 
         void testAutoAcking();
         void testClientAck();
@@ -99,6 +101,7 @@ namespace core{
         void testTransactionRollbackOneConsumer();
         void testTransactionRollbackTwoConsumer();
         void testTransactionCloseWithoutCommit();
+        void testTransactionCommitAfterConsumerClosed();
         void testExpiration();
 
     };