You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/10 19:44:51 UTC
svn commit: r703532 - in /incubator/qpid/trunk/qpid/cpp:
examples/failover/direct_producer.cpp examples/failover/listener.cpp
src/qpid/client/Connection.cpp src/qpid/client/ConnectionImpl.cpp
src/qpid/client/FailoverListener.cpp
Author: aconway
Date: Fri Oct 10 10:44:50 2008
New Revision: 703532
URL: http://svn.apache.org/viewvc?rev=703532&view=rev
Log:
Failover client and example fixes and tidy-up.
Modified:
incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp?rev=703532&r1=703531&r2=703532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp Fri Oct 10 10:44:50 2008
@@ -22,127 +22,129 @@
int
main ( int argc, char ** argv)
{
- struct timeval broker_killed_time = {0,0},
- failover_complete_time = {0,0},
- duration = {0,0};
-
-
- if ( argc < 3 )
- {
- std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n";
- std::cerr << "i.e. for host: 127.0.0.1\n";
- exit(1);
- }
-
- char const * host = argv[1];
- int port = atoi(argv[2]);
- char const * broker_to_kill = 0;
-
- if ( argc > 3 )
- {
- broker_to_kill = argv[3];
- std::cerr << "main: Broker marked for death is process ID "
- << broker_to_kill
- << endl;
- }
- else
- {
- std::cerr << "PRODUCER main: there is no broker to kill.\n";
- }
-
- FailoverConnection connection;
- FailoverSession * session;
- Message message;
-
- string program_name = "PRODUCER";
-
-
- connection.failoverCompleteTime = & failover_complete_time;
- connection.name = program_name;
- connection.open ( host, port );
-
- session = connection.newSession();
- session->name = program_name;
-
- int send_this_many = 30,
- messages_sent = 0;
-
- while ( messages_sent < send_this_many )
- {
- if ( (messages_sent == 13) && broker_to_kill )
- {
- char command[1000];
- std::cerr << program_name << " killing broker " << broker_to_kill << ".\n";
- sprintf(command, "kill -9 %s", broker_to_kill);
- system ( command );
- gettimeofday ( & broker_killed_time, 0 );
- }
-
- message.getDeliveryProperties().setRoutingKey("routing_key");
-
- std::cerr << "sending message "
- << messages_sent
- << " of "
- << send_this_many
- << ".\n";
-
- stringstream message_data;
- message_data << messages_sent;
- message.setData(message_data.str());
-
- try
- {
- /* MICK FIXME
- session.messageTransfer ( arg::content=message,
- arg::destination="amq.direct"
- ); */
- session->messageTransfer ( "amq.direct",
- 1,
- 0,
- message
- );
- }
- catch ( const std::exception& error)
- {
- cerr << program_name << " exception: " << error.what() << endl;
- }
-
- sleep ( 1 );
- ++ messages_sent;
- }
-
- message.setData ( "That's all, folks!" );
-
- /* MICK FIXME
- session.messageTransfer ( arg::content=message,
- arg::destination="amq.direct"
- );
- */
- session->messageTransfer ( "amq.direct",
- 1,
- 0,
- message
- );
-
- session->sync();
- connection.close();
-
- // This will be incorrect if you killed more than one...
- if ( broker_to_kill )
- {
- timersub ( & failover_complete_time,
- & broker_killed_time,
- & duration
- );
- fprintf ( stderr,
- "Failover time: %ld.%.6ld\n",
- duration.tv_sec,
- duration.tv_usec
+ try {
+ struct timeval broker_killed_time = {0,0};
+ struct timeval failover_complete_time = {0,0};
+ struct timeval duration = {0,0};
+
+
+ if ( argc < 3 )
+ {
+ std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n";
+ std::cerr << "i.e. for host: 127.0.0.1\n";
+ exit(1);
+ }
+
+ char const * host = argv[1];
+ int port = atoi(argv[2]);
+ char const * broker_to_kill = 0;
+
+ if ( argc > 3 )
+ {
+ broker_to_kill = argv[3];
+ std::cerr << "main: Broker marked for death is process ID "
+ << broker_to_kill
+ << endl;
+ }
+ else
+ {
+ std::cerr << "PRODUCER main: there is no broker to kill.\n";
+ }
+
+ FailoverConnection connection;
+ FailoverSession * session;
+ Message message;
+
+ string program_name = "PRODUCER";
+
+
+ connection.failoverCompleteTime = & failover_complete_time;
+ connection.name = program_name;
+ connection.open ( host, port );
+
+ session = connection.newSession();
+ session->name = program_name;
+
+ int send_this_many = 30,
+ messages_sent = 0;
+
+ while ( messages_sent < send_this_many )
+ {
+ if ( (messages_sent == 13) && broker_to_kill )
+ {
+ char command[1000];
+ std::cerr << program_name << " killing broker " << broker_to_kill << ".\n";
+ sprintf(command, "kill -9 %s", broker_to_kill);
+ system ( command );
+ gettimeofday ( & broker_killed_time, 0 );
+ }
+
+ message.getDeliveryProperties().setRoutingKey("routing_key");
+
+ std::cerr << "sending message "
+ << messages_sent
+ << " of "
+ << send_this_many
+ << ".\n";
+
+ stringstream message_data;
+ message_data << messages_sent;
+ message.setData(message_data.str());
+
+ try
+ {
+ /* MICK FIXME
+ session.messageTransfer ( arg::content=message,
+ arg::destination="amq.direct"
+ ); */
+ session->messageTransfer ( "amq.direct",
+ 1,
+ 0,
+ message
+ );
+ }
+ catch ( const std::exception& error)
+ {
+ cerr << program_name << " exception: " << error.what() << endl;
+ }
+
+ sleep ( 1 );
+ ++ messages_sent;
+ }
+
+ message.setData ( "That's all, folks!" );
+
+ /* MICK FIXME
+ session.messageTransfer ( arg::content=message,
+ arg::destination="amq.direct"
+ );
+ */
+ session->messageTransfer ( "amq.direct",
+ 1,
+ 0,
+ message
+ );
+
+ session->sync();
+ connection.close();
+
+ // This will be incorrect if you killed more than one...
+ if ( broker_to_kill )
+ {
+ timersub ( & failover_complete_time,
+ & broker_killed_time,
+ & duration
);
- }
+ fprintf ( stderr,
+ "Failover time: %ld.%.6ld\n",
+ duration.tv_sec,
+ duration.tv_usec
+ );
+ }
+ return 0;
- return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
}
-
-
-
Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp?rev=703532&r1=703531&r2=703532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp Fri Oct 10 10:44:50 2008
@@ -20,119 +20,119 @@
struct Recorder
{
- unsigned int max_messages;
- unsigned int * messages_received;
+ unsigned int max_messages;
+ unsigned int * messages_received;
- Recorder ( )
- {
- max_messages = 1000;
- messages_received = new unsigned int [ max_messages ];
- memset ( messages_received, 0, max_messages * sizeof(int) );
- }
+ Recorder ( )
+ {
+ max_messages = 1000;
+ messages_received = new unsigned int [ max_messages ];
+ memset ( messages_received, 0, max_messages * sizeof(int) );
+ }
- void
- received ( int i )
- {
- messages_received[i] ++;
- }
+ void
+ received ( int i )
+ {
+ messages_received[i] ++;
+ }
- void
- report ( )
- {
- int i;
+ void
+ report ( )
+ {
+ int i;
- int last_received_message = 0;
+ int last_received_message = 0;
- vector<unsigned int> missed_messages,
- multiple_messages;
+ vector<unsigned int> missed_messages,
+ multiple_messages;
- /*----------------------------------------------------
- Collect indices of missed and multiple messages.
- ----------------------------------------------------*/
- bool seen_first_message = false;
- for ( i = max_messages - 1; i >= 0; -- i )
- {
- if ( ! seen_first_message )
- {
- if ( messages_received [i] > 0 )
+ /*----------------------------------------------------
+ Collect indices of missed and multiple messages.
+ ----------------------------------------------------*/
+ bool seen_first_message = false;
+ for ( i = max_messages - 1; i >= 0; -- i )
{
- seen_first_message = true;
- last_received_message = i;
+ if ( ! seen_first_message )
+ {
+ if ( messages_received [i] > 0 )
+ {
+ seen_first_message = true;
+ last_received_message = i;
+ }
+ }
+ else
+ {
+ if ( messages_received [i] == 0 )
+ missed_messages.push_back ( i );
+ else
+ if ( messages_received [i] > 1 )
+ {
+ multiple_messages.push_back ( i );
+ }
+ }
}
- }
- else
- {
- if ( messages_received [i] == 0 )
- missed_messages.push_back ( i );
- else
- if ( messages_received [i] > 1 )
+
+ /*--------------------------------------------
+ Report missed messages.
+ --------------------------------------------*/
+ char const * verb = ( missed_messages.size() == 1 )
+ ? " was "
+ : " were ";
+
+ char const * plural = ( missed_messages.size() == 1 )
+ ? "."
+ : "s.";
+
+ std::cerr << "Listener::shutdown: There"
+ << verb
+ << missed_messages.size()
+ << " missed message"
+ << plural
+ << endl;
+
+ for ( i = 0; i < int(missed_messages.size()); ++ i )
{
- multiple_messages.push_back ( i );
+ std::cerr << " " << i << " was missed.\n";
}
- }
- }
- /*--------------------------------------------
- Report missed messages.
- --------------------------------------------*/
- char const * verb = ( missed_messages.size() == 1 )
- ? " was "
- : " were ";
-
- char const * plural = ( missed_messages.size() == 1 )
- ? "."
- : "s.";
-
- std::cerr << "Listener::shutdown: There"
- << verb
- << missed_messages.size()
- << " missed message"
- << plural
- << endl;
-
- for ( i = 0; i < int(missed_messages.size()); ++ i )
- {
- std::cerr << " " << i << " was missed.\n";
- }
+ /*--------------------------------------------
+ Report multiple messages.
+ --------------------------------------------*/
+ verb = ( multiple_messages.size() == 1 )
+ ? " was "
+ : " were ";
+
+ plural = ( multiple_messages.size() == 1 )
+ ? "."
+ : "s.";
+
+ std::cerr << "Listener::shutdown: There"
+ << verb
+ << multiple_messages.size()
+ << " multiple message"
+ << plural
+ << endl;
- /*--------------------------------------------
- Report multiple messages.
- --------------------------------------------*/
- verb = ( multiple_messages.size() == 1 )
- ? " was "
- : " were ";
-
- plural = ( multiple_messages.size() == 1 )
- ? "."
- : "s.";
-
- std::cerr << "Listener::shutdown: There"
- << verb
- << multiple_messages.size()
- << " multiple message"
- << plural
- << endl;
-
- for ( i = 0; i < int(multiple_messages.size()); ++ i )
- {
- std::cerr << " "
- << multiple_messages[i]
- << " was received "
- << messages_received [ multiple_messages[i] ]
- << " times.\n";
- }
+ for ( i = 0; i < int(multiple_messages.size()); ++ i )
+ {
+ std::cerr << " "
+ << multiple_messages[i]
+ << " was received "
+ << messages_received [ multiple_messages[i] ]
+ << " times.\n";
+ }
- /*
- for ( i = 0; i < last_received_message; ++ i )
- {
- std::cerr << "Message " << i << ": " << messages_received[i] << std::endl;
+ /*
+ for ( i = 0; i < last_received_message; ++ i )
+ {
+ std::cerr << "Message " << i << ": " << messages_received[i] << std::endl;
+ }
+ */
}
- */
- }
};
@@ -141,18 +141,18 @@
struct Listener : public MessageListener
{
- FailoverSubscriptionManager & subscriptionManager;
- Recorder & recorder;
+ FailoverSubscriptionManager & subscriptionManager;
+ Recorder & recorder;
- Listener ( FailoverSubscriptionManager& subs,
- Recorder & recorder
- );
+ Listener ( FailoverSubscriptionManager& subs,
+ Recorder & recorder
+ );
- void shutdown() { recorder.report(); }
- void parse_message ( std::string const & msg );
+ void shutdown() { recorder.report(); }
+ void parse_message ( std::string const & msg );
- virtual void received ( Message & message );
+ virtual void received ( Message & message );
};
@@ -160,8 +160,8 @@
Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) :
- subscriptionManager(s),
- recorder(r)
+ subscriptionManager(s),
+ recorder(r)
{
}
@@ -172,19 +172,19 @@
void
Listener::received ( Message & message )
{
- std::cerr << "Listener received: " << message.getData() << std::endl;
- if (message.getData() == "That's all, folks!")
- {
- std::cout << "Shutting down listener for " << message.getDestination()
- << std::endl;
- subscriptionManager.cancel(message.getDestination());
-
- shutdown();
- }
- else
- {
- parse_message ( message.getData() );
- }
+ std::cerr << "Listener received: " << message.getData() << std::endl;
+ if (message.getData() == "That's all, folks!")
+ {
+ std::cout << "Shutting down listener for " << message.getDestination()
+ << std::endl;
+ subscriptionManager.cancel(message.getDestination());
+
+ shutdown();
+ }
+ else
+ {
+ parse_message ( message.getData() );
+ }
}
@@ -194,13 +194,13 @@
void
Listener::parse_message ( const std::string & msg )
{
- int msg_number;
- if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) )
- {
- std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n";
- return;
- }
- recorder.received ( msg_number );
+ int msg_number;
+ if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) )
+ {
+ std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n";
+ return;
+ }
+ recorder.received ( msg_number );
}
@@ -211,37 +211,42 @@
int
main ( int argc, char ** argv )
{
- string program_name = "LISTENER";
+ try {
+ string program_name = "LISTENER";
- if ( argc < 3 )
- {
- std::cerr << "Usage: ./listener host cluster_port_file_name\n";
- std::cerr << "i.e. for host: 127.0.0.1\n";
- exit(1);
- }
-
- char const * host = argv[1];
- int port = atoi(argv[2]);
-
- FailoverConnection connection;
- FailoverSession * session;
- Recorder recorder;
-
- connection.name = program_name;
+ if ( argc < 3 )
+ {
+ std::cerr << "Usage: ./listener host cluster_port_file_name\n";
+ std::cerr << "i.e. for host: 127.0.0.1\n";
+ exit(1);
+ }
- connection.open ( host, port );
- session = connection.newSession();
- session->name = program_name;
+ char const * host = argv[1];
+ int port = atoi(argv[2]);
- FailoverSubscriptionManager subscriptions ( session );
- subscriptions.name = program_name;
- Listener listener ( subscriptions, recorder );
- subscriptions.subscribe ( listener, "message_queue" );
- subscriptions.run ( );
+ FailoverConnection connection;
+ FailoverSession * session;
+ Recorder recorder;
+
+ connection.name = program_name;
+
+ connection.open ( host, port );
+ session = connection.newSession();
+ session->name = program_name;
+
+ FailoverSubscriptionManager subscriptions ( session );
+ subscriptions.name = program_name;
+ Listener listener ( subscriptions, recorder );
+ subscriptions.subscribe ( listener, "message_queue" );
+ subscriptions.run ( );
- connection.close();
+ connection.close();
+ return 0;
- return 1;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=703532&r1=703531&r2=703532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Fri Oct 10 10:44:50 2008
@@ -139,7 +139,7 @@
}
std::vector<Url> Connection::getKnownBrokers() {
- return isOpen() ? impl->getKnownBrokers() : std::vector<Url>();
+ return impl ? impl->getKnownBrokers() : std::vector<Url>();
}
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=703532&r1=703531&r2=703532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Fri Oct 10 10:44:50 2008
@@ -151,11 +151,8 @@
void ConnectionImpl::shutdown() {
Mutex::ScopedLock l(lock);
- if (handler.isClosed())
- {
- std::cerr << "MDEBUG ConnectionImpl::shutdown -- returning w/o failure callback!\n";
- return;
- }
+ if (handler.isClosed()) return;
+
// FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
// an appropriate close-code. connection-forced is not right.
if (!handler.isClosing())
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=703532&r1=703531&r2=703532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Fri Oct 10 10:44:50 2008
@@ -64,7 +64,10 @@
subscriptions.reset();
}
FailoverListener::~FailoverListener() {
- stop();
+ try { stop(); }
+ catch (const std::exception& e) {
+ QPID_LOG(warning, QPID_MSG("Ignoring exception in destructor" << e.what()));
+ }
}
void FailoverListener::received(Message& msg) {