You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/03/16 15:56:20 UTC

svn commit: r923787 [2/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/ main/cms/ test-integration/ test-integration/activemq/test/ test-integration/activemq/test/openwire/ test/activemq/core/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp?rev=923787&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp Tue Mar 16 14:56:19 2010
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "QueueBrowserTest.h"
+
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/util/CMSListener.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <cms/Message.h>
+#include <cms/TextMessage.h>
+#include <cms/Session.h>
+
+#include <decaf/lang/Integer.h>
+
+#include <memory>
+
+using namespace activemq;
+using namespace activemq::test;
+using namespace activemq::core;
+using namespace decaf;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+QueueBrowserTest::QueueBrowserTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+QueueBrowserTest::~QueueBrowserTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void QueueBrowserTest::testReceiveBrowseReceive() {
+
+    cms::Session* session( cmsProvider->getSession() );
+
+    std::auto_ptr<cms::Queue> queue( session->createQueue("testReceiveBrowseReceive") );
+
+    std::auto_ptr<cms::MessageConsumer> consumer( session->createConsumer( queue.get() ) );
+    std::auto_ptr<cms::MessageProducer> producer( session->createProducer( queue.get() ) );
+
+    std::auto_ptr<cms::TextMessage> message1( session->createTextMessage( "First Message" ) );
+    std::auto_ptr<cms::TextMessage> message2( session->createTextMessage( "Second Message" ) );
+    std::auto_ptr<cms::TextMessage> message3( session->createTextMessage( "Third Message" ) );
+
+    // lets consume any outstanding messages from previous test runs
+    cms::Message* message;
+    while( ( message = consumer->receive( 1000 ) ) != NULL ) {
+        delete message;
+    }
+
+    producer->send( message1.get() );
+    producer->send( message2.get() );
+    producer->send( message3.get() );
+
+    // Get the first.
+    std::auto_ptr<cms::TextMessage> inbound(
+        dynamic_cast<cms::TextMessage*>( consumer->receive( 1000 ) ) );
+    CPPUNIT_ASSERT( inbound.get() != NULL );
+    CPPUNIT_ASSERT_EQUAL( message1->getText(), inbound->getText() );
+    consumer->close();
+
+    std::auto_ptr<cms::QueueBrowser> browser( session->createBrowser( queue.get() ) );
+    cms::MessageEnumeration* enumeration = browser->getEnumeration();
+
+    // browse the second
+    CPPUNIT_ASSERT_MESSAGE( "should have received the second message", enumeration->hasMoreMessages() );
+    inbound.reset( dynamic_cast<cms::TextMessage*>( enumeration->nextMessage() ) );
+    CPPUNIT_ASSERT( inbound.get() != NULL );
+    CPPUNIT_ASSERT_EQUAL( message2->getText(), inbound->getText() );
+
+    // browse the third.
+    CPPUNIT_ASSERT_MESSAGE( "should have received the third message", enumeration->hasMoreMessages() );
+    inbound.reset( dynamic_cast<cms::TextMessage*>( enumeration->nextMessage() ) );
+    CPPUNIT_ASSERT( inbound.get() != NULL );
+    CPPUNIT_ASSERT_EQUAL( message3->getText(), inbound->getText() );
+
+    // There should be no more.
+    bool tooMany = false;
+    while( enumeration->hasMoreMessages() ) {
+        tooMany = true;
+    }
+    CPPUNIT_ASSERT_MESSAGE( "Should not have browsed any more messages", !tooMany );
+    browser->close();
+
+    // Re-open the consumer
+    consumer.reset( session->createConsumer( queue.get() ) );
+    // Receive the second.
+    inbound.reset( dynamic_cast<cms::TextMessage*>( consumer->receive( 1000 ) ) );
+    CPPUNIT_ASSERT( inbound.get() != NULL );
+    CPPUNIT_ASSERT_EQUAL( message2->getText(), inbound->getText() );
+    // Receive the third.
+    inbound.reset( dynamic_cast<cms::TextMessage*>( consumer->receive( 1000 ) ) );
+    CPPUNIT_ASSERT( inbound.get() != NULL );
+    CPPUNIT_ASSERT_EQUAL( message3->getText(), inbound->getText() );
+
+    consumer->close();
+    browser->close();
+    producer->close();
+    cmsProvider->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void QueueBrowserTest::testBrowseReceive() {
+
+    std::auto_ptr<cms::TextMessage> inbound;
+
+    cms::Session* session( cmsProvider->getSession() );
+
+    std::auto_ptr<cms::Queue> queue( session->createQueue("testBrowseReceive") );
+
+    std::auto_ptr<cms::TextMessage> message1( session->createTextMessage( "First Message" ) );
+
+    std::auto_ptr<cms::MessageProducer> producer( session->createProducer( queue.get() ) );
+
+    producer->send( message1.get() );
+
+    // create browser first
+    std::auto_ptr<cms::QueueBrowser> browser( session->createBrowser( queue.get() ) );
+    cms::MessageEnumeration* enumeration = browser->getEnumeration();
+
+    // create consumer
+    std::auto_ptr<cms::MessageConsumer> consumer( session->createConsumer( queue.get() ) );
+
+    // browse the first message
+    CPPUNIT_ASSERT_MESSAGE( "should have received the first message", enumeration->hasMoreMessages() );
+    inbound.reset( dynamic_cast<cms::TextMessage*>( enumeration->nextMessage() ) );
+    CPPUNIT_ASSERT( inbound.get() != NULL );
+    CPPUNIT_ASSERT_EQUAL( message1->getText(), inbound->getText() );
+
+    // Receive the first message.
+    inbound.reset( dynamic_cast<cms::TextMessage*>( consumer->receive( 1000 ) ) );
+    CPPUNIT_ASSERT( inbound.get() != NULL );
+    CPPUNIT_ASSERT_EQUAL( message1->getText(), inbound->getText() );
+
+    consumer->close();
+    browser->close();
+    producer->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void QueueBrowserTest::testQueueBrowserWith2Consumers() {
+
+    static const int numMessages = 100;
+
+    ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*>( cmsProvider->getConnection() );
+    CPPUNIT_ASSERT( connection != NULL );
+    connection->setAlwaysSyncSend( false );
+
+    std::auto_ptr<cms::Session> session( connection->createSession( cms::Session::CLIENT_ACKNOWLEDGE ) );
+    std::auto_ptr<cms::Queue> queue( session->createQueue("testQueueBrowserWith2Consumers") );
+
+    std::auto_ptr<cms::Queue> queuePrefetch10(
+        session->createQueue("testQueueBrowserWith2Consumers?consumer.prefetchSize=10") );
+    std::auto_ptr<cms::Queue> queuePrefetch1(
+        session->createQueue("testQueueBrowserWith2Consumers?consumer.prefetchSize=1") );
+
+    std::auto_ptr<ActiveMQConnection> connection2( dynamic_cast<ActiveMQConnection*>(
+                    cmsProvider->getConnectionFactory()->createConnection() ) );
+    connection2->start();
+
+    std::auto_ptr<cms::Session> session2( connection2->createSession( cms::Session::AUTO_ACKNOWLEDGE ) );
+
+    std::auto_ptr<cms::MessageProducer> producer( session->createProducer( queue.get() ) );
+    std::auto_ptr<cms::MessageConsumer> consumer( session->createConsumer( queuePrefetch10.get() ) );
+
+    producer->setDeliveryMode( cms::DeliveryMode::NON_PERSISTENT );
+
+    for( int i = 0; i < numMessages; i++ ) {
+        std::auto_ptr<cms::TextMessage> message(
+            session->createTextMessage( std::string( "Message: " ) + Integer::toString( i ) ) );
+        producer->send( message.get() );
+    }
+
+    std::auto_ptr<cms::QueueBrowser> browser( session2->createBrowser( queuePrefetch1.get() ) );
+    cms::MessageEnumeration* browserView = browser->getEnumeration();
+
+    std::vector<cms::Message*> messages;
+    for( int i = 0; i < numMessages; i++ ) {
+        cms::Message* m1 = consumer->receive( 5000 );
+        CPPUNIT_ASSERT_MESSAGE( std::string( "m1 is null for index: " ) + Integer::toString( i ), m1 != NULL );
+        messages.push_back( m1 );
+    }
+
+    for( int i = 0 ; i < numMessages && browserView->hasMoreMessages(); i++ ) {
+        cms::Message* m1 = messages[i];
+        cms::Message* m2 = browserView->nextMessage();
+        CPPUNIT_ASSERT_MESSAGE( std::string( "m2 is null for index: " ) + Integer::toString( i ), m2 != NULL );
+        CPPUNIT_ASSERT( m1->getCMSMessageID() == m2->getCMSMessageID() );
+        delete m2;
+    }
+
+    CPPUNIT_ASSERT_MESSAGE( "nothing left in the browser", !browserView->hasMoreMessages() );
+    CPPUNIT_ASSERT_MESSAGE( "consumer finished", consumer->receiveNoWait() == NULL );
+
+    for( std::size_t ix = 0; ix < messages.size(); ++ix ) {
+        cms::Message* msg = messages[ix];
+        msg->acknowledge();
+        delete msg;
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h?rev=923787&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h Tue Mar 16 14:56:19 2010
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_QUEUEBROWSERTEST_H_
+#define _ACTIVEMQ_TEST_QUEUEBROWSERTEST_H_
+
+#include <activemq/test/CMSTestFixture.h>
+#include <activemq/util/IntegrationCommon.h>
+
+namespace activemq {
+namespace test {
+
+    class QueueBrowserTest : public CMSTestFixture {
+    public:
+
+        QueueBrowserTest();
+        virtual ~QueueBrowserTest();
+
+        void testReceiveBrowseReceive();
+        void testBrowseReceive();
+        void testQueueBrowserWith2Consumers();
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_TEST_QUEUEBROWSERTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.cpp?rev=923787&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.cpp Tue Mar 16 14:56:19 2010
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireQueueBrowserTest.h"
+
+using namespace activemq;
+using namespace activemq::test;
+using namespace activemq::test::openwire;
+
+////////////////////////////////////////////////////////////////////////////////
+OpenwireQueueBrowserTest::OpenwireQueueBrowserTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenwireQueueBrowserTest::~OpenwireQueueBrowserTest() {
+}
+

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h?rev=923787&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h Tue Mar 16 14:56:19 2010
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIREQUEUEBROWSERTEST_H_
+#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIREQUEUEBROWSERTEST_H_
+
+#include <activemq/test/QueueBrowserTest.h>
+
+namespace activemq {
+namespace test {
+namespace openwire {
+
+    class OpenwireQueueBrowserTest : public QueueBrowserTest {
+    private:
+
+        CPPUNIT_TEST_SUITE( OpenwireQueueBrowserTest );
+        CPPUNIT_TEST( testReceiveBrowseReceive );
+        CPPUNIT_TEST( testBrowseReceive );
+        CPPUNIT_TEST( testQueueBrowserWith2Consumers );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        OpenwireQueueBrowserTest();
+        virtual ~OpenwireQueueBrowserTest();
+
+        virtual std::string getBrokerURL() const {
+            return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();// +
+                         //   "?transport.commandTracingEnabled=true";
+        }
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIREQUEUEBROWSERTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp Tue Mar 16 14:56:19 2010
@@ -178,7 +178,7 @@ void ActiveMQConnectionFactoryTest::test
         std::auto_ptr<ActiveMQProducer> producer( dynamic_cast<ActiveMQProducer*>(
             session->createProducer( NULL ) ) );
 
-        CPPUNIT_ASSERT( producer->getProducerInfo().getWindowSize() == 65536 );
+        CPPUNIT_ASSERT( producer->getProducerInfo()->getWindowSize() == 65536 );
 
         return;
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp Tue Mar 16 14:56:19 2010
@@ -173,13 +173,13 @@ void ActiveMQSessionTest::testAutoAcking
     consumer1->setMessageListener( &msgListener1 );
     consumer2->setMessageListener( &msgListener2 );
 
-    injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
+    injectTextMessage( "This is a Test 1" , *topic1, *( consumer1->getConsumerId() ) );
 
     msgListener1.asyncWaitForMessages( 1 );
 
     CPPUNIT_ASSERT( msgListener1.messages.size() == 1 );
 
-    injectTextMessage( "This is a Test 2" , *topic2, consumer2->getConsumerId() );
+    injectTextMessage( "This is a Test 2" , *topic2, *( consumer2->getConsumerId() ) );
 
     msgListener2.asyncWaitForMessages( 1 );
 
@@ -234,7 +234,7 @@ void ActiveMQSessionTest::testClientAck(
     consumer1->setMessageListener( &msgListener1 );
     consumer2->setMessageListener( &msgListener2 );
 
-    injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
+    injectTextMessage( "This is a Test 1" , *topic1, *( consumer1->getConsumerId() ) );
 
     msgListener1.asyncWaitForMessages( 1 );
 
@@ -242,7 +242,7 @@ void ActiveMQSessionTest::testClientAck(
 
     msgListener1.messages[0]->acknowledge();
 
-    injectTextMessage( "This is a Test 2" , *topic2, consumer2->getConsumerId() );
+    injectTextMessage( "This is a Test 2" , *topic2, *( consumer2->getConsumerId() ) );
 
     msgListener2.asyncWaitForMessages( 1 );
 
@@ -292,7 +292,7 @@ void ActiveMQSessionTest::testTransactio
     consumer1->setMessageListener( &msgListener1 );
 
     for( int i = 0; i < MSG_COUNT; ++i ) {
-        injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
+        injectTextMessage( "This is a Test 1" , *topic1, *( consumer1->getConsumerId() ) );
     }
 
     msgListener1.asyncWaitForMessages( MSG_COUNT );
@@ -349,13 +349,13 @@ void ActiveMQSessionTest::testTransactio
     consumer1->setMessageListener( &msgListener1 );
     consumer2->setMessageListener( &msgListener2 );
 
-    injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
+    injectTextMessage( "This is a Test 1" , *topic1, *( consumer1->getConsumerId() ) );
 
     msgListener1.asyncWaitForMessages( 1 );
 
     CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener1.messages.size() );
 
-    injectTextMessage( "This is a Test 2" , *topic2, consumer2->getConsumerId() );
+    injectTextMessage( "This is a Test 2" , *topic2, *( consumer2->getConsumerId() ) );
 
     msgListener2.asyncWaitForMessages( 1 );
 
@@ -409,7 +409,7 @@ void ActiveMQSessionTest::testTransactio
     for( unsigned int i = 0; i < msgCount; ++i ) {
         std::ostringstream stream;
         stream << "This is test message #" << i << std::ends;
-        injectTextMessage( stream.str() , *topic1, consumer1->getConsumerId() );
+        injectTextMessage( stream.str() , *topic1, *( consumer1->getConsumerId() ) );
     }
 
     msgListener1.asyncWaitForMessages( msgCount );
@@ -470,13 +470,13 @@ void ActiveMQSessionTest::testTransactio
     for( unsigned int i = 0; i < msgCount; ++i ) {
         std::ostringstream stream;
         stream << "This is test message #" << i << std::ends;
-        injectTextMessage( stream.str() , *topic1, consumer1->getConsumerId() );
+        injectTextMessage( stream.str() , *topic1, *( consumer1->getConsumerId() ) );
     }
 
     for( unsigned int i = 0; i < msgCount; ++i ) {
         std::ostringstream stream;
         stream << "This is test message #" << i << std::ends;
-        injectTextMessage( stream.str() , *topic2, consumer2->getConsumerId() );
+        injectTextMessage( stream.str() , *topic2, *( consumer2->getConsumerId() ) );
     }
 
     msgListener1.asyncWaitForMessages( msgCount );
@@ -535,7 +535,7 @@ void ActiveMQSessionTest::testTransactio
     consumer1->setMessageListener( &msgListener1 );
 
     for( int i = 0; i < MSG_COUNT; ++i ) {
-        injectTextMessage( "This is a Test 1" , *topic1, consumer1->getConsumerId() );
+        injectTextMessage( "This is a Test 1" , *topic1, *( consumer1->getConsumerId() ) );
     }
 
     msgListener1.asyncWaitForMessages( MSG_COUNT );
@@ -585,7 +585,7 @@ void ActiveMQSessionTest::testExpiration
 
     injectTextMessage( "This is a Test 1" ,
                        *topic1,
-                       consumer1->getConsumerId(),
+                       *( consumer1->getConsumerId() ),
                        decaf::lang::System::currentTimeMillis(),
                        50 );
 
@@ -595,7 +595,7 @@ void ActiveMQSessionTest::testExpiration
 
     injectTextMessage( "This is a Test 2" ,
                        *topic2,
-                       consumer2->getConsumerId(),
+                       *( consumer2->getConsumerId() ),
                        decaf::lang::System::currentTimeMillis() - 100,
                        1 );