You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by jandeclercq <ja...@alsic.be> on 2010/11/17 12:56:07 UTC

activemq-cpp-library-3.2.2 - reconnect fails

Hey,

When I disconnect and close down a producer and afterwards try to
reconnect, the cpp-library crashes.  I don't know what I'm doing wrong.

See the code below (choose option R)



#include <iostream>
#include <stdio.h>
#include <string.h>

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>

using namespace activemq;
using namespace activemq::transport;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

using namespace std;

class SimpleProducer: public ExceptionListener, public
DefaultTransportListener {
private:

	Connection* connection;
	Session* session;
	Destination* destination;
	MessageProducer* producer;
	bool useTopic;
	bool clientAck;
	std::string brokerURI;
	std::string destURI;
	bool isClosed;

public:
	SimpleProducer(const std::string& brokerURI, const std::string& destURI,
bool useTopic, bool clientAck) {
		activemq::library::ActiveMQCPP::initializeLibrary();
		this->connection = NULL;
		this->session = NULL;
		this->destination = NULL;
		this->producer = NULL;
		this->useTopic = useTopic;
		this->brokerURI = brokerURI;
		this->destURI = destURI;
		this->clientAck = clientAck;
		this->isClosed = true;
	}

	virtual ~SimpleProducer() {
		close();
	}

	void close() {
		if (!isClosed) {
			isClosed = true;
			this->cleanup();
		}
	}

	virtual void connect() {
		try {
			try {
				this->isClosed = false;
				// Create a ConnectionFactory
				ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory(this->brokerURI);
				// Create a Connection
				this->connection = connectionFactory->createConnection();
				delete connectionFactory;
				ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>
(connection);
				if (amqConnection != NULL) {
					amqConnection->addTransportListener(this);
				}
				//This will crash the 2nd time!:
				this->connection->start();
				this->connection->setExceptionListener(this);

			} catch (Exception& e) {
				throw e;
			}

			// Create a Session
			if (this->clientAck) {
				this->session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
			} else {
				this->session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
			}

			// Create the destination (Topic or Queue)
			if (this->useTopic) {
				this->destination = this->session->createTopic(this->destURI);
			} else {
				this->destination = this->session->createQueue(this->destURI);
			}

			// Create a MessageProducer from the Session to the Topic or Queue
			this->producer = this->session->createProducer(this->destination);

			this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

			fprintf(stdout, "Producer connected to %s", this->brokerURI.c_str());
			fflush(stdout);

		} catch (CMSException& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
	}

	virtual void send(const char *msg) {
		try {

			string text(msg);
			TextMessage* message = this->session->createTextMessage(text);

			char logMessage[256];

			this->producer->send(message);

			//logging:
			sprintf(logMessage, "Message '%s' send to serviceBus", msg);

			//end logging
			delete message;
		} catch (CMSException& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
	}
	//Exception Listener
	virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
		fprintf(stdout, "!! CMS lib Exception occured !!");
		fflush(stdout);
	}
	//Transport Listener
	virtual void transportInterrupted() {
		fprintf(stdout, "The Connection's Transport has been Interrupted.");
		fflush(stdout);
	}
	virtual void transportResumed() {
		fprintf(stdout, "The Connection's Transport has been Restored.");
		fflush(stdout);
	}
private:

	void cleanup() {
		//Producer
		try {
			if (producer != NULL) {
				producer->close();
				delete producer;
			}
		} catch (CMSException& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);

		}
		producer = NULL;

		// Destination
		try {
			if (destination != NULL) {
				delete destination;
			}
		} catch (CMSException& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
		destination = NULL;

		//SESSION
		try {
			if (session != NULL) {
				session->close();
				delete session;
			}
		} catch (Exception& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
		session = NULL;

		//CONNECTION
		try {
			if (connection != NULL) {
				connection->close();
				delete connection;
			}
			connection = NULL;
		} catch (Exception& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
		connection = NULL;
		activemq::library::ActiveMQCPP::shutdownLibrary();
	}

};//class SimpleProducer


SimpleProducer *amq_producer;
void connect(void) {
	std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
	std::string _destURI = "TEST.FOO";
	amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
	amq_producer->connect();
}

void disconnect(void) {
	if(amq_producer != NULL){
		amq_producer->close();
	}
	delete amq_producer;
}
void send(void) {
	amq_producer->send("TEST MESSAGE");
}

int main(int argc, const char* argv[]) {
	string str;
	connect();
	while (str.compare("Q") != 0) {
		printf("\n  P  = to produce\n  R = to reset Producer\n  Q  = quit\n");
		getline(cin, str);

		if (str.compare("P") == 0) {
			send();
		} else if (str.compare("R") == 0) {
			disconnect();
			connect();
		}
	}
	disconnect();
	return 0;
}



-- 
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046566.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

RE: activemq-cpp-library-3.2.2 - reconnect fails

Posted by jandeclercq <ja...@alsic.be>.
Thx for your reply.


Yes, I delete a bit too much, just to be sure there wasn't anything open anymore.

When I remove all the deletes, the behaviour remains the same!

grtz..


________________________________________
Van: Nag [via ActiveMQ] [ml-node+3046860-2107221496-202575@n4.nabble.com]
Verzonden: woensdag 17 november 2010 15:37
Aan: Jan Declercq
Onderwerp: RE: activemq-cpp-library-3.2.2 - reconnect fails

Using delete on a pointer to an object not allocated with new gives unpredictable results

Please check where you are mis-managing 'amq_producer'



Thank You
Nag P



-----Original Message-----
From: jandeclercq [mailto:[hidden email]</user/SendEmail.jtp?type=node&node=3046860&i=0>]
Sent: Wednesday, November 17, 2010 6:56 AM
To: [hidden email]</user/SendEmail.jtp?type=node&node=3046860&i=1>
Subject: activemq-cpp-library-3.2.2 - reconnect fails


Hey,

When I disconnect and close down a producer and afterwards try to reconnect, the cpp-library crashes.  I don't know what I'm doing wrong.

See the code below (choose option R)



#include <iostream>
#include <stdio.h>
#include <string.h>

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h> #include <decaf/lang/Integer.h> #include <activemq/util/Config.h> #include <decaf/util/Date.h> #include <cms/Connection.h> #include <cms/Session.h> #include <cms/TextMessage.h> #include <cms/BytesMessage.h> #include <cms/MapMessage.h> #include <cms/ExceptionListener.h> #include <cms/MessageListener.h>

using namespace activemq;
using namespace activemq::transport;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent; using namespace cms; using namespace std;

using namespace std;

class SimpleProducer: public ExceptionListener, public DefaultTransportListener {
private:

        Connection* connection;
        Session* session;
        Destination* destination;
        MessageProducer* producer;
        bool useTopic;
        bool clientAck;
        std::string brokerURI;
        std::string destURI;
        bool isClosed;

public:
        SimpleProducer(const std::string& brokerURI, const std::string& destURI, bool useTopic, bool clientAck) {
                activemq::library::ActiveMQCPP::initializeLibrary();
                this->connection = NULL;
                this->session = NULL;
                this->destination = NULL;
                this->producer = NULL;
                this->useTopic = useTopic;
                this->brokerURI = brokerURI;
                this->destURI = destURI;
                this->clientAck = clientAck;
                this->isClosed = true;
        }

        virtual ~SimpleProducer() {
                close();
        }

        void close() {
                if (!isClosed) {
                        isClosed = true;
                        this->cleanup();
                }
        }

        virtual void connect() {
                try {
                        try {
                                this->isClosed = false;
                                // Create a ConnectionFactory
                                ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory(this->brokerURI);
                                // Create a Connection
                                this->connection = connectionFactory->createConnection();
                                delete connectionFactory;
                                ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*> (connection);
                                if (amqConnection != NULL) {
                                        amqConnection->addTransportListener(this);
                                }
                                //This will crash the 2nd time!:
                                this->connection->start();
                                this->connection->setExceptionListener(this);

                        } catch (Exception& e) {
                                throw e;
                        }

                        // Create a Session
                        if (this->clientAck) {
                                this->session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
                        } else {
                                this->session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
                        }

                        // Create the destination (Topic or Queue)
                        if (this->useTopic) {
                                this->destination = this->session->createTopic(this->destURI);
                        } else {
                                this->destination = this->session->createQueue(this->destURI);
                        }

                        // Create a MessageProducer from the Session to the Topic or Queue
                        this->producer = this->session->createProducer(this->destination);

                        this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

                        fprintf(stdout, "Producer connected to %s", this->brokerURI.c_str());
                        fflush(stdout);

                } catch (CMSException& e) {
                        fprintf(stdout, e.getMessage().c_str());
                        fflush(stdout);
                }
        }

        virtual void send(const char *msg) {
                try {

                        string text(msg);
                        TextMessage* message = this->session->createTextMessage(text);

                        char logMessage[256];

                        this->producer->send(message);

                        //logging:
                        sprintf(logMessage, "Message '%s' send to serviceBus", msg);

                        //end logging
                        delete message;
                } catch (CMSException& e) {
                        fprintf(stdout, e.getMessage().c_str());
                        fflush(stdout);
                }
        }
        //Exception Listener
        virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
                fprintf(stdout, "!! CMS lib Exception occured !!");
                fflush(stdout);
        }
        //Transport Listener
        virtual void transportInterrupted() {
                fprintf(stdout, "The Connection's Transport has been Interrupted.");
                fflush(stdout);
        }
        virtual void transportResumed() {
                fprintf(stdout, "The Connection's Transport has been Restored.");
                fflush(stdout);
        }
private:

        void cleanup() {
                //Producer
                try {
                        if (producer != NULL) {
                                producer->close();
                                delete producer;
                        }
                } catch (CMSException& e) {
                        fprintf(stdout, e.getMessage().c_str());
                        fflush(stdout);

                }
                producer = NULL;

                // Destination
                try {
                        if (destination != NULL) {
                                delete destination;
                        }
                } catch (CMSException& e) {
                        fprintf(stdout, e.getMessage().c_str());
                        fflush(stdout);
                }
                destination = NULL;

                //SESSION
                try {
                        if (session != NULL) {
                                session->close();
                                delete session;
                        }
                } catch (Exception& e) {
                        fprintf(stdout, e.getMessage().c_str());
                        fflush(stdout);
                }
                session = NULL;

                //CONNECTION
                try {
                        if (connection != NULL) {
                                connection->close();
                                delete connection;
                        }
                        connection = NULL;
                } catch (Exception& e) {
                        fprintf(stdout, e.getMessage().c_str());
                        fflush(stdout);
                }
                connection = NULL;
                activemq::library::ActiveMQCPP::shutdownLibrary();
        }

};//class SimpleProducer


SimpleProducer *amq_producer;
void connect(void) {
        std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
        std::string _destURI = "TEST.FOO";
        amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
        amq_producer->connect();
}

void disconnect(void) {
        if(amq_producer != NULL){
                amq_producer->close();
        }
        delete amq_producer;
}
void send(void) {
        amq_producer->send("TEST MESSAGE");
}

int main(int argc, const char* argv[]) {
        string str;
        connect();
        while (str.compare("Q") != 0) {
                printf("\n  P  = to produce\n  R = to reset Producer\n  Q  = quit\n");
                getline(cin, str);

                if (str.compare("P") == 0) {
                        send();
                } else if (str.compare("R") == 0) {
                        disconnect();
                        connect();
                }
        }
        disconnect();
        return 0;
}



--
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046566.html<http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046566.html?by-user=t>
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


________________________________
View message @ http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046860.html
To unsubscribe from activemq-cpp-library-3.2.2 - reconnect fails, click here<http://activemq.2283324.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=3046566&code=amFuLmRlY2xlcmNxQGFsc2ljLmJlfDMwNDY1NjZ8LTc3OTY2MjEwMw==>.

-- 
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046987.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

RE: activemq-cpp-library-3.2.2 - reconnect fails

Posted by "Nakarikanti, Nageswara" <NN...@dwd.IN.gov>.

Using delete on a pointer to an object not allocated with new gives unpredictable results

Please check where you are mis-managing 'amq_producer'



Thank You
Nag P

 

-----Original Message-----
From: jandeclercq [mailto:jan.declercq@alsic.be] 
Sent: Wednesday, November 17, 2010 6:56 AM
To: users@activemq.apache.org
Subject: activemq-cpp-library-3.2.2 - reconnect fails


Hey,

When I disconnect and close down a producer and afterwards try to reconnect, the cpp-library crashes.  I don't know what I'm doing wrong.

See the code below (choose option R)



#include <iostream>
#include <stdio.h>
#include <string.h>

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h> #include <decaf/lang/Integer.h> #include <activemq/util/Config.h> #include <decaf/util/Date.h> #include <cms/Connection.h> #include <cms/Session.h> #include <cms/TextMessage.h> #include <cms/BytesMessage.h> #include <cms/MapMessage.h> #include <cms/ExceptionListener.h> #include <cms/MessageListener.h>

using namespace activemq;
using namespace activemq::transport;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent; using namespace cms; using namespace std;

using namespace std;

class SimpleProducer: public ExceptionListener, public DefaultTransportListener {
private:

	Connection* connection;
	Session* session;
	Destination* destination;
	MessageProducer* producer;
	bool useTopic;
	bool clientAck;
	std::string brokerURI;
	std::string destURI;
	bool isClosed;

public:
	SimpleProducer(const std::string& brokerURI, const std::string& destURI, bool useTopic, bool clientAck) {
		activemq::library::ActiveMQCPP::initializeLibrary();
		this->connection = NULL;
		this->session = NULL;
		this->destination = NULL;
		this->producer = NULL;
		this->useTopic = useTopic;
		this->brokerURI = brokerURI;
		this->destURI = destURI;
		this->clientAck = clientAck;
		this->isClosed = true;
	}

	virtual ~SimpleProducer() {
		close();
	}

	void close() {
		if (!isClosed) {
			isClosed = true;
			this->cleanup();
		}
	}

	virtual void connect() {
		try {
			try {
				this->isClosed = false;
				// Create a ConnectionFactory
				ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory(this->brokerURI);
				// Create a Connection
				this->connection = connectionFactory->createConnection();
				delete connectionFactory;
				ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*> (connection);
				if (amqConnection != NULL) {
					amqConnection->addTransportListener(this);
				}
				//This will crash the 2nd time!:
				this->connection->start();
				this->connection->setExceptionListener(this);

			} catch (Exception& e) {
				throw e;
			}

			// Create a Session
			if (this->clientAck) {
				this->session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
			} else {
				this->session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
			}

			// Create the destination (Topic or Queue)
			if (this->useTopic) {
				this->destination = this->session->createTopic(this->destURI);
			} else {
				this->destination = this->session->createQueue(this->destURI);
			}

			// Create a MessageProducer from the Session to the Topic or Queue
			this->producer = this->session->createProducer(this->destination);

			this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

			fprintf(stdout, "Producer connected to %s", this->brokerURI.c_str());
			fflush(stdout);

		} catch (CMSException& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
	}

	virtual void send(const char *msg) {
		try {

			string text(msg);
			TextMessage* message = this->session->createTextMessage(text);

			char logMessage[256];

			this->producer->send(message);

			//logging:
			sprintf(logMessage, "Message '%s' send to serviceBus", msg);

			//end logging
			delete message;
		} catch (CMSException& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
	}
	//Exception Listener
	virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
		fprintf(stdout, "!! CMS lib Exception occured !!");
		fflush(stdout);
	}
	//Transport Listener
	virtual void transportInterrupted() {
		fprintf(stdout, "The Connection's Transport has been Interrupted.");
		fflush(stdout);
	}
	virtual void transportResumed() {
		fprintf(stdout, "The Connection's Transport has been Restored.");
		fflush(stdout);
	}
private:

	void cleanup() {
		//Producer
		try {
			if (producer != NULL) {
				producer->close();
				delete producer;
			}
		} catch (CMSException& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);

		}
		producer = NULL;

		// Destination
		try {
			if (destination != NULL) {
				delete destination;
			}
		} catch (CMSException& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
		destination = NULL;

		//SESSION
		try {
			if (session != NULL) {
				session->close();
				delete session;
			}
		} catch (Exception& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
		session = NULL;

		//CONNECTION
		try {
			if (connection != NULL) {
				connection->close();
				delete connection;
			}
			connection = NULL;
		} catch (Exception& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
		connection = NULL;
		activemq::library::ActiveMQCPP::shutdownLibrary();
	}

};//class SimpleProducer


SimpleProducer *amq_producer;
void connect(void) {
	std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
	std::string _destURI = "TEST.FOO";
	amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
	amq_producer->connect();
}

void disconnect(void) {
	if(amq_producer != NULL){
		amq_producer->close();
	}
	delete amq_producer;
}
void send(void) {
	amq_producer->send("TEST MESSAGE");
}

int main(int argc, const char* argv[]) {
	string str;
	connect();
	while (str.compare("Q") != 0) {
		printf("\n  P  = to produce\n  R = to reset Producer\n  Q  = quit\n");
		getline(cin, str);

		if (str.compare("P") == 0) {
			send();
		} else if (str.compare("R") == 0) {
			disconnect();
			connect();
		}
	}
	disconnect();
	return 0;
}



--
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046566.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

RE: activemq-cpp-library-3.2.2 - reconnect fails

Posted by jandeclercq <ja...@alsic.be>.
Hey,

Thank you thank you,..  you solved my problem !

It does make my architecture puzzle a lot bigger, but that is the fun in programming, isn't it?  :-)
(yes it is a lot bigger than the example I posted)

Thanks!
________________________________________
Van: Timothy Bish [via ActiveMQ] [ml-node+3046971-419590077-202575@n4.nabble.com]
Verzonden: woensdag 17 november 2010 16:31
Aan: Jan Declercq
Onderwerp: Re: activemq-cpp-library-3.2.2 - reconnect fails

On Wed, 2010-11-17 at 03:56 -0800, jandeclercq wrote:
> Hey,
>
> When I disconnect and close down a producer and afterwards try to
> reconnect, the cpp-library crashes.  I don't know what I'm doing wrong.
>
> See the code below (choose option R)
>
>

The problem most likely results from the fact that you are calling the
library init and shutdown methods more than once per application run by
placing them in the constructor and cleanup methods of the class.  Try
placing them in the main method at start and end.


Regards

>
> #include <iostream>
> #include <stdio.h>
> #include <string.h>
>
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/core/ActiveMQConnection.h>
> #include <activemq/transport/DefaultTransportListener.h>
> #include <activemq/library/ActiveMQCPP.h>
> #include <decaf/lang/Integer.h>
> #include <activemq/util/Config.h>
> #include <decaf/util/Date.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
>
> using namespace activemq;
> using namespace activemq::transport;
> using namespace activemq::core;
> using namespace decaf;
> using namespace decaf::lang;
> using namespace decaf::util;
> using namespace decaf::util::concurrent;
> using namespace cms;
> using namespace std;
>
> using namespace std;
>
> class SimpleProducer: public ExceptionListener, public
> DefaultTransportListener {
> private:
>
> Connection* connection;
> Session* session;
> Destination* destination;
> MessageProducer* producer;
> bool useTopic;
> bool clientAck;
> std::string brokerURI;
> std::string destURI;
> bool isClosed;
>
> public:
> SimpleProducer(const std::string& brokerURI, const std::string& destURI,
> bool useTopic, bool clientAck) {
> activemq::library::ActiveMQCPP::initializeLibrary();
> this->connection = NULL;
> this->session = NULL;
> this->destination = NULL;
> this->producer = NULL;
> this->useTopic = useTopic;
> this->brokerURI = brokerURI;
> this->destURI = destURI;
> this->clientAck = clientAck;
> this->isClosed = true;
> }
>
> virtual ~SimpleProducer() {
> close();
> }
>
> void close() {
> if (!isClosed) {
> isClosed = true;
> this->cleanup();
> }
> }
>
> virtual void connect() {
> try {
> try {
> this->isClosed = false;
> // Create a ConnectionFactory
> ActiveMQConnectionFactory* connectionFactory = new
> ActiveMQConnectionFactory(this->brokerURI);
> // Create a Connection
> this->connection = connectionFactory->createConnection();
> delete connectionFactory;
> ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>
> (connection);
> if (amqConnection != NULL) {
> amqConnection->addTransportListener(this);
> }
> //This will crash the 2nd time!:
> this->connection->start();
> this->connection->setExceptionListener(this);
>
> } catch (Exception& e) {
> throw e;
> }
>
> // Create a Session
> if (this->clientAck) {
> this->session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
> } else {
> this->session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
> }
>
> // Create the destination (Topic or Queue)
> if (this->useTopic) {
> this->destination = this->session->createTopic(this->destURI);
> } else {
> this->destination = this->session->createQueue(this->destURI);
> }
>
> // Create a MessageProducer from the Session to the Topic or Queue
> this->producer = this->session->createProducer(this->destination);
>
> this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
>
> fprintf(stdout, "Producer connected to %s", this->brokerURI.c_str());
> fflush(stdout);
>
> } catch (CMSException& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> }
>
> virtual void send(const char *msg) {
> try {
>
> string text(msg);
> TextMessage* message = this->session->createTextMessage(text);
>
> char logMessage[256];
>
> this->producer->send(message);
>
> //logging:
> sprintf(logMessage, "Message '%s' send to serviceBus", msg);
>
> //end logging
> delete message;
> } catch (CMSException& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> }
> //Exception Listener
> virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
> fprintf(stdout, "!! CMS lib Exception occured !!");
> fflush(stdout);
> }
> //Transport Listener
> virtual void transportInterrupted() {
> fprintf(stdout, "The Connection's Transport has been Interrupted.");
> fflush(stdout);
> }
> virtual void transportResumed() {
> fprintf(stdout, "The Connection's Transport has been Restored.");
> fflush(stdout);
> }
> private:
>
> void cleanup() {
> //Producer
> try {
> if (producer != NULL) {
> producer->close();
> delete producer;
> }
> } catch (CMSException& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
>
> }
> producer = NULL;
>
> // Destination
> try {
> if (destination != NULL) {
> delete destination;
> }
> } catch (CMSException& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> destination = NULL;
>
> //SESSION
> try {
> if (session != NULL) {
> session->close();
> delete session;
> }
> } catch (Exception& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> session = NULL;
>
> //CONNECTION
> try {
> if (connection != NULL) {
> connection->close();
> delete connection;
> }
> connection = NULL;
> } catch (Exception& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> connection = NULL;
> activemq::library::ActiveMQCPP::shutdownLibrary();
> }
>
> };//class SimpleProducer
>
>
> SimpleProducer *amq_producer;
> void connect(void) {
> std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
> std::string _destURI = "TEST.FOO";
> amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
> amq_producer->connect();
> }
>
> void disconnect(void) {
> if(amq_producer != NULL){
> amq_producer->close();
> }
> delete amq_producer;
> }
> void send(void) {
> amq_producer->send("TEST MESSAGE");
> }
>
> int main(int argc, const char* argv[]) {
> string str;
> connect();
> while (str.compare("Q") != 0) {
> printf("\n  P  = to produce\n  R = to reset Producer\n  Q  = quit\n");
> getline(cin, str);
>
> if (str.compare("P") == 0) {
> send();
> } else if (str.compare("R") == 0) {
> disconnect();
> connect();
> }
> }
> disconnect();
> return 0;
> }
>
>
>

--
Tim Bish
------------
FuseSource
Email: [hidden email]</user/SendEmail.jtp?type=node&node=3046971&i=0>
Web: http://fusesource.com<http://fusesource.com?by-user=t>
Twitter: tabish121
Blog: http://timbish.blogspot.com/




________________________________
View message @ http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046971.html
To unsubscribe from activemq-cpp-library-3.2.2 - reconnect fails, click here<http://activemq.2283324.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=3046566&code=amFuLmRlY2xlcmNxQGFsc2ljLmJlfDMwNDY1NjZ8LTc3OTY2MjEwMw==>.

-- 
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046993.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: activemq-cpp-library-3.2.2 - reconnect fails

Posted by Timothy Bish <ta...@gmail.com>.
On Wed, 2010-11-17 at 03:56 -0800, jandeclercq wrote:
> Hey,
> 
> When I disconnected and closed down a producer. And afterwards I'm trying to
> reconnect, the cpp-library crashes.  I don't know what I'm doing wrong.
> 
> See the code below (choose option R)
> 
> 

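The problem most likely results from the fact that you are calling the
library init and shutdown methods (ActiveMQCPP::initializeLibrary() and
ActiveMQCPP::shutdownLibrary()) more than once per application run, by
placing them in the constructor and cleanup methods of the class.  Try
calling each of them exactly once, in main(): initializeLibrary() at the
start and shutdownLibrary() at the end.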


Regards

> 
> #include <iostream>
> #include <stdio.h>
> #include <string.h>
> 
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/core/ActiveMQConnection.h>
> #include <activemq/transport/DefaultTransportListener.h>
> #include <activemq/library/ActiveMQCPP.h>
> #include <decaf/lang/Integer.h>
> #include <activemq/util/Config.h>
> #include <decaf/util/Date.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> 
> using namespace activemq;
> using namespace activemq::transport;
> using namespace activemq::core;
> using namespace decaf;
> using namespace decaf::lang;
> using namespace decaf::util;
> using namespace decaf::util::concurrent;
> using namespace cms;
> using namespace std;
> 
> using namespace std;
> 
> class SimpleProducer: public ExceptionListener, public
> DefaultTransportListener {
> private:
> 
> 	Connection* connection;
> 	Session* session;
> 	Destination* destination;
> 	MessageProducer* producer;
> 	bool useTopic;
> 	bool clientAck;
> 	std::string brokerURI;
> 	std::string destURI;
> 	bool isClosed;
> 
> public:
> 	SimpleProducer(const std::string& brokerURI, const std::string& destURI,
> bool useTopic, bool clientAck) {
> 		activemq::library::ActiveMQCPP::initializeLibrary();
> 		this->connection = NULL;
> 		this->session = NULL;
> 		this->destination = NULL;
> 		this->producer = NULL;
> 		this->useTopic = useTopic;
> 		this->brokerURI = brokerURI;
> 		this->destURI = destURI;
> 		this->clientAck = clientAck;
> 		this->isClosed = true;
> 	}
> 
> 	virtual ~SimpleProducer() {
> 		close();
> 	}
> 
> 	void close() {
> 		if (!isClosed) {
> 			isClosed = true;
> 			this->cleanup();
> 		}
> 	}
> 
> 	virtual void connect() {
> 		try {
> 			try {
> 				this->isClosed = false;
> 				// Create a ConnectionFactory
> 				ActiveMQConnectionFactory* connectionFactory = new
> ActiveMQConnectionFactory(this->brokerURI);
> 				// Create a Connection
> 				this->connection = connectionFactory->createConnection();
> 				delete connectionFactory;
> 				ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>
> (connection);
> 				if (amqConnection != NULL) {
> 					amqConnection->addTransportListener(this);
> 				}
> 				//This will crash the 2nd time!:
> 				this->connection->start();
> 				this->connection->setExceptionListener(this);
> 
> 			} catch (Exception& e) {
> 				throw e;
> 			}
> 
> 			// Create a Session
> 			if (this->clientAck) {
> 				this->session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
> 			} else {
> 				this->session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
> 			}
> 
> 			// Create the destination (Topic or Queue)
> 			if (this->useTopic) {
> 				this->destination = this->session->createTopic(this->destURI);
> 			} else {
> 				this->destination = this->session->createQueue(this->destURI);
> 			}
> 
> 			// Create a MessageProducer from the Session to the Topic or Queue
> 			this->producer = this->session->createProducer(this->destination);
> 
> 			this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
> 
> 			fprintf(stdout, "Producer connected to %s", this->brokerURI.c_str());
> 			fflush(stdout);
> 
> 		} catch (CMSException& e) {
> 			fprintf(stdout, e.getMessage().c_str());
> 			fflush(stdout);
> 		}
> 	}
> 
> 	virtual void send(const char *msg) {
> 		try {
> 
> 			string text(msg);
> 			TextMessage* message = this->session->createTextMessage(text);
> 
> 			char logMessage[256];
> 
> 			this->producer->send(message);
> 
> 			//logging:
> 			sprintf(logMessage, "Message '%s' send to serviceBus", msg);
> 
> 			//end logging
> 			delete message;
> 		} catch (CMSException& e) {
> 			fprintf(stdout, e.getMessage().c_str());
> 			fflush(stdout);
> 		}
> 	}
> 	//Exception Listener
> 	virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
> 		fprintf(stdout, "!! CMS lib Exception occured !!");
> 		fflush(stdout);
> 	}
> 	//Transport Listener
> 	virtual void transportInterrupted() {
> 		fprintf(stdout, "The Connection's Transport has been Interrupted.");
> 		fflush(stdout);
> 	}
> 	virtual void transportResumed() {
> 		fprintf(stdout, "The Connection's Transport has been Restored.");
> 		fflush(stdout);
> 	}
> private:
> 
> 	void cleanup() {
> 		//Producer
> 		try {
> 			if (producer != NULL) {
> 				producer->close();
> 				delete producer;
> 			}
> 		} catch (CMSException& e) {
> 			fprintf(stdout, e.getMessage().c_str());
> 			fflush(stdout);
> 
> 		}
> 		producer = NULL;
> 
> 		// Destination
> 		try {
> 			if (destination != NULL) {
> 				delete destination;
> 			}
> 		} catch (CMSException& e) {
> 			fprintf(stdout, e.getMessage().c_str());
> 			fflush(stdout);
> 		}
> 		destination = NULL;
> 
> 		//SESSION
> 		try {
> 			if (session != NULL) {
> 				session->close();
> 				delete session;
> 			}
> 		} catch (Exception& e) {
> 			fprintf(stdout, e.getMessage().c_str());
> 			fflush(stdout);
> 		}
> 		session = NULL;
> 
> 		//CONNECTION
> 		try {
> 			if (connection != NULL) {
> 				connection->close();
> 				delete connection;
> 			}
> 			connection = NULL;
> 		} catch (Exception& e) {
> 			fprintf(stdout, e.getMessage().c_str());
> 			fflush(stdout);
> 		}
> 		connection = NULL;
> 		activemq::library::ActiveMQCPP::shutdownLibrary();
> 	}
> 
> };//class SimpleProducer
> 
> 
> SimpleProducer *amq_producer;
> void connect(void) {
> 	std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
> 	std::string _destURI = "TEST.FOO";
> 	amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
> 	amq_producer->connect();
> }
> 
> void disconnect(void) {
> 	if(amq_producer != NULL){
> 		amq_producer->close();
> 	}
> 	delete amq_producer;
> }
> void send(void) {
> 	amq_producer->send("TEST MESSAGE");
> }
> 
> int main(int argc, const char* argv[]) {
> 	string str;
> 	connect();
> 	while (str.compare("Q") != 0) {
> 		printf("\n  P  = to produce\n  R = to reset Producer\n  Q  = quit\n");
> 		getline(cin, str);
> 
> 		if (str.compare("P") == 0) {
> 			send();
> 		} else if (str.compare("R") == 0) {
> 			disconnect();
> 			connect();
> 		}
> 	}
> 	disconnect();
> 	return 0;
> }
> 
> 
> 

-- 
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/



Re: activemq-cpp-library-3.2.2 - reconnect fails

Posted by Nag <NN...@dwd.IN.gov>.
Using delete on a pointer to an object that was not allocated with new
gives unpredictable results.

Please check where you are mis-managing 'amq_producer'.

-- 
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046865.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.