You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by jandeclercq <ja...@alsic.be> on 2010/11/17 12:56:07 UTC
activemq-cpp-library-3.2.2 - reconnect fails
Hey,
After I disconnect and close down a producer and then try to
reconnect, the cpp-library crashes. I don't know what I'm doing wrong.
See the code below (choose option R)
#include <iostream>
#include <stdio.h>
#include <string.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
using namespace activemq;
using namespace activemq::transport;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
using namespace std;
class SimpleProducer: public ExceptionListener, public
DefaultTransportListener {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
bool useTopic;
bool clientAck;
std::string brokerURI;
std::string destURI;
bool isClosed;
public:
SimpleProducer(const std::string& brokerURI, const std::string& destURI,
bool useTopic, bool clientAck) {
activemq::library::ActiveMQCPP::initializeLibrary();
this->connection = NULL;
this->session = NULL;
this->destination = NULL;
this->producer = NULL;
this->useTopic = useTopic;
this->brokerURI = brokerURI;
this->destURI = destURI;
this->clientAck = clientAck;
this->isClosed = true;
}
virtual ~SimpleProducer() {
close();
}
void close() {
if (!isClosed) {
isClosed = true;
this->cleanup();
}
}
virtual void connect() {
try {
try {
this->isClosed = false;
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory(this->brokerURI);
// Create a Connection
this->connection = connectionFactory->createConnection();
delete connectionFactory;
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>
(connection);
if (amqConnection != NULL) {
amqConnection->addTransportListener(this);
}
//This will crash the 2nd time!:
this->connection->start();
this->connection->setExceptionListener(this);
} catch (Exception& e) {
throw e;
}
// Create a Session
if (this->clientAck) {
this->session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
} else {
this->session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
}
// Create the destination (Topic or Queue)
if (this->useTopic) {
this->destination = this->session->createTopic(this->destURI);
} else {
this->destination = this->session->createQueue(this->destURI);
}
// Create a MessageProducer from the Session to the Topic or Queue
this->producer = this->session->createProducer(this->destination);
this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
fprintf(stdout, "Producer connected to %s", this->brokerURI.c_str());
fflush(stdout);
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
}
virtual void send(const char *msg) {
try {
string text(msg);
TextMessage* message = this->session->createTextMessage(text);
char logMessage[256];
this->producer->send(message);
//logging:
sprintf(logMessage, "Message '%s' send to serviceBus", msg);
//end logging
delete message;
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
}
//Exception Listener
virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
fprintf(stdout, "!! CMS lib Exception occured !!");
fflush(stdout);
}
//Transport Listener
virtual void transportInterrupted() {
fprintf(stdout, "The Connection's Transport has been Interrupted.");
fflush(stdout);
}
virtual void transportResumed() {
fprintf(stdout, "The Connection's Transport has been Restored.");
fflush(stdout);
}
private:
void cleanup() {
//Producer
try {
if (producer != NULL) {
producer->close();
delete producer;
}
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
producer = NULL;
// Destination
try {
if (destination != NULL) {
delete destination;
}
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
destination = NULL;
//SESSION
try {
if (session != NULL) {
session->close();
delete session;
}
} catch (Exception& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
session = NULL;
//CONNECTION
try {
if (connection != NULL) {
connection->close();
delete connection;
}
connection = NULL;
} catch (Exception& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
connection = NULL;
activemq::library::ActiveMQCPP::shutdownLibrary();
}
};//class SimpleProducer
SimpleProducer *amq_producer;
void connect(void) {
std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
std::string _destURI = "TEST.FOO";
amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
amq_producer->connect();
}
void disconnect(void) {
if(amq_producer != NULL){
amq_producer->close();
}
delete amq_producer;
}
void send(void) {
amq_producer->send("TEST MESSAGE");
}
int main(int argc, const char* argv[]) {
string str;
connect();
while (str.compare("Q") != 0) {
printf("\n P = to produce\n R = to reset Producer\n Q = quit\n");
getline(cin, str);
if (str.compare("P") == 0) {
send();
} else if (str.compare("R") == 0) {
disconnect();
connect();
}
}
disconnect();
return 0;
}
--
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046566.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
RE: activemq-cpp-library-3.2.2 - reconnect fails
Posted by jandeclercq <ja...@alsic.be>.
Thx for your reply.
Yes.. I delete a bit too much.. just to be sure there wasn't anything open anymore.
When I remove all the deletes, the behaviour remains the same!
grtz..
________________________________________
Van: Nag [via ActiveMQ] [ml-node+3046860-2107221496-202575@n4.nabble.com]
Verzonden: woensdag 17 november 2010 15:37
Aan: Jan Declercq
Onderwerp: RE: activemq-cpp-library-3.2.2 - reconnect fails
Using delete on a pointer to an object not allocated with new gives unpredictable results
Please check where you are mis-managing 'amq_producer'
Thank You
Nag P
-----Original Message-----
From: jandeclercq [mailto:[hidden email]</user/SendEmail.jtp?type=node&node=3046860&i=0>]
Sent: Wednesday, November 17, 2010 6:56 AM
To: [hidden email]</user/SendEmail.jtp?type=node&node=3046860&i=1>
Subject: activemq-cpp-library-3.2.2 - reconnect fails
Hey,
When I disconnected and closed down a producer. And afterwards I'm trying to reconnect, the cpp-library crashes. I don't know what I'm doing wrong.
See the code below (choose option R)
#include <iostream>
#include <stdio.h>
#include <string.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h> #include <decaf/lang/Integer.h> #include <activemq/util/Config.h> #include <decaf/util/Date.h> #include <cms/Connection.h> #include <cms/Session.h> #include <cms/TextMessage.h> #include <cms/BytesMessage.h> #include <cms/MapMessage.h> #include <cms/ExceptionListener.h> #include <cms/MessageListener.h>
using namespace activemq;
using namespace activemq::transport;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent; using namespace cms; using namespace std;
using namespace std;
class SimpleProducer: public ExceptionListener, public DefaultTransportListener {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
bool useTopic;
bool clientAck;
std::string brokerURI;
std::string destURI;
bool isClosed;
public:
SimpleProducer(const std::string& brokerURI, const std::string& destURI, bool useTopic, bool clientAck) {
activemq::library::ActiveMQCPP::initializeLibrary();
this->connection = NULL;
this->session = NULL;
this->destination = NULL;
this->producer = NULL;
this->useTopic = useTopic;
this->brokerURI = brokerURI;
this->destURI = destURI;
this->clientAck = clientAck;
this->isClosed = true;
}
virtual ~SimpleProducer() {
close();
}
void close() {
if (!isClosed) {
isClosed = true;
this->cleanup();
}
}
virtual void connect() {
try {
try {
this->isClosed = false;
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory(this->brokerURI);
// Create a Connection
this->connection = connectionFactory->createConnection();
delete connectionFactory;
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*> (connection);
if (amqConnection != NULL) {
amqConnection->addTransportListener(this);
}
//This will crash the 2nd time!:
this->connection->start();
this->connection->setExceptionListener(this);
} catch (Exception& e) {
throw e;
}
// Create a Session
if (this->clientAck) {
this->session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
} else {
this->session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
}
// Create the destination (Topic or Queue)
if (this->useTopic) {
this->destination = this->session->createTopic(this->destURI);
} else {
this->destination = this->session->createQueue(this->destURI);
}
// Create a MessageProducer from the Session to the Topic or Queue
this->producer = this->session->createProducer(this->destination);
this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
fprintf(stdout, "Producer connected to %s", this->brokerURI.c_str());
fflush(stdout);
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
}
virtual void send(const char *msg) {
try {
string text(msg);
TextMessage* message = this->session->createTextMessage(text);
char logMessage[256];
this->producer->send(message);
//logging:
sprintf(logMessage, "Message '%s' send to serviceBus", msg);
//end logging
delete message;
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
}
//Exception Listener
virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
fprintf(stdout, "!! CMS lib Exception occured !!");
fflush(stdout);
}
//Transport Listener
virtual void transportInterrupted() {
fprintf(stdout, "The Connection's Transport has been Interrupted.");
fflush(stdout);
}
virtual void transportResumed() {
fprintf(stdout, "The Connection's Transport has been Restored.");
fflush(stdout);
}
private:
void cleanup() {
//Producer
try {
if (producer != NULL) {
producer->close();
delete producer;
}
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
producer = NULL;
// Destination
try {
if (destination != NULL) {
delete destination;
}
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
destination = NULL;
//SESSION
try {
if (session != NULL) {
session->close();
delete session;
}
} catch (Exception& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
session = NULL;
//CONNECTION
try {
if (connection != NULL) {
connection->close();
delete connection;
}
connection = NULL;
} catch (Exception& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
connection = NULL;
activemq::library::ActiveMQCPP::shutdownLibrary();
}
};//class SimpleProducer
SimpleProducer *amq_producer;
void connect(void) {
std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
std::string _destURI = "TEST.FOO";
amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
amq_producer->connect();
}
void disconnect(void) {
if(amq_producer != NULL){
amq_producer->close();
}
delete amq_producer;
}
void send(void) {
amq_producer->send("TEST MESSAGE");
}
int main(int argc, const char* argv[]) {
string str;
connect();
while (str.compare("Q") != 0) {
printf("\n P = to produce\n R = to reset Producer\n Q = quit\n");
getline(cin, str);
if (str.compare("P") == 0) {
send();
} else if (str.compare("R") == 0) {
disconnect();
connect();
}
}
disconnect();
return 0;
}
--
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046566.html<http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046566.html?by-user=t>
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
________________________________
View message @ http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046860.html
To unsubscribe from activemq-cpp-library-3.2.2 - reconnect fails, click here<http://activemq.2283324.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=3046566&code=amFuLmRlY2xlcmNxQGFsc2ljLmJlfDMwNDY1NjZ8LTc3OTY2MjEwMw==>.
--
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046987.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
RE: activemq-cpp-library-3.2.2 - reconnect fails
Posted by "Nakarikanti, Nageswara" <NN...@dwd.IN.gov>.
Using delete on a pointer to an object not allocated with new gives unpredictable results
Please check where you are mis-managing 'amq_producer'
Thank You
Nag P
-----Original Message-----
From: jandeclercq [mailto:jan.declercq@alsic.be]
Sent: Wednesday, November 17, 2010 6:56 AM
To: users@activemq.apache.org
Subject: activemq-cpp-library-3.2.2 - reconnect fails
Hey,
When I disconnected and closed down a producer. And afterwards I'm trying to reconnect, the cpp-library crashes. I don't know what I'm doing wrong.
See the code below (choose option R)
#include <iostream>
#include <stdio.h>
#include <string.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h> #include <decaf/lang/Integer.h> #include <activemq/util/Config.h> #include <decaf/util/Date.h> #include <cms/Connection.h> #include <cms/Session.h> #include <cms/TextMessage.h> #include <cms/BytesMessage.h> #include <cms/MapMessage.h> #include <cms/ExceptionListener.h> #include <cms/MessageListener.h>
using namespace activemq;
using namespace activemq::transport;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent; using namespace cms; using namespace std;
using namespace std;
class SimpleProducer: public ExceptionListener, public DefaultTransportListener {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
bool useTopic;
bool clientAck;
std::string brokerURI;
std::string destURI;
bool isClosed;
public:
SimpleProducer(const std::string& brokerURI, const std::string& destURI, bool useTopic, bool clientAck) {
activemq::library::ActiveMQCPP::initializeLibrary();
this->connection = NULL;
this->session = NULL;
this->destination = NULL;
this->producer = NULL;
this->useTopic = useTopic;
this->brokerURI = brokerURI;
this->destURI = destURI;
this->clientAck = clientAck;
this->isClosed = true;
}
virtual ~SimpleProducer() {
close();
}
void close() {
if (!isClosed) {
isClosed = true;
this->cleanup();
}
}
virtual void connect() {
try {
try {
this->isClosed = false;
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory(this->brokerURI);
// Create a Connection
this->connection = connectionFactory->createConnection();
delete connectionFactory;
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*> (connection);
if (amqConnection != NULL) {
amqConnection->addTransportListener(this);
}
//This will crash the 2nd time!:
this->connection->start();
this->connection->setExceptionListener(this);
} catch (Exception& e) {
throw e;
}
// Create a Session
if (this->clientAck) {
this->session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
} else {
this->session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
}
// Create the destination (Topic or Queue)
if (this->useTopic) {
this->destination = this->session->createTopic(this->destURI);
} else {
this->destination = this->session->createQueue(this->destURI);
}
// Create a MessageProducer from the Session to the Topic or Queue
this->producer = this->session->createProducer(this->destination);
this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
fprintf(stdout, "Producer connected to %s", this->brokerURI.c_str());
fflush(stdout);
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
}
virtual void send(const char *msg) {
try {
string text(msg);
TextMessage* message = this->session->createTextMessage(text);
char logMessage[256];
this->producer->send(message);
//logging:
sprintf(logMessage, "Message '%s' send to serviceBus", msg);
//end logging
delete message;
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
}
//Exception Listener
virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
fprintf(stdout, "!! CMS lib Exception occured !!");
fflush(stdout);
}
//Transport Listener
virtual void transportInterrupted() {
fprintf(stdout, "The Connection's Transport has been Interrupted.");
fflush(stdout);
}
virtual void transportResumed() {
fprintf(stdout, "The Connection's Transport has been Restored.");
fflush(stdout);
}
private:
void cleanup() {
//Producer
try {
if (producer != NULL) {
producer->close();
delete producer;
}
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
producer = NULL;
// Destination
try {
if (destination != NULL) {
delete destination;
}
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
destination = NULL;
//SESSION
try {
if (session != NULL) {
session->close();
delete session;
}
} catch (Exception& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
session = NULL;
//CONNECTION
try {
if (connection != NULL) {
connection->close();
delete connection;
}
connection = NULL;
} catch (Exception& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
connection = NULL;
activemq::library::ActiveMQCPP::shutdownLibrary();
}
};//class SimpleProducer
SimpleProducer *amq_producer;
void connect(void) {
std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
std::string _destURI = "TEST.FOO";
amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
amq_producer->connect();
}
void disconnect(void) {
if(amq_producer != NULL){
amq_producer->close();
}
delete amq_producer;
}
void send(void) {
amq_producer->send("TEST MESSAGE");
}
int main(int argc, const char* argv[]) {
string str;
connect();
while (str.compare("Q") != 0) {
printf("\n P = to produce\n R = to reset Producer\n Q = quit\n");
getline(cin, str);
if (str.compare("P") == 0) {
send();
} else if (str.compare("R") == 0) {
disconnect();
connect();
}
}
disconnect();
return 0;
}
--
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046566.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
RE: activemq-cpp-library-3.2.2 - reconnect fails
Posted by jandeclercq <ja...@alsic.be>.
Hey,
Thank you thank you,.. you solved my problem !
It does make my architecture puzzle a lot bigger, but that is the fun in programming, isn't it? :-)
(yes it is a lot bigger than the example I posted)
Thanks!
________________________________________
Van: Timothy Bish [via ActiveMQ] [ml-node+3046971-419590077-202575@n4.nabble.com]
Verzonden: woensdag 17 november 2010 16:31
Aan: Jan Declercq
Onderwerp: Re: activemq-cpp-library-3.2.2 - reconnect fails
On Wed, 2010-11-17 at 03:56 -0800, jandeclercq wrote:
> Hey,
>
> When I disconnected and closed down a producer. And afterwards I'm trying to
> reconnect, the cpp-library crashes. I don't know what I'm doing wrong.
>
> See the code below (choose option R)
>
>
The problem most likely results from the fact that you are calling the
library init and shutdown methods more than once per application run by
placing them in the constructor and cleanup methods of the class. Try
placing them in the main method at start and end.
Regards
>
> #include <iostream>
> #include <stdio.h>
> #include <string.h>
>
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/core/ActiveMQConnection.h>
> #include <activemq/transport/DefaultTransportListener.h>
> #include <activemq/library/ActiveMQCPP.h>
> #include <decaf/lang/Integer.h>
> #include <activemq/util/Config.h>
> #include <decaf/util/Date.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
>
> using namespace activemq;
> using namespace activemq::transport;
> using namespace activemq::core;
> using namespace decaf;
> using namespace decaf::lang;
> using namespace decaf::util;
> using namespace decaf::util::concurrent;
> using namespace cms;
> using namespace std;
>
> using namespace std;
>
> class SimpleProducer: public ExceptionListener, public
> DefaultTransportListener {
> private:
>
> Connection* connection;
> Session* session;
> Destination* destination;
> MessageProducer* producer;
> bool useTopic;
> bool clientAck;
> std::string brokerURI;
> std::string destURI;
> bool isClosed;
>
> public:
> SimpleProducer(const std::string& brokerURI, const std::string& destURI,
> bool useTopic, bool clientAck) {
> activemq::library::ActiveMQCPP::initializeLibrary();
> this->connection = NULL;
> this->session = NULL;
> this->destination = NULL;
> this->producer = NULL;
> this->useTopic = useTopic;
> this->brokerURI = brokerURI;
> this->destURI = destURI;
> this->clientAck = clientAck;
> this->isClosed = true;
> }
>
> virtual ~SimpleProducer() {
> close();
> }
>
> void close() {
> if (!isClosed) {
> isClosed = true;
> this->cleanup();
> }
> }
>
> virtual void connect() {
> try {
> try {
> this->isClosed = false;
> // Create a ConnectionFactory
> ActiveMQConnectionFactory* connectionFactory = new
> ActiveMQConnectionFactory(this->brokerURI);
> // Create a Connection
> this->connection = connectionFactory->createConnection();
> delete connectionFactory;
> ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>
> (connection);
> if (amqConnection != NULL) {
> amqConnection->addTransportListener(this);
> }
> //This will crash the 2nd time!:
> this->connection->start();
> this->connection->setExceptionListener(this);
>
> } catch (Exception& e) {
> throw e;
> }
>
> // Create a Session
> if (this->clientAck) {
> this->session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
> } else {
> this->session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
> }
>
> // Create the destination (Topic or Queue)
> if (this->useTopic) {
> this->destination = this->session->createTopic(this->destURI);
> } else {
> this->destination = this->session->createQueue(this->destURI);
> }
>
> // Create a MessageProducer from the Session to the Topic or Queue
> this->producer = this->session->createProducer(this->destination);
>
> this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
>
> fprintf(stdout, "Producer connected to %s", this->brokerURI.c_str());
> fflush(stdout);
>
> } catch (CMSException& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> }
>
> virtual void send(const char *msg) {
> try {
>
> string text(msg);
> TextMessage* message = this->session->createTextMessage(text);
>
> char logMessage[256];
>
> this->producer->send(message);
>
> //logging:
> sprintf(logMessage, "Message '%s' send to serviceBus", msg);
>
> //end logging
> delete message;
> } catch (CMSException& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> }
> //Exception Listener
> virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
> fprintf(stdout, "!! CMS lib Exception occured !!");
> fflush(stdout);
> }
> //Transport Listener
> virtual void transportInterrupted() {
> fprintf(stdout, "The Connection's Transport has been Interrupted.");
> fflush(stdout);
> }
> virtual void transportResumed() {
> fprintf(stdout, "The Connection's Transport has been Restored.");
> fflush(stdout);
> }
> private:
>
> void cleanup() {
> //Producer
> try {
> if (producer != NULL) {
> producer->close();
> delete producer;
> }
> } catch (CMSException& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
>
> }
> producer = NULL;
>
> // Destination
> try {
> if (destination != NULL) {
> delete destination;
> }
> } catch (CMSException& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> destination = NULL;
>
> //SESSION
> try {
> if (session != NULL) {
> session->close();
> delete session;
> }
> } catch (Exception& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> session = NULL;
>
> //CONNECTION
> try {
> if (connection != NULL) {
> connection->close();
> delete connection;
> }
> connection = NULL;
> } catch (Exception& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> connection = NULL;
> activemq::library::ActiveMQCPP::shutdownLibrary();
> }
>
> };//class SimpleProducer
>
>
> SimpleProducer *amq_producer;
> void connect(void) {
> std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
> std::string _destURI = "TEST.FOO";
> amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
> amq_producer->connect();
> }
>
> void disconnect(void) {
> if(amq_producer != NULL){
> amq_producer->close();
> }
> delete amq_producer;
> }
> void send(void) {
> amq_producer->send("TEST MESSAGE");
> }
>
> int main(int argc, const char* argv[]) {
> string str;
> connect();
> while (str.compare("Q") != 0) {
> printf("\n P = to produce\n R = to reset Producer\n Q = quit\n");
> getline(cin, str);
>
> if (str.compare("P") == 0) {
> send();
> } else if (str.compare("R") == 0) {
> disconnect();
> connect();
> }
> }
> disconnect();
> return 0;
> }
>
>
>
--
Tim Bish
------------
FuseSource
Email: [hidden email]</user/SendEmail.jtp?type=node&node=3046971&i=0>
Web: http://fusesource.com<http://fusesource.com?by-user=t>
Twitter: tabish121
Blog: http://timbish.blogspot.com/
________________________________
View message @ http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046971.html
To unsubscribe from activemq-cpp-library-3.2.2 - reconnect fails, click here<http://activemq.2283324.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=3046566&code=amFuLmRlY2xlcmNxQGFsc2ljLmJlfDMwNDY1NjZ8LTc3OTY2MjEwMw==>.
--
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046993.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
Re: activemq-cpp-library-3.2.2 - reconnect fails
Posted by Timothy Bish <ta...@gmail.com>.
On Wed, 2010-11-17 at 03:56 -0800, jandeclercq wrote:
> Hey,
>
> When I disconnected and closed down a producer. And afterwards I'm trying to
> reconnect, the cpp-library crashes. I don't know what I'm doing wrong.
>
> See the code below (choose option R)
>
>
The problem most likely results from the fact that you are calling the
library init and shutdown methods more than once per application run by
placing them in the constructor and cleanup methods of the class. Try
placing them in the main method at start and end.
Regards
>
> #include <iostream>
> #include <stdio.h>
> #include <string.h>
>
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/core/ActiveMQConnection.h>
> #include <activemq/transport/DefaultTransportListener.h>
> #include <activemq/library/ActiveMQCPP.h>
> #include <decaf/lang/Integer.h>
> #include <activemq/util/Config.h>
> #include <decaf/util/Date.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
>
> using namespace activemq;
> using namespace activemq::transport;
> using namespace activemq::core;
> using namespace decaf;
> using namespace decaf::lang;
> using namespace decaf::util;
> using namespace decaf::util::concurrent;
> using namespace cms;
> using namespace std;
>
> using namespace std;
>
> class SimpleProducer: public ExceptionListener, public
> DefaultTransportListener {
> private:
>
> Connection* connection;
> Session* session;
> Destination* destination;
> MessageProducer* producer;
> bool useTopic;
> bool clientAck;
> std::string brokerURI;
> std::string destURI;
> bool isClosed;
>
> public:
> SimpleProducer(const std::string& brokerURI, const std::string& destURI,
> bool useTopic, bool clientAck) {
> activemq::library::ActiveMQCPP::initializeLibrary();
> this->connection = NULL;
> this->session = NULL;
> this->destination = NULL;
> this->producer = NULL;
> this->useTopic = useTopic;
> this->brokerURI = brokerURI;
> this->destURI = destURI;
> this->clientAck = clientAck;
> this->isClosed = true;
> }
>
> virtual ~SimpleProducer() {
> close();
> }
>
> void close() {
> if (!isClosed) {
> isClosed = true;
> this->cleanup();
> }
> }
>
> virtual void connect() {
> try {
> try {
> this->isClosed = false;
> // Create a ConnectionFactory
> ActiveMQConnectionFactory* connectionFactory = new
> ActiveMQConnectionFactory(this->brokerURI);
> // Create a Connection
> this->connection = connectionFactory->createConnection();
> delete connectionFactory;
> ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>
> (connection);
> if (amqConnection != NULL) {
> amqConnection->addTransportListener(this);
> }
> //This will crash the 2nd time!:
> this->connection->start();
> this->connection->setExceptionListener(this);
>
> } catch (Exception& e) {
> throw e;
> }
>
> // Create a Session
> if (this->clientAck) {
> this->session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
> } else {
> this->session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
> }
>
> // Create the destination (Topic or Queue)
> if (this->useTopic) {
> this->destination = this->session->createTopic(this->destURI);
> } else {
> this->destination = this->session->createQueue(this->destURI);
> }
>
> // Create a MessageProducer from the Session to the Topic or Queue
> this->producer = this->session->createProducer(this->destination);
>
> this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
>
> fprintf(stdout, "Producer connected to %s", this->brokerURI.c_str());
> fflush(stdout);
>
> } catch (CMSException& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> }
>
> virtual void send(const char *msg) {
> try {
>
> string text(msg);
> TextMessage* message = this->session->createTextMessage(text);
>
> char logMessage[256];
>
> this->producer->send(message);
>
> //logging:
> sprintf(logMessage, "Message '%s' send to serviceBus", msg);
>
> //end logging
> delete message;
> } catch (CMSException& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> }
> //Exception Listener
> virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
> fprintf(stdout, "!! CMS lib Exception occured !!");
> fflush(stdout);
> }
> //Transport Listener
> virtual void transportInterrupted() {
> fprintf(stdout, "The Connection's Transport has been Interrupted.");
> fflush(stdout);
> }
> virtual void transportResumed() {
> fprintf(stdout, "The Connection's Transport has been Restored.");
> fflush(stdout);
> }
> private:
>
> void cleanup() {
> //Producer
> try {
> if (producer != NULL) {
> producer->close();
> delete producer;
> }
> } catch (CMSException& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
>
> }
> producer = NULL;
>
> // Destination
> try {
> if (destination != NULL) {
> delete destination;
> }
> } catch (CMSException& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> destination = NULL;
>
> //SESSION
> try {
> if (session != NULL) {
> session->close();
> delete session;
> }
> } catch (Exception& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> session = NULL;
>
> //CONNECTION
> try {
> if (connection != NULL) {
> connection->close();
> delete connection;
> }
> connection = NULL;
> } catch (Exception& e) {
> fprintf(stdout, e.getMessage().c_str());
> fflush(stdout);
> }
> connection = NULL;
> activemq::library::ActiveMQCPP::shutdownLibrary();
> }
>
> };//class SimpleProducer
>
>
> SimpleProducer *amq_producer;
> void connect(void) {
> std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
> std::string _destURI = "TEST.FOO";
> amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
> amq_producer->connect();
> }
>
> void disconnect(void) {
> if(amq_producer != NULL){
> amq_producer->close();
> }
> delete amq_producer;
> }
> void send(void) {
> amq_producer->send("TEST MESSAGE");
> }
>
> int main(int argc, const char* argv[]) {
> string str;
> connect();
> while (str.compare("Q") != 0) {
> printf("\n P = to produce\n R = to reset Producer\n Q = quit\n");
> getline(cin, str);
>
> if (str.compare("P") == 0) {
> send();
> } else if (str.compare("R") == 0) {
> disconnect();
> connect();
> }
> }
> disconnect();
> return 0;
> }
>
>
>
--
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/
Re: activemq-cpp-library-3.2.2 - reconnect fails
Posted by Nag <NN...@dwd.IN.gov>.
Using delete on a pointer to an object not allocated with new gives
unpredictable results
Please check where you are mis-managing 'amq_producer'
--
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046865.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.