You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by nm...@apache.org on 2006/07/06 17:41:16 UTC

svn commit: r419587 - /incubator/activemq/trunk/activemq-cpp/src/examples/main.cpp

Author: nmittler
Date: Thu Jul  6 08:41:15 2006
New Revision: 419587

URL: http://svn.apache.org/viewvc?rev=419587&view=rev
Log:
Adding example of usage for activemq-cpp - will be displayed on the wiki

Added:
    incubator/activemq/trunk/activemq-cpp/src/examples/main.cpp

Added: incubator/activemq/trunk/activemq-cpp/src/examples/main.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/examples/main.cpp?rev=419587&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/examples/main.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/examples/main.cpp Thu Jul  6 08:41:15 2006
@@ -0,0 +1,243 @@
+
+// START SNIPPET: demo
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <cms/Connection.h>
+#include <cms/Session.h>
+#include <cms/TextMessage.h>
+#include <cms/ExceptionListener.h>
+#include <cms/MessageListener.h>
+#include <stdlib.h>
+
+using namespace activemq::core;
+using namespace activemq::concurrent;
+using namespace cms;
+using namespace std;
+
+class HelloWorldProducer : public Runnable {
+private:
+	
+	Connection* connection;
+	Session* session;
+	Destination* destination;
+	MessageProducer* producer;
+	int numMessages;
+
+public:
+	
+	HelloWorldProducer( int numMessages ){
+		connection = NULL;
+    	session = NULL;
+    	destination = NULL;
+    	producer = NULL;
+    	this->numMessages = numMessages;
+	}
+	
+	virtual ~HelloWorldProducer(){
+		cleanup();
+	}
+	
+    virtual void run() {
+        try {
+            // Create a ConnectionFactory
+            ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory("127.0.0.1:61613");
+
+            // Create a Connection
+            connection = connectionFactory->createConnection();
+            connection->start();
+
+            // Create a Session
+            session = connection->createSession( Session::AutoAcknowledge );
+
+            // Create the destination (Topic or Queue)
+            destination = session->createQueue("TEST.FOO");
+
+            // Create a MessageProducer from the Session to the Topic or Queue
+            producer = session->createProducer(*destination);
+            producer->setDeliveryMode( Message::NONPERSISTANT);
+
+            // Stringify the thread id
+            char threadIdStr[100];
+            itoa( Thread::getId(), threadIdStr, 10 );
+            
+            // Create a messages
+            string text = (string)"Hello world! from thread " + threadIdStr;
+            
+            for( int ix=0; ix<numMessages; ++ix ){
+	            TextMessage* message = session->createTextMessage( text );
+
+    	        // Tell the producer to send the message
+        	    printf( "Sent message from thread %s\n", threadIdStr );
+            	producer->send(*message);
+            	
+            	delete message;
+            }
+			
+        }catch (CMSException& e) {
+            e.printStackTrace();
+        }
+    }
+    
+private:
+
+    void cleanup(){
+    	
+    		// Close open resources.
+    		try{
+    			if( session != NULL ) session->close();
+    			if( connection != NULL ) connection->close();
+			}catch (CMSException& e) {}
+			
+			// Destroy resources.
+			try{                        
+            	if( destination != NULL ) delete destination;
+			}catch (CMSException& e) {}
+			destination = NULL;
+			
+			try{
+	            if( producer != NULL ) delete producer;
+			}catch (CMSException& e) {}
+			producer = NULL;
+			
+			try{
+            	if( session != NULL ) delete session;
+			}catch (CMSException& e) {}
+			session = NULL;
+			
+			try{
+            	if( connection != NULL ) delete connection;
+			}catch (CMSException& e) {}
+    		connection = NULL;
+    }
+};
+
+class HelloWorldConsumer : public ExceptionListener, 
+                           public MessageListener,
+                           public Runnable {
+	
+private:
+	
+	Connection* connection;
+	Session* session;
+	Destination* destination;
+	MessageConsumer* consumer;
+	long waitMillis;
+		
+public: 
+
+	HelloWorldConsumer( long waitMillis ){
+		connection = NULL;
+    	session = NULL;
+    	destination = NULL;
+    	consumer = NULL;
+    	this->waitMillis = waitMillis;
+	}
+    virtual ~HelloWorldConsumer(){    	
+    	cleanup();
+    }
+    
+    virtual void run() {
+    	    	
+        try {
+
+            // Create a ConnectionFactory
+            ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory("127.0.0.1:61613");
+
+            // Create a Connection
+            connection = connectionFactory->createConnection();
+            delete connectionFactory;
+            connection->start();
+            
+            connection->setExceptionListener(this);
+
+            // Create a Session
+            session = connection->createSession( Session::AutoAcknowledge );
+
+            // Create the destination (Topic or Queue)
+            destination = session->createQueue("TEST.FOO");
+
+            // Create a MessageConsumer from the Session to the Topic or Queue
+            consumer = session->createConsumer(*destination);
+            
+            consumer->setMessageListener( this );
+            
+            // Sleep while asynchronous messages come in.
+            Thread::sleep( waitMillis );		
+            
+        } catch (CMSException& e) {
+            e.printStackTrace();
+        }
+    }
+    
+    virtual void onMessage( const Message& message ){
+    	
+        try
+        {
+    	    const TextMessage& textMessage = dynamic_cast<const TextMessage&>(message);
+            string text = textMessage.getText();
+            printf( "Received: %s\n", text.c_str() );
+        }
+        catch( std::bad_cast& ex )
+        {
+            printf( "Received something other than a text Message\n" );
+        }
+    }
+
+    virtual void onException( const CMSException& ex ) {
+        printf("JMS Exception occured.  Shutting down client.\n");
+    }
+    
+private:
+
+    void cleanup(){
+    	
+		// Close open resources.
+		try{
+			if( session != NULL ) session->close();
+			if( connection != NULL ) connection->close();
+		}catch (CMSException& e) {}
+		
+		// Destroy resources.
+		try{                        
+        	if( destination != NULL ) delete destination;
+		}catch (CMSException& e) {}
+		destination = NULL;
+		
+		try{
+            if( consumer != NULL ) delete consumer;
+		}catch (CMSException& e) {}
+		consumer = NULL;
+		
+		try{
+        	if( session != NULL ) delete session;
+		}catch (CMSException& e) {}
+		session = NULL;
+		
+		try{
+        	if( connection != NULL ) delete connection;
+		}catch (CMSException& e) {}
+		connection = NULL;
+    }
+};
+    
+void main(int argc, char* argv[]) {
+    
+    HelloWorldProducer producer( 1000 );
+	HelloWorldConsumer consumer( 5000 );
+	
+	// Start the consumer thread.
+	Thread consumerThread( &consumer );
+	consumerThread.start();
+	
+	// Start the producer thread.
+	Thread producerThread( &producer );
+	producerThread.start();
+
+	// Wait for the threads to complete.
+	producerThread.join();
+	consumerThread.join();
+}
+    
+// END SNIPPET: demo