You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2006/12/02 14:39:34 UTC
svn commit: r481559 - in
/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration:
./ ExpirationTest.cpp ExpirationTest.h
Author: nmittler
Date: Sat Dec 2 05:39:33 2006
New Revision: 481559
URL: http://svn.apache.org/viewvc?view=rev&rev=481559
Log:
[AMQCPP-14] Adding integration test for expiration
Added:
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.cpp
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.h
Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.cpp?view=auto&rev=481559
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.cpp (added)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.cpp Sat Dec 2 05:39:33 2006
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ExpirationTest.h"
+
+#include <integration/common/IntegrationCommon.h>
+
+//CPPUNIT_TEST_SUITE_REGISTRATION( integration::expiration::ExpirationTest );
+
+#include <sstream>
+
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+#include <cms/Session.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace std;
+using namespace integration;
+using namespace integration::expiration;
+using namespace integration::common;
+
+class Producer : public Runnable {
+private:
+
+ Connection* connection;
+ Session* session;
+ Topic* destination;
+ MessageProducer* producer;
+ int numMessages;
+ long long timeToLive;
+
+public:
+
+ Producer( int numMessages, long long timeToLive ){
+ connection = NULL;
+ session = NULL;
+ destination = NULL;
+ producer = NULL;
+ this->numMessages = numMessages;
+ this->timeToLive = timeToLive;
+ }
+
+ virtual ~Producer(){
+ cleanup();
+ }
+
+ virtual void run() {
+ try {
+ // Create a ConnectionFactory
+ ActiveMQConnectionFactory* connectionFactory = new
+ ActiveMQConnectionFactory("tcp://localhost:61613");
+
+ // Create a Connection
+ connection = connectionFactory->createConnection();
+ connection->start();
+
+ string sss=connection->getClientId();
+ cout << sss << endl;
+
+ session = connection->createSession( Session::AUTO_ACKNOWLEDGE);
+ destination = session->createTopic( "mytopic" );
+
+ producer = session->createProducer( destination );
+ producer->setDeliveryMode( DeliveryMode::PERSISTANT );
+
+ //unsigned long ttt=getcurt();
+ producer->setTimeToLive( 1);
+
+ // Create the Thread Id String
+ string threadIdStr = Integer::toString( Thread::getId() );
+
+ // Create a messages
+ string text = (string)"Hello world! from thread " + threadIdStr;
+
+ for( int ix=0; ix<numMessages; ++ix ){
+ TextMessage* message = session->createTextMessage( text );
+ producer->send( message );
+ delete message;
+ }
+
+ }catch ( CMSException& e ) {
+ e.printStackTrace();
+ }
+ }
+
+private:
+
+ void cleanup(){
+
+ // Destroy resources.
+ try{
+ if( destination != NULL ) delete destination;
+ }catch ( CMSException& e ) {}
+ destination = NULL;
+
+ try{
+ if( producer != NULL ) delete producer;
+ }catch ( CMSException& e ) {}
+ producer = NULL;
+
+ // Close open resources.
+ try{
+ if( session != NULL ) session->close();
+ if( connection != NULL ) connection->close();
+ }catch ( CMSException& e ) {}
+
+ try{
+ if( session != NULL ) delete session;
+ }catch ( CMSException& e ) {}
+ session = NULL;
+
+ try{
+ if( connection != NULL ) delete connection;
+ }catch ( CMSException& e ) {}
+ connection = NULL;
+ }
+};
+
+class Consumer : public MessageListener, public Runnable {
+
+private:
+
+ Connection* connection;
+ Session* session;
+ Topic* destination;
+ MessageConsumer* consumer;
+ long waitMillis;
+ int numReceived;
+
+public:
+
+ Consumer( long waitMillis ){
+ connection = NULL;
+ session = NULL;
+ destination = NULL;
+ consumer = NULL;
+ this->waitMillis = waitMillis;
+ numReceived = 0;
+ }
+
+ virtual ~Consumer(){
+ cleanup();
+ }
+
+ virtual int getNumReceived() const{
+ return numReceived;
+ }
+
+ virtual void run() {
+
+ try {
+
+ string user,passwd,sID;
+ user="default";
+ passwd="";
+ sID="lsgID";
+
+ // Create a ConnectionFactory
+ ActiveMQConnectionFactory* connectionFactory =
+ new ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
+
+ // Create a Connection
+ connection = connectionFactory->createConnection();
+ delete connectionFactory;
+ connection->start();
+
+ // Create a Session
+ session = connection->createSession( Session::AUTO_ACKNOWLEDGE);
+
+ // Create the destination (Topic or Queue)
+ destination = session->createTopic( "mytopic?consumer.retroactive=true");
+
+ consumer = session->createDurableConsumer( destination , user ,"",false);
+
+ consumer->setMessageListener( this );
+
+ // Sleep while asynchronous messages come in.
+ Thread::sleep( waitMillis );
+
+ } catch (CMSException& e) {
+ e.printStackTrace();
+ }
+ }
+
+ virtual void onMessage( const Message* message ){
+
+ try
+ {
+ const TextMessage* textMessage =
+ dynamic_cast< const TextMessage* >( message );
+ string text = textMessage->getText();
+ numReceived++;
+ } catch (CMSException& e) {
+ e.printStackTrace();
+ }
+ }
+
+private:
+
+ void cleanup(){
+
+ // Destroy resources.
+ try{
+ if( destination != NULL ) delete destination;
+ }catch (CMSException& e) {}
+ destination = NULL;
+
+ try{
+ if( consumer != NULL ) delete consumer;
+ }catch (CMSException& e) {}
+ consumer = NULL;
+
+ // Close open resources.
+ try{
+ if( session != NULL ) session->close();
+ if( connection != NULL ) connection->close();
+ }catch (CMSException& e) {}
+
+ try{
+ if( session != NULL ) delete session;
+ }catch (CMSException& e) {}
+ session = NULL;
+
+ try{
+ if( connection != NULL ) delete connection;
+ }catch (CMSException& e) {}
+ connection = NULL;
+ }
+};
+
+void ExpirationTest::testExpired()
+{
+ Producer producer( 2, 1 );
+ Thread producerThread( &producer );
+ producerThread.start();
+ producerThread.join();
+
+ Consumer consumer( 2000 );
+ Thread consumerThread( &consumer );
+ consumerThread.start();
+ consumerThread.join();
+
+ CPPUNIT_ASSERT( consumer.getNumReceived() == 0 );
+}
+
+void ExpirationTest::testNotExpired()
+{
+ Producer producer( 2, 2000 );
+ Thread producerThread( &producer );
+ producerThread.start();
+ producerThread.join();
+
+ Consumer consumer( 2000 );
+ Thread consumerThread( &consumer );
+ consumerThread.start();
+ consumerThread.join();
+
+ CPPUNIT_ASSERT( consumer.getNumReceived() == 2 );
+}
+
Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.h?view=auto&rev=481559
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.h (added)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.h Sat Dec 2 05:39:33 2006
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_EXPIRATION_EXPIRATIONTEST_H_
+#define _INTEGRATION_EXPIRATION_EXPIRATIONTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <activemq/concurrent/Mutex.h>
+
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/ConnectionFactory.h>
+#include <cms/Connection.h>
+#include <cms/Session.h>
+#include <cms/MessageProducer.h>
+
+namespace integration{
+namespace expiration{
+
+ class ExpirationTest : public CppUnit::TestFixture
+ {
+ CPPUNIT_TEST_SUITE( ExpirationTest );
+ //CPPUNIT_TEST( testExpired );
+ //CPPUNIT_TEST( testNotExpired );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ ExpirationTest(){}
+ virtual ~ExpirationTest(){}
+
+ virtual void testExpired();
+ virtual void testNotExpired();
+ };
+
+}}
+
+#endif /*_INTEGRATION_EXPIRATION_EXPIRATIONTEST_H_*/