You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2007/03/11 15:17:15 UTC
svn commit: r516905 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration: ./
integration/connector/openwire/
Author: nmittler
Date: Sun Mar 11 07:17:14 2007
New Revision: 516905
URL: http://svn.apache.org/viewvc?view=rev&rev=516905
Log:
adding first cut at openwire integration tests
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireDurableTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireDurableTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleRollbackTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleRollbackTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?view=diff&rev=516905&r1=516904&r2=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am Sun Mar 11 07:17:14 2007
@@ -18,6 +18,12 @@
cc_sources = \
integration/IntegrationCommon.cpp \
integration/TestSupport.cpp \
+ integration/connector/openwire/OpenwireAsyncSenderTest.cpp \
+ integration/connector/openwire/OpenwireDurableTest.cpp \
+ integration/connector/openwire/OpenwireExpirationTest.cpp \
+ integration/connector/openwire/OpenwireSimpleRollbackTest.cpp \
+ integration/connector/openwire/OpenwireSimpleTest.cpp \
+ integration/connector/openwire/OpenwireTransactionTest.cpp \
integration/connector/stomp/AsyncSenderTest.cpp \
integration/connector/stomp/DurableTest.cpp \
integration/connector/stomp/ExpirationTest.cpp \
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp?view=auto&rev=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.cpp Sun Mar 11 07:17:14 2007
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireAsyncSenderTest.h"
+
+#include <integration/IntegrationCommon.h>
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace integration;
+using namespace integration::connector::openwire;
+
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::openwire::OpenwireAsyncSenderTest );
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Builds the fixture: passes the broker URL (OpenWire wire format with
+ * useAsyncSend=true) to the TestSupport member and then initializes it.
+ */
+OpenwireAsyncSenderTest::OpenwireAsyncSenderTest()
+    : testSupport( "tcp://localhost:61616?wireFormat=openwire&useAsyncSend=true" )
+{
+    this->testSupport.initialize();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/** Destructor; has no explicit work to do. */
+OpenwireAsyncSenderTest::~OpenwireAsyncSenderTest() {}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sends two batches of IntegrationCommon::defaultMsgCount text messages to
+ * the topic "mytopic" using the fixture's TestSupport session and asserts
+ * that the TestSupport listener received all of them.
+ *
+ * The producer, consumer and topic are deleted at the end of the normal
+ * path. Exceptions are routed through the AMQ_CATCH_RETHROW and
+ * AMQ_CATCHALL_THROW macros.
+ */
+void OpenwireAsyncSenderTest::test()
+{
+    try
+    {
+        if( IntegrationCommon::debug ) {
+            cout << "Starting activemqcms test (sending "
+                 << IntegrationCommon::defaultMsgCount
+                 << " messages per type and sleeping "
+                 << IntegrationCommon::defaultDelay
+                 << " milli-seconds) ...\n"
+                 << endl;
+        }
+
+        // Create CMS Object for Comms. The session is fetched once and
+        // reused for every object created below.
+        cms::Session* session = testSupport.getSession();
+        cms::Topic* topic = session->createTopic( "mytopic" );
+        cms::MessageConsumer* consumer = session->createConsumer( topic );
+        consumer->setMessageListener( &testSupport );
+        cms::MessageProducer* producer = session->createProducer( topic );
+
+        // Send the first batch of text messages
+        testSupport.produceTextMessages(
+            *producer, IntegrationCommon::defaultMsgCount );
+
+        // Send a second batch of text messages
+        testSupport.produceTextMessages(
+            *producer, IntegrationCommon::defaultMsgCount );
+
+        // Wait for the messages to get here
+        testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
+
+        unsigned int numReceived = testSupport.getNumReceived();
+        if( IntegrationCommon::debug ) {
+            // %u matches the unsigned counter
+            printf( "received: %u\n", numReceived );
+        }
+        CPPUNIT_ASSERT(
+            numReceived == IntegrationCommon::defaultMsgCount * 2 );
+
+        if( IntegrationCommon::debug ) {
+            printf( "Shutting Down\n" );
+        }
+        delete producer;
+        delete consumer;
+        delete topic;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.h?view=auto&rev=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireAsyncSenderTest.h Sun Mar 11 07:17:14 2007
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_CONNECTOR_OPENWIRE_OPENWIREASYNCSENDERTEST_H_
+#define _INTEGRATION_CONNECTOR_OPENWIRE_OPENWIREASYNCSENDERTEST_H_
+
+#include <integration/TestSupport.h>
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace integration{
+namespace connector{
+namespace openwire{
+
+    /**
+     * CppUnit fixture that registers a single test case, test().
+     */
+    class OpenwireAsyncSenderTest : public CppUnit::TestFixture
+    {
+        CPPUNIT_TEST_SUITE( OpenwireAsyncSenderTest );
+        CPPUNIT_TEST( test );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        OpenwireAsyncSenderTest();
+        virtual ~OpenwireAsyncSenderTest();
+
+        /** The test case registered in the suite above. */
+        virtual void test();
+
+    private:
+
+        // Supplies the session, message helpers and listener used by test()
+        TestSupport testSupport;
+    };
+
+}}}
+
+#endif /*_INTEGRATION_CONNECTOR_OPENWIRE_OPENWIREASYNCSENDERTEST_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireDurableTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireDurableTest.cpp?view=auto&rev=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireDurableTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireDurableTest.cpp Sun Mar 11 07:17:14 2007
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireDurableTest.h"
+
+#include <integration/IntegrationCommon.h>
+
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::openwire::OpenwireDurableTest );
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace integration;
+using namespace integration::connector::openwire;
+
+/**
+ * Builds the fixture: passes the broker URL (OpenWire wire format) to the
+ * TestSupport member and then initializes it.
+ */
+OpenwireDurableTest::OpenwireDurableTest()
+    : testSupport( "tcp://localhost:61616?wireFormat=openwire" )
+{
+    this->testSupport.initialize();
+}
+
+/** Destructor; has no explicit work to do. */
+OpenwireDurableTest::~OpenwireDurableTest()
+{
+}
+
+/**
+ * Exercises a durable consumer on a freshly named topic:
+ *   1. sends 3 text messages and expects the listener to have received them;
+ *   2. deletes the consumer and sends 3 more messages;
+ *   3. re-creates the durable consumer under the same subscription name,
+ *      sends another 3 messages and expects the running total received to
+ *      equal the running total sent.
+ *
+ * An ActiveMQException is converted into a CppUnit failure that carries the
+ * exception's stack trace string.
+ */
+void OpenwireDurableTest::test()
+{
+    try
+    {
+        if( IntegrationCommon::debug ) {
+            cout << "Starting activemqcms durable test (sending "
+                 << IntegrationCommon::defaultMsgCount
+                 << " messages per type and sleeping "
+                 << IntegrationCommon::defaultDelay
+                 << " milli-seconds) ...\n"
+                 << endl;
+        }
+
+        std::string subName = Guid().createGUID();
+
+        // Create CMS Object for Comms
+        cms::Session* session = testSupport.getSession();
+        cms::Topic* topic = session->createTopic( Guid::createGUIDString() );
+        cms::MessageConsumer* consumer =
+            session->createDurableConsumer( topic, subName, "" );
+        consumer->setMessageListener( &testSupport );
+        cms::MessageProducer* producer = session->createProducer( topic );
+
+        // Send some text messages; 'sent' is the running total produced
+        unsigned int sent = testSupport.produceTextMessages( *producer, 3 );
+
+        // Wait for all messages
+        testSupport.waitForMessages( sent );
+
+        unsigned int numReceived = testSupport.getNumReceived();
+
+        if( IntegrationCommon::debug ) {
+            // %u matches the unsigned counter
+            printf( "received: %u\n", numReceived );
+        }
+
+        CPPUNIT_ASSERT_EQUAL( sent, numReceived );
+
+        // Nuke the consumer
+        delete consumer;
+
+        // Send some text messages while no consumer object exists
+        sent += testSupport.produceTextMessages( *producer, 3 );
+
+        // Re-create the consumer under the same subscription name
+        consumer = session->createDurableConsumer( topic, subName, "" );
+        consumer->setMessageListener( &testSupport );
+
+        // Send some text messages
+        sent += testSupport.produceTextMessages( *producer, 3 );
+
+        // Wait for all remaining messages
+        testSupport.waitForMessages( sent );
+
+        numReceived = testSupport.getNumReceived();
+        if( IntegrationCommon::debug ) {
+            printf( "received: %u\n", numReceived );
+        }
+        CPPUNIT_ASSERT_EQUAL( sent, numReceived );
+
+        if( IntegrationCommon::debug ) {
+            printf( "Shutting Down\n" );
+        }
+        delete producer;
+        delete consumer;
+        delete topic;
+    }
+    catch( ActiveMQException& ex ) {
+        CPPUNIT_ASSERT_MESSAGE( ex.getStackTraceString(), false );
+    }
+}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireDurableTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireDurableTest.h?view=auto&rev=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireDurableTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireDurableTest.h Sun Mar 11 07:17:14 2007
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_CONNECTOR_OPENWIRE_OPENWIREDURABLETESTER_H_
+#define _INTEGRATION_CONNECTOR_OPENWIRE_OPENWIREDURABLETESTER_H_
+
+#include <integration/TestSupport.h>
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace integration{
+namespace connector{
+namespace openwire{
+
+    /**
+     * CppUnit fixture that registers a single test case, test().
+     */
+    class OpenwireDurableTest : public CppUnit::TestFixture {
+
+        CPPUNIT_TEST_SUITE( OpenwireDurableTest );
+        CPPUNIT_TEST( test );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        OpenwireDurableTest();
+        virtual ~OpenwireDurableTest();
+
+        /** The test case registered in the suite above. */
+        virtual void test();
+
+    private:
+
+        // Supplies the session, message helpers and listener used by test()
+        TestSupport testSupport;
+    };
+
+}}}
+
+#endif /*_INTEGRATION_CONNECTOR_OPENWIRE_OPENWIREDURABLETESTER_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.cpp?view=auto&rev=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.cpp Sun Mar 11 07:17:14 2007
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireExpirationTest.h"
+
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::openwire::OpenwireExpirationTest );
+
+#include <sstream>
+
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+#include <cms/Session.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace std;
+using namespace integration;
+using namespace integration::connector::openwire;
+
+
+
+/**
+ * Records the topic name, message count and time-to-live for a later run().
+ * All CMS pointers start out NULL and disableTimeStamps starts out false.
+ */
+OpenwireExpirationTest::Producer::Producer( string topic, int numMessages, long long timeToLive )
+    : connection( NULL ),
+      session( NULL ),
+      destination( NULL ),
+      producer( NULL ),
+      numMessages( numMessages ),
+      timeToLive( timeToLive ),
+      disableTimeStamps( false ),
+      topic( topic )
+{
+}
+
+/** Releases the CMS objects by delegating to cleanup(). */
+OpenwireExpirationTest::Producer::~Producer()
+{
+    this->cleanup();
+}
+
+/** @return the current value of the disableTimeStamps flag. */
+bool OpenwireExpirationTest::Producer::getDisableTimeStamps() const
+{
+    return this->disableTimeStamps;
+}
+
+/**
+ * Stores the flag that run() later passes to the producer's
+ * setDisableMessageTimeStamp().
+ */
+void OpenwireExpirationTest::Producer::setDisableTimeStamps( bool value )
+{
+    disableTimeStamps = value;
+}
+
+/**
+ * Thread body: connects to the broker, creates a persistent producer on the
+ * configured topic and sends numMessages text messages using the configured
+ * time-to-live and time-stamp setting.
+ *
+ * Any CMSException is caught and its stack trace printed; nothing is
+ * rethrown. The CMS objects created here are released by cleanup(), which
+ * the destructor calls.
+ */
+void OpenwireExpirationTest::Producer::run() {
+    try {
+        // This is the OpenWire test, so connect to the OpenWire endpoint.
+        // The factory is only needed to create the connection; keeping it on
+        // the stack releases it even when createConnection() throws.
+        ActiveMQConnectionFactory connectionFactory( "tcp://localhost:61616?wireFormat=openwire" );
+
+        // Create a Connection
+        connection = connectionFactory.createConnection();
+        connection->start();
+
+        cout << connection->getClientId() << endl;
+
+        session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
+        destination = session->createTopic( topic );
+
+        producer = session->createProducer( destination );
+        producer->setDeliveryMode( DeliveryMode::PERSISTENT );
+        producer->setDisableMessageTimeStamp( disableTimeStamps );
+
+        // Honour the value given to the constructor instead of a fixed 1.
+        producer->setTimeToLive( timeToLive );
+
+        // Create the Thread Id String
+        string threadIdStr = Integer::toString( Thread::getId() );
+
+        // Create the message text
+        string text = string( "Hello world! from thread " ) + threadIdStr;
+
+        for( int ix = 0; ix < numMessages; ++ix ) {
+            TextMessage* message = session->createTextMessage( text );
+            try {
+                producer->send( message );
+            } catch( ... ) {
+                // Do not leak the message when the send fails.
+                delete message;
+                throw;
+            }
+            delete message;
+        }
+
+    } catch ( CMSException& e ) {
+        e.printStackTrace();
+    }
+}
+
+/**
+ * Releases everything run() created: deletes the destination and producer,
+ * closes the session and connection, then deletes them. CMSExceptions
+ * raised along the way are swallowed so every step is attempted, and each
+ * pointer is reset to NULL once it has been deleted.
+ */
+void OpenwireExpirationTest::Producer::cleanup(){
+
+    // Destroy resources. Deleting a NULL pointer is a no-op, so no guard
+    // is needed.
+    try{
+        delete destination;
+    }catch ( CMSException& ) {}
+    destination = NULL;
+
+    try{
+        delete producer;
+    }catch ( CMSException& ) {}
+    producer = NULL;
+
+    // Close open resources. Each close has its own try block so a failing
+    // session close cannot prevent the connection from being closed.
+    try{
+        if( session != NULL ) session->close();
+    }catch ( CMSException& ) {}
+
+    try{
+        if( connection != NULL ) connection->close();
+    }catch ( CMSException& ) {}
+
+    try{
+        delete session;
+    }catch ( CMSException& ) {}
+    session = NULL;
+
+    try{
+        delete connection;
+    }catch ( CMSException& ) {}
+    connection = NULL;
+}
+
+/**
+ * Records the topic name and the wait argument for a later run(). All CMS
+ * pointers start out NULL and the received-message counter starts at zero.
+ */
+OpenwireExpirationTest::Consumer::Consumer( string topic, long waitMillis )
+    : connection( NULL ),
+      session( NULL ),
+      destination( NULL ),
+      consumer( NULL ),
+      waitMillis( waitMillis ),
+      numReceived( 0 ),
+      topic( topic )
+{
+}
+
+/** Releases the CMS objects by delegating to cleanup(). */
+OpenwireExpirationTest::Consumer::~Consumer()
+{
+    this->cleanup();
+}
+
+/** @return the counter that onMessage() increments. */
+int OpenwireExpirationTest::Consumer::getNumReceived() const
+{
+    return this->numReceived;
+}
+
+/**
+ * Thread body: connects to the broker, creates a consumer on the configured
+ * topic (with "?consumer.retroactive=true" appended to its name), registers
+ * this object as the message listener and then sleeps for waitMillis.
+ *
+ * Any CMSException is caught and its stack trace printed; nothing is
+ * rethrown. The CMS objects created here are released by cleanup(), which
+ * the destructor calls.
+ */
+void OpenwireExpirationTest::Consumer::run() {
+
+    try {
+
+        const string user = "default";
+        const string passwd = "";
+        const string sID = "lsgID";
+
+        // This is the OpenWire test, so connect to the OpenWire endpoint.
+        // The factory is only needed to create the connection; keeping it on
+        // the stack releases it even when createConnection() throws.
+        ActiveMQConnectionFactory connectionFactory(
+            "tcp://localhost:61616?wireFormat=openwire", user, passwd, sID );
+
+        // Create a Connection
+        connection = connectionFactory.createConnection();
+        connection->start();
+
+        // Create a Session
+        session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
+
+        // Create the destination (Topic or Queue)
+        string t = topic + "?consumer.retroactive=true";
+        destination = session->createTopic( t );
+
+        consumer = session->createConsumer( destination );
+        consumer->setMessageListener( this );
+
+        // Sleep while asynchronous messages come in.
+        Thread::sleep( waitMillis );
+
+    } catch ( CMSException& e ) {
+        e.printStackTrace();
+    }
+}
+
+/**
+ * Listener callback: counts the message when it is a TextMessage whose
+ * text can be read. Messages of any other type (or a NULL pointer) are
+ * ignored. A CMSException is caught and its stack trace printed.
+ *
+ * @param message the delivered message
+ */
+void OpenwireExpirationTest::Consumer::onMessage( const Message* message ){
+
+    try
+    {
+        const TextMessage* textMessage =
+            dynamic_cast< const TextMessage* >( message );
+
+        // The cast yields NULL for a NULL or non-text message; skip those
+        // rather than dereferencing a null pointer.
+        if( textMessage == NULL ) {
+            return;
+        }
+
+        // Read the text first so the message is only counted when that
+        // succeeds; the value itself is not needed.
+        textMessage->getText();
+        numReceived++;
+    } catch ( CMSException& e ) {
+        e.printStackTrace();
+    }
+}
+
+/**
+ * Releases everything run() created: deletes the destination and consumer,
+ * closes the session and connection, then deletes them. CMSExceptions
+ * raised along the way are swallowed so every step is attempted, and each
+ * pointer is reset to NULL once it has been deleted.
+ */
+void OpenwireExpirationTest::Consumer::cleanup(){
+
+    // Destroy resources. Deleting a NULL pointer is a no-op, so no guard
+    // is needed.
+    try{
+        delete destination;
+    }catch ( CMSException& ) {}
+    destination = NULL;
+
+    try{
+        delete consumer;
+    }catch ( CMSException& ) {}
+    consumer = NULL;
+
+    // Close open resources. Each close has its own try block so a failing
+    // session close cannot prevent the connection from being closed.
+    try{
+        if( session != NULL ) session->close();
+    }catch ( CMSException& ) {}
+
+    try{
+        if( connection != NULL ) connection->close();
+    }catch ( CMSException& ) {}
+
+    try{
+        delete session;
+    }catch ( CMSException& ) {}
+    session = NULL;
+
+    try{
+        delete connection;
+    }catch ( CMSException& ) {}
+    connection = NULL;
+}
+
+/**
+ * Runs a Producer (1 message, timeToLive argument 1) to completion, pauses,
+ * then runs a Consumer (wait argument 2000) to completion on the same
+ * GUID-named topic and expects the Consumer to have counted no messages.
+ */
+void OpenwireExpirationTest::testExpired()
+{
+    string topicName = Guid().createGUID();
+
+    // Producer phase: start the thread and wait for it to finish.
+    Producer sender( topicName, 1, 1 );
+    Thread senderThread( &sender );
+    senderThread.start();
+    senderThread.join();
+
+    Thread::sleep( 100 );
+
+    // Consumer phase: start the thread and wait for it to finish.
+    Consumer receiver( topicName, 2000 );
+    Thread receiverThread( &receiver );
+    receiverThread.start();
+    receiverThread.join();
+
+    Thread::sleep( 100 );
+
+    CPPUNIT_ASSERT_EQUAL( 0, receiver.getNumReceived() );
+}
+
+/**
+ * Runs a Producer (2 messages, timeToLive argument 2000, time stamps
+ * disabled) to completion, then runs a Consumer (wait argument 3000) to
+ * completion on the same GUID-named topic and expects the Consumer to have
+ * counted both messages.
+ */
+void OpenwireExpirationTest::testNotExpired()
+{
+    string topicName = Guid().createGUID();
+
+    // Producer phase: start the thread and wait for it to finish.
+    Producer sender( topicName, 2, 2000 );
+    sender.setDisableTimeStamps( true );
+    Thread senderThread( &sender );
+    senderThread.start();
+    senderThread.join();
+
+    // Consumer phase: start the thread and wait for it to finish.
+    Consumer receiver( topicName, 3000 );
+    Thread receiverThread( &receiver );
+    receiverThread.start();
+    receiverThread.join();
+
+    Thread::sleep( 50 );
+
+    CPPUNIT_ASSERT_EQUAL( 2, receiver.getNumReceived() );
+}
+
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.h?view=auto&rev=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireExpirationTest.h Sun Mar 11 07:17:14 2007
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_CONNECTOR_OPENWIRE_OPENWIREEXPIRATIONTEST_H_
+#define _INTEGRATION_CONNECTOR_OPENWIRE_OPENWIREEXPIRATIONTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/ConnectionFactory.h>
+#include <cms/Connection.h>
+#include <cms/Session.h>
+#include <cms/MessageProducer.h>
+
+#include <activemq/concurrent/Runnable.h>
+
+namespace integration{
+namespace connector{
+namespace openwire{
+
+    /**
+     * CppUnit fixture with two test cases, testExpired() and
+     * testNotExpired(). It also declares the Producer and Consumer helper
+     * runnables that those cases execute on their own threads.
+     */
+    class OpenwireExpirationTest : public CppUnit::TestFixture
+    {
+        CPPUNIT_TEST_SUITE( OpenwireExpirationTest );
+        CPPUNIT_TEST( testExpired );
+        CPPUNIT_TEST( testNotExpired );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        // NOTE(review): no definition of this static member is visible in
+        // the matching .cpp - confirm one exists before anything uses it.
+        static std::string messageTag;
+
+    public:
+
+        OpenwireExpirationTest(){}
+        virtual ~OpenwireExpirationTest(){}
+
+        virtual void testExpired();
+        virtual void testNotExpired();
+
+    public:
+
+        /**
+         * Runnable that sends a fixed number of text messages to a topic.
+         * It owns the CMS objects it creates and releases them in its
+         * destructor, so copying is disabled to rule out a double delete.
+         */
+        class Producer : public activemq::concurrent::Runnable {
+        private:
+
+            cms::Connection* connection;
+            cms::Session* session;
+            cms::Topic* destination;
+            cms::MessageProducer* producer;
+            int numMessages;
+            long long timeToLive;
+            bool disableTimeStamps;
+            std::string topic;
+
+        public:
+
+            /**
+             * @param topic        name of the topic to publish to
+             * @param numMessages  number of messages run() sends
+             * @param timeToLive   time-to-live value for the messages
+             */
+            Producer( std::string topic, int numMessages, long long timeToLive );
+
+            virtual ~Producer();
+
+            virtual bool getDisableTimeStamps() const;
+
+            virtual void setDisableTimeStamps( bool value );
+
+            virtual void run();
+
+        private:
+
+            // Not implemented: instances own raw pointers.
+            Producer( const Producer& );
+            Producer& operator=( const Producer& );
+
+            void cleanup();
+        };
+
+        /**
+         * Runnable and message listener that counts the messages delivered
+         * to it. It owns the CMS objects it creates and releases them in its
+         * destructor, so copying is disabled to rule out a double delete.
+         */
+        class Consumer : public cms::MessageListener, public activemq::concurrent::Runnable {
+
+        private:
+
+            cms::Connection* connection;
+            cms::Session* session;
+            cms::Topic* destination;
+            cms::MessageConsumer* consumer;
+            long waitMillis;
+            int numReceived;
+            std::string topic;
+
+        public:
+
+            /**
+             * @param topic       name of the topic to consume from
+             * @param waitMillis  value run() passes to Thread::sleep()
+             */
+            Consumer( std::string topic, long waitMillis );
+
+            virtual ~Consumer();
+
+            virtual int getNumReceived() const;
+
+            virtual void run();
+
+            virtual void onMessage( const cms::Message* message );
+
+        private:
+
+            // Not implemented: instances own raw pointers.
+            Consumer( const Consumer& );
+            Consumer& operator=( const Consumer& );
+
+            void cleanup();
+        };
+    };
+
+}}}
+
+#endif /*_INTEGRATION_CONNECTOR_OPENWIRE_OPENWIREEXPIRATIONTEST_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleRollbackTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleRollbackTest.cpp?view=auto&rev=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleRollbackTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleRollbackTest.cpp Sun Mar 11 07:17:14 2007
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireSimpleRollbackTest.h"
+
+#include <integration/IntegrationCommon.h>
+
+//CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::openwire::OpenwireSimpleRollbackTest );
+
+#include <sstream>
+
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+#include <activemq/util/Config.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+#include <cms/Session.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace std;
+using namespace integration;
+using namespace integration::connector::openwire;
+
+ /**
+  * Connects to the broker at tcp://localhost:61616 using the openwire wire
+  * format, registers this fixture as the connection's exception listener,
+  * starts the connection and opens a transacted session.
+  *
+  * The destructor does not run for an object whose constructor threw, so
+  * anything allocated before a failure is released here before the
+  * exception is passed on to the AMQ_* handlers.
+  */
+ OpenwireSimpleRollbackTest::OpenwireSimpleRollbackTest()
+ :
+     connectionFactory( NULL ),
+     connection( NULL ),
+     session( NULL ),
+     numReceived( 0 ),
+     msgCount( 1 )    // Default amount to send and receive
+ {
+     try
+     {
+         try
+         {
+             string url = "tcp://localhost:61616?wireFormat=openwire";
+
+             // Create a Factory
+             connectionFactory = new ActiveMQConnectionFactory( url );
+
+             // Now create the connection
+             connection = connectionFactory->createConnection(
+                 "", "", Guid().createGUIDString() );
+
+             // Set ourself as a recipient of Exceptions
+             connection->setExceptionListener( this );
+             connection->start();
+
+             // Create a Session
+             session = connection->createSession(
+                 cms::Session::SESSION_TRANSACTED );
+         }
+         catch( ... )
+         {
+             // Undo the partial setup; deleting a NULL pointer is a no-op.
+             delete session;
+             delete connection;
+             delete connectionFactory;
+             session = NULL;
+             connection = NULL;
+             connectionFactory = NULL;
+             throw;
+         }
+     }
+     AMQ_CATCH_RETHROW( ActiveMQException )
+     AMQ_CATCHALL_THROW( ActiveMQException )
+ }
+
+ /**
+  * Closes the session and the connection, then frees the session,
+  * connection and factory.
+  *
+  * Each close() has its own guarded block so that a failure while closing
+  * the session no longer skips closing the connection or the deletes.
+  * Exceptions are handed to the AMQ_*_NOTHROW handlers, as before.
+  */
+ OpenwireSimpleRollbackTest::~OpenwireSimpleRollbackTest()
+ {
+     try
+     {
+         if( session != NULL ) {
+             session->close();
+         }
+     }
+     AMQ_CATCH_NOTHROW( ActiveMQException )
+     AMQ_CATCHALL_NOTHROW( )
+
+     try
+     {
+         if( connection != NULL ) {
+             connection->close();
+         }
+     }
+     AMQ_CATCH_NOTHROW( ActiveMQException )
+     AMQ_CATCHALL_NOTHROW( )
+
+     try
+     {
+         // Same order as before: session, connection, then the factory.
+         delete session;
+         delete connection;
+         delete connectionFactory;
+     }
+     AMQ_CATCH_NOTHROW( ActiveMQException )
+     AMQ_CATCHALL_NOTHROW( )
+ }
+
+ /**
+  * Drives the transacted session through a commit and a rollback:
+  *   1. sends msgCount text messages and commits,
+  *   2. sends msgCount text messages and rolls back,
+  *   3. sends one more message, waits, and sends another.
+  * This fixture is registered as the consumer's listener for the topic.
+  *
+  * NOTE(review): the only CppUnit check is CPPUNIT_ASSERT( true ); the
+  * received count is never verified - confirm whether that is intended.
+  *
+  * The CMS objects created here are released on both the normal and the
+  * failing path.
+  */
+ void OpenwireSimpleRollbackTest::test()
+ {
+     try
+     {
+         cms::Topic* topic = NULL;
+         cms::MessageConsumer* consumer = NULL;
+         cms::MessageProducer* producer = NULL;
+         cms::TextMessage* textMsg = NULL;
+
+         try
+         {
+             // Create CMS Object for Comms
+             topic = session->createTopic( Guid::createGUIDString() );
+             consumer = session->createConsumer( topic );
+             consumer->setMessageListener( this );
+             producer = session->createProducer( topic );
+
+             // First batch: sent, then committed.
+             textMsg = session->createTextMessage();
+
+             for( size_t ix = 0; ix < msgCount; ++ix )
+             {
+                 // No std::ends here: on an ostringstream it would append a
+                 // '\0' character to the message text.
+                 ostringstream lcStream;
+                 lcStream << "SimpleTest - Message #" << ix;
+                 textMsg->setText( lcStream.str() );
+                 producer->send( textMsg );
+             }
+
+             delete textMsg;
+             textMsg = NULL;
+
+             Thread::sleep( 100 );
+
+             session->commit();
+
+             // Second batch: sent, then rolled back.
+             textMsg = session->createTextMessage();
+
+             for( size_t ix = 0; ix < msgCount; ++ix )
+             {
+                 ostringstream lcStream;
+                 lcStream << "SimpleTest - Message #" << ix;
+                 textMsg->setText( lcStream.str() );
+                 producer->send( textMsg );
+             }
+
+             delete textMsg;
+             textMsg = NULL;
+
+             Thread::sleep( 500 );
+
+             session->rollback();
+
+             Thread::sleep( 500 );
+
+             textMsg = session->createTextMessage();
+             textMsg->setText( "SimpleTest - Message after Rollback" );
+             producer->send( textMsg );
+             delete textMsg;
+             textMsg = NULL;
+
+             Thread::sleep( 15000 );
+
+             CPPUNIT_ASSERT( true );
+
+             textMsg = session->createTextMessage();
+             textMsg->setText( "SimpleTest - Message after Rollback" );
+             producer->send( textMsg );
+             delete textMsg;
+             textMsg = NULL;
+         }
+         catch( ... )
+         {
+             // Release whatever was created before passing the failure on.
+             delete textMsg;
+             delete producer;
+             delete consumer;
+             delete topic;
+             throw;
+         }
+
+         if( IntegrationCommon::debug ) {
+             printf( "Shutting Down\n" );
+         }
+
+         delete producer;
+         delete consumer;
+         delete topic;
+     }
+     AMQ_CATCH_RETHROW( ActiveMQException )
+     AMQ_CATCHALL_THROW( ActiveMQException )
+ }
+
+ // cms::ExceptionListener callback. Any exception reported to this fixture
+ // is turned into an unconditional CppUnit assertion failure; the error
+ // object itself is not inspected (the parameter is marked AMQCPP_UNUSED).
+ void OpenwireSimpleRollbackTest::onException( const cms::CMSException& error AMQCPP_UNUSED)
+ {
+ // Always false, so the assertion below always fails. CPPUNIT_ASSERT
+ // reports the asserted expression text, i.e. the variable's name.
+ bool AbstractTester = false;
+ CPPUNIT_ASSERT( AbstractTester );
+ }
+
+ /**
+  * cms::MessageListener callback. A text or bytes message bumps
+  * numReceived and triggers mutex.notifyAll(); any other message type is
+  * ignored. With IntegrationCommon::debug set, the message content is
+  * printed as well. Exceptions go to the AMQ_*_NOTHROW handlers.
+  */
+ void OpenwireSimpleRollbackTest::onMessage( const cms::Message* message )
+ {
+     try
+     {
+         const cms::TextMessage* asText =
+             dynamic_cast<const cms::TextMessage*>( message );
+         const cms::BytesMessage* asBytes = NULL;
+
+         if( asText == NULL )
+         {
+             // Not text: the only other kind counted is a bytes message.
+             asBytes = dynamic_cast<const cms::BytesMessage*>( message );
+             if( asBytes == NULL ) {
+                 return;
+             }
+
+             // The payload is copied into a string whether or not debug
+             // output is enabled.
+             const unsigned char* payload = asBytes->getBodyBytes();
+             string body( reinterpret_cast<const char*>( payload ),
+                          asBytes->getBodyLength() );
+
+             if( IntegrationCommon::debug ) {
+                 printf("Received Bytes Message: %s", body.c_str() );
+             }
+         }
+         else if( IntegrationCommon::debug )
+         {
+             printf("received text msg: %s\n", asText->getText().c_str() );
+         }
+
+         numReceived++;
+
+         // Signal that we got one
+         synchronized( &mutex )
+         {
+             mutex.notifyAll();
+         }
+     }
+     AMQ_CATCH_NOTHROW( ActiveMQException )
+     AMQ_CATCHALL_NOTHROW( )
+ }
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleRollbackTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleRollbackTest.h?view=auto&rev=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleRollbackTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleRollbackTest.h Sun Mar 11 07:17:14 2007
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_CONNECTOR_OPENWIRE_OPENWIRESIMPLEROLLBACKTEST_H_
+#define _INTEGRATION_CONNECTOR_OPENWIRE_OPENWIRESIMPLEROLLBACKTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <activemq/concurrent/Mutex.h>
+
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/ConnectionFactory.h>
+#include <cms/Connection.h>
+#include <cms/Session.h>
+#include <cms/MessageProducer.h>
+
+namespace integration{
+namespace connector{
+namespace openwire{
+
+ // CppUnit fixture for commit/rollback on a transacted openwire session.
+ // The fixture is its own CMS exception listener and message listener.
+ class OpenwireSimpleRollbackTest : public CppUnit::TestFixture,
+ public cms::ExceptionListener,
+ public cms::MessageListener
+ {
+ // Suite with a single test case: test().
+ CPPUNIT_TEST_SUITE( OpenwireSimpleRollbackTest );
+ CPPUNIT_TEST( test );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ // The constructor creates the factory, connection and session;
+ // the destructor closes and deletes them.
+ OpenwireSimpleRollbackTest();
+ virtual ~OpenwireSimpleRollbackTest();
+
+ virtual void test(void);
+
+ // Listener callbacks invoked with reported exceptions / delivered messages.
+ virtual void onException( const cms::CMSException& error );
+ virtual void onMessage( const cms::Message* message );
+
+ private:
+
+ // CMS objects owned by the fixture (deleted in the destructor).
+ cms::ConnectionFactory* connectionFactory;
+ cms::Connection* connection;
+ cms::Session* session;
+
+ // numReceived: messages counted by onMessage().
+ // msgCount: messages sent per batch by test().
+ // mutex: notified by onMessage() for every counted message.
+ unsigned int numReceived;
+ unsigned int msgCount;
+ activemq::concurrent::Mutex mutex;
+
+ };
+
+}}}
+
+#endif /*_INTEGRATION_CONNECTOR_OPENWIRE_OPENWIRESIMPLEROLLBACKTEST_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp?view=auto&rev=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp Sun Mar 11 07:17:14 2007
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireSimpleTest.h"
+#include <integration/IntegrationCommon.h>
+
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::openwire::OpenwireSimpleTest );
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace integration;
+using namespace integration::connector::openwire;
+
+ // No fixture-level setup: each test case builds its own TestSupport.
+ OpenwireSimpleTest::OpenwireSimpleTest()
+ {}
+
+ // Nothing is held by the fixture itself, so there is nothing to release.
+ OpenwireSimpleTest::~OpenwireSimpleTest()
+ {}
+
+ /**
+  * Publishes two batches of IntegrationCommon::defaultMsgCount text
+  * messages to "mytopic" through a TestSupport built with only the broker
+  * URL (NOTE(review): presumably auto-acknowledge, per the test name -
+  * confirm TestSupport's default), waits for them, and asserts that the
+  * listener counted all of them.
+  *
+  * The topic, consumer and producer are released on both the normal and
+  * the failing path; a failed send, wait or assertion used to leak them.
+  */
+ void OpenwireSimpleTest::testAutoAck()
+ {
+     try
+     {
+         TestSupport testSupport("tcp://localhost:61616?wireFormat=openwire");
+         testSupport.initialize();
+
+         if( IntegrationCommon::debug ) {
+             cout << "Starting activemqcms test (sending "
+                  << IntegrationCommon::defaultMsgCount
+                  << " messages per type and sleeping "
+                  << IntegrationCommon::defaultDelay
+                  << " milli-seconds) ...\n"
+                  << endl;
+         }
+
+         // Create CMS Object for Comms
+         cms::Session* session = testSupport.getSession();
+         cms::Topic* topic = NULL;
+         cms::MessageConsumer* consumer = NULL;
+         cms::MessageProducer* producer = NULL;
+
+         try
+         {
+             topic = session->createTopic("mytopic");
+             consumer = session->createConsumer( topic );
+             consumer->setMessageListener( &testSupport );
+             producer = session->createProducer( topic );
+
+             // Send some text messages
+             testSupport.produceTextMessages(
+                 *producer, IntegrationCommon::defaultMsgCount );
+
+             // Send a second batch of text messages.
+             // NOTE(review): this step used to be labelled "bytes messages"
+             // although it calls produceTextMessages() - confirm which was
+             // intended.
+             testSupport.produceTextMessages(
+                 *producer, IntegrationCommon::defaultMsgCount );
+
+             // Wait for the messages to get here
+             testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
+
+             unsigned int numReceived = testSupport.getNumReceived();
+             if( IntegrationCommon::debug ) {
+                 // %u: numReceived is unsigned.
+                 printf("received: %u\n", numReceived );
+             }
+             CPPUNIT_ASSERT(
+                 numReceived == IntegrationCommon::defaultMsgCount * 2 );
+         }
+         catch( ... )
+         {
+             // Release the CMS objects while testSupport is still alive,
+             // then let the failure propagate unchanged.
+             delete producer;
+             delete consumer;
+             delete topic;
+             throw;
+         }
+
+         if( IntegrationCommon::debug ) {
+             printf("Shutting Down\n" );
+         }
+         delete producer;
+         delete consumer;
+         delete topic;
+     }
+     AMQ_CATCH_RETHROW( ActiveMQException )
+     AMQ_CATCHALL_THROW( ActiveMQException )
+ }
+
+ /**
+  * Same scenario as the auto-ack case, but the TestSupport is built with
+  * cms::Session::CLIENT_ACKNOWLEDGE: publishes two batches of
+  * IntegrationCommon::defaultMsgCount text messages to "mytopic", waits
+  * for them, and asserts that the listener counted all of them.
+  *
+  * The topic, consumer and producer are released on both the normal and
+  * the failing path; a failed send, wait or assertion used to leak them.
+  */
+ void OpenwireSimpleTest::testClientAck()
+ {
+     try
+     {
+         TestSupport testSupport("tcp://localhost:61616?wireFormat=openwire", cms::Session::CLIENT_ACKNOWLEDGE );
+         testSupport.initialize();
+
+         if( IntegrationCommon::debug ) {
+             cout << "Starting activemqcms test (sending "
+                  << IntegrationCommon::defaultMsgCount
+                  << " messages per type and sleeping "
+                  << IntegrationCommon::defaultDelay
+                  << " milli-seconds) ...\n"
+                  << endl;
+         }
+
+         // Create CMS Object for Comms
+         cms::Session* session = testSupport.getSession();
+         cms::Topic* topic = NULL;
+         cms::MessageConsumer* consumer = NULL;
+         cms::MessageProducer* producer = NULL;
+
+         try
+         {
+             topic = session->createTopic("mytopic");
+             consumer = session->createConsumer( topic );
+             consumer->setMessageListener( &testSupport );
+             producer = session->createProducer( topic );
+
+             // Send some text messages
+             testSupport.produceTextMessages(
+                 *producer, IntegrationCommon::defaultMsgCount );
+
+             // Send a second batch of text messages.
+             // NOTE(review): this step used to be labelled "bytes messages"
+             // although it calls produceTextMessages() - confirm which was
+             // intended.
+             testSupport.produceTextMessages(
+                 *producer, IntegrationCommon::defaultMsgCount );
+
+             // Wait for the messages to get here
+             testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
+
+             unsigned int numReceived = testSupport.getNumReceived();
+             if( IntegrationCommon::debug ) {
+                 // %u: numReceived is unsigned.
+                 printf("received: %u\n", numReceived );
+             }
+             CPPUNIT_ASSERT(
+                 numReceived == IntegrationCommon::defaultMsgCount * 2 );
+         }
+         catch( ... )
+         {
+             // Release the CMS objects while testSupport is still alive,
+             // then let the failure propagate unchanged.
+             delete producer;
+             delete consumer;
+             delete topic;
+             throw;
+         }
+
+         if( IntegrationCommon::debug ) {
+             printf("Shutting Down\n" );
+         }
+         delete producer;
+         delete consumer;
+         delete topic;
+     }
+     AMQ_CATCH_RETHROW( ActiveMQException )
+     AMQ_CATCHALL_THROW( ActiveMQException )
+ }
+
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h?view=auto&rev=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h Sun Mar 11 07:17:14 2007
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_CONNECTOR_OPENWIRE_OPENWIRESIMPLETESTER_H_
+#define _INTEGRATION_CONNECTOR_OPENWIRE_OPENWIRESIMPLETESTER_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <integration/TestSupport.h>
+
+namespace integration{
+namespace connector{
+namespace openwire{
+
+ // CppUnit fixture with two send/receive test cases against an openwire
+ // broker connection: testAutoAck and testClientAck.
+ class OpenwireSimpleTest : public CppUnit::TestFixture
+ {
+ // Registers both test cases with the suite.
+ CPPUNIT_TEST_SUITE( OpenwireSimpleTest );
+ CPPUNIT_TEST( testAutoAck );
+ CPPUNIT_TEST( testClientAck );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ OpenwireSimpleTest();
+ virtual ~OpenwireSimpleTest();
+
+ // Test cases; declared here and defined in the implementation file.
+ virtual void testAutoAck();
+ virtual void testClientAck();
+
+ };
+
+}}}
+
+#endif /*_INTEGRATION_CONNECTOR_OPENWIRE_OPENWIRESIMPLETESTER_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp?view=auto&rev=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.cpp Sun Mar 11 07:17:14 2007
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireTransactionTest.h"
+#include <integration/IntegrationCommon.h>
+
+//CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::openwire::OpenwireTransactionTest );
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace integration;
+using namespace integration::connector::openwire;
+
+ // Builds the TestSupport member for the broker at 127.0.0.1:61616 (openwire
+ // wire format) with cms::Session::SESSION_TRANSACTED, then initializes it.
+ OpenwireTransactionTest::OpenwireTransactionTest()
+     : testSupport( "tcp://127.0.0.1:61616?wireFormat=openwire",
+                    cms::Session::SESSION_TRANSACTED )
+ {
+     testSupport.initialize();
+ }
+
+ // No explicit teardown; the testSupport member is destroyed automatically.
+ OpenwireTransactionTest::~OpenwireTransactionTest()
+ {
+ }
+
+ /**
+  * Transactional send/receive scenario on "mytopic":
+  *   1. send defaultMsgCount text messages, commit;
+  *   2. send defaultMsgCount text messages, commit;
+  *   3. wait for and assert 2 * defaultMsgCount received;
+  *   4. reset the counter, send defaultMsgCount more, roll back;
+  *   5. wait for and assert defaultMsgCount received.
+  *
+  * NOTE(review): step 5 expects defaultMsgCount deliveries after a
+  * rollback - confirm that this is the intended broker behaviour.
+  *
+  * The topic, consumer and producer are released on both the normal and
+  * the failing path; a failed send, wait or assertion used to leak them.
+  */
+ void OpenwireTransactionTest::test()
+ {
+     try
+     {
+         if( IntegrationCommon::debug ) {
+             cout << "Starting activemqcms transactional test (sending "
+                  << IntegrationCommon::defaultMsgCount
+                  << " messages per type and sleeping "
+                  << IntegrationCommon::defaultDelay
+                  << " milli-seconds) ...\n"
+                  << endl;
+         }
+
+         // Create CMS Object for Comms
+         cms::Session* session = testSupport.getSession();
+         cms::Topic* topic = NULL;
+         cms::MessageConsumer* consumer = NULL;
+         cms::MessageProducer* producer = NULL;
+
+         try
+         {
+             topic = session->createTopic("mytopic");
+             consumer = session->createConsumer( topic );
+             consumer->setMessageListener( &testSupport );
+             producer = session->createProducer( topic );
+
+             // Send some text messages
+             testSupport.produceTextMessages(
+                 *producer, IntegrationCommon::defaultMsgCount );
+
+             session->commit();
+
+             // Send a second batch of text messages.
+             // NOTE(review): this step used to be labelled "bytes messages"
+             // although it calls produceTextMessages() - confirm which was
+             // intended.
+             testSupport.produceTextMessages(
+                 *producer, IntegrationCommon::defaultMsgCount );
+
+             session->commit();
+
+             // Wait till we get all the messages
+             testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
+
+             unsigned int numReceived = testSupport.getNumReceived();
+             if( IntegrationCommon::debug ) {
+                 // %u: numReceived is unsigned.
+                 printf("received: %u\n", numReceived );
+             }
+             CPPUNIT_ASSERT(
+                 numReceived == IntegrationCommon::defaultMsgCount * 2 );
+
+             testSupport.setNumReceived( 0 );
+
+             // Send some text messages
+             testSupport.produceTextMessages(
+                 *producer, IntegrationCommon::defaultMsgCount );
+
+             session->rollback();
+
+             // Wait till we get all the messages
+             testSupport.waitForMessages( IntegrationCommon::defaultMsgCount );
+
+             numReceived = testSupport.getNumReceived();
+             if( IntegrationCommon::debug ) {
+                 printf("received: %u\n", numReceived );
+             }
+             CPPUNIT_ASSERT(
+                 numReceived == IntegrationCommon::defaultMsgCount );
+         }
+         catch( ... )
+         {
+             // Release the CMS objects, then let the failure propagate
+             // unchanged.
+             delete producer;
+             delete consumer;
+             delete topic;
+             throw;
+         }
+
+         if( IntegrationCommon::debug ) {
+             printf("Shutting Down\n" );
+         }
+         delete producer;
+         delete consumer;
+         delete topic;
+     }
+     AMQ_CATCH_RETHROW( ActiveMQException )
+     AMQ_CATCHALL_THROW( ActiveMQException )
+ }
+
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h?view=auto&rev=516905
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTransactionTest.h Sun Mar 11 07:17:14 2007
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_CONNECTOR_OPENWIRE_OPENWIRETRANSACTIONTESTER_H_
+#define _INTEGRATION_CONNECTOR_OPENWIRE_OPENWIRETRANSACTIONTESTER_H_
+
+#include <integration/TestSupport.h>
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace integration{
+namespace connector{
+namespace openwire{
+
+ // CppUnit fixture for the transactional send/receive scenario; all broker
+ // interaction goes through the TestSupport member.
+ class OpenwireTransactionTest : public CppUnit::TestFixture
+ {
+ // Suite with a single test case: test().
+ CPPUNIT_TEST_SUITE( OpenwireTransactionTest );
+ CPPUNIT_TEST( test );
+ CPPUNIT_TEST_SUITE_END();
+
+ private:
+
+ // Helper constructed and initialized in the fixture's constructor; it is
+ // also registered as the consumer's message listener inside test().
+ TestSupport testSupport;
+
+ public:
+
+ OpenwireTransactionTest();
+ virtual ~OpenwireTransactionTest();
+
+ virtual void test();
+
+ };
+
+}}}
+
+#endif /*_INTEGRATION_CONNECTOR_OPENWIRE_OPENWIRETRANSACTIONTESTER_H_*/