You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/10 19:44:51 UTC

svn commit: r703532 - in /incubator/qpid/trunk/qpid/cpp: examples/failover/direct_producer.cpp examples/failover/listener.cpp src/qpid/client/Connection.cpp src/qpid/client/ConnectionImpl.cpp src/qpid/client/FailoverListener.cpp

Author: aconway
Date: Fri Oct 10 10:44:50 2008
New Revision: 703532

URL: http://svn.apache.org/viewvc?rev=703532&view=rev
Log:
Failover client and example fixes & tidy up.

Modified:
    incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
    incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp?rev=703532&r1=703531&r2=703532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp Fri Oct 10 10:44:50 2008
@@ -22,127 +22,129 @@
 int 
 main ( int argc, char ** argv) 
 {
-  struct timeval broker_killed_time     = {0,0},
-                 failover_complete_time = {0,0},
-                 duration               = {0,0};
-
-
-  if ( argc < 3 )
-  {
-    std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n";
-    std::cerr << "i.e. for host: 127.0.0.1\n";
-    exit(1);
-  }
-
-  char const * host = argv[1];
-  int  port = atoi(argv[2]);
-  char const * broker_to_kill = 0;
-
-  if ( argc > 3 )
-  {
-    broker_to_kill = argv[3];
-    std::cerr << "main:  Broker marked for death is process ID " 
-              << broker_to_kill
-              << endl;
-  }
-  else
-  {
-    std::cerr << "PRODUCER main: there is no broker to kill.\n";
-  }
-
-  FailoverConnection connection;
-  FailoverSession    * session;
-  Message message;
-
-  string program_name = "PRODUCER";
-
-
-  connection.failoverCompleteTime = & failover_complete_time;
-  connection.name = program_name;
-  connection.open ( host, port );
-
-  session = connection.newSession();
-  session->name    = program_name;
-
-  int send_this_many = 30,
-      messages_sent  = 0;
-
-  while ( messages_sent < send_this_many )
-  {
-    if ( (messages_sent == 13) && broker_to_kill )
-    {
-      char command[1000];
-      std::cerr << program_name << " killing broker " << broker_to_kill << ".\n";
-      sprintf(command, "kill -9 %s", broker_to_kill);
-      system ( command );
-      gettimeofday ( & broker_killed_time, 0 );
-    }
-
-    message.getDeliveryProperties().setRoutingKey("routing_key"); 
-
-    std::cerr << "sending message " 
-              << messages_sent
-              << " of " 
-              << send_this_many 
-              << ".\n";
-
-    stringstream message_data;
-    message_data << messages_sent;
-    message.setData(message_data.str());
-
-    try
-    {
-      /* MICK FIXME
-      session.messageTransfer ( arg::content=message,  
-                                arg::destination="amq.direct"
-                              ); */
-      session->messageTransfer ( "amq.direct",
-                                1,
-                                0,
-                                message
-                              );
-    }
-    catch ( const std::exception& error) 
-    {
-      cerr << program_name << " exception: " << error.what() << endl;
-    }
-
-    sleep ( 1 );
-    ++ messages_sent;
-  }
-
-  message.setData ( "That's all, folks!" );
-
-  /* MICK FIXME
-  session.messageTransfer ( arg::content=message,  
-                            arg::destination="amq.direct"
-                          ); 
-   */
-  session->messageTransfer ( "amq.direct",
-                            1,
-                            0,
-                            message
-                          ); 
-
-  session->sync();
-  connection.close();
-
-  // This will be incorrect if you killed more than one...
-  if ( broker_to_kill )
-  {
-    timersub ( & failover_complete_time, 
-               & broker_killed_time, 
-               & duration 
-             );
-    fprintf ( stderr, 
-              "Failover time: %ld.%.6ld\n",
-              duration.tv_sec,
-              duration.tv_usec
+    try {
+        struct timeval broker_killed_time     = {0,0};
+        struct timeval failover_complete_time = {0,0};
+        struct timeval duration               = {0,0};
+
+
+        if ( argc < 3 )
+        {
+            std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n";
+            std::cerr << "i.e. for host: 127.0.0.1\n";
+            exit(1);
+        }
+
+        char const * host = argv[1];
+        int  port = atoi(argv[2]);
+        char const * broker_to_kill = 0;
+
+        if ( argc > 3 )
+        {
+            broker_to_kill = argv[3];
+            std::cerr << "main:  Broker marked for death is process ID " 
+                      << broker_to_kill
+                      << endl;
+        }
+        else
+        {
+            std::cerr << "PRODUCER main: there is no broker to kill.\n";
+        }
+
+        FailoverConnection connection;
+        FailoverSession    * session;
+        Message message;
+
+        string program_name = "PRODUCER";
+
+
+        connection.failoverCompleteTime = & failover_complete_time;
+        connection.name = program_name;
+        connection.open ( host, port );
+
+        session = connection.newSession();
+        session->name    = program_name;
+
+        int send_this_many = 30,
+            messages_sent  = 0;
+
+        while ( messages_sent < send_this_many )
+        {
+            if ( (messages_sent == 13) && broker_to_kill )
+            {
+                char command[1000];
+                std::cerr << program_name << " killing broker " << broker_to_kill << ".\n";
+                sprintf(command, "kill -9 %s", broker_to_kill);
+                system ( command );
+                gettimeofday ( & broker_killed_time, 0 );
+            }
+
+            message.getDeliveryProperties().setRoutingKey("routing_key"); 
+
+            std::cerr << "sending message " 
+                      << messages_sent
+                      << " of " 
+                      << send_this_many 
+                      << ".\n";
+
+            stringstream message_data;
+            message_data << messages_sent;
+            message.setData(message_data.str());
+
+            try
+            {
+                /* MICK FIXME
+                   session.messageTransfer ( arg::content=message,  
+                   arg::destination="amq.direct"
+                   ); */
+                session->messageTransfer ( "amq.direct",
+                                           1,
+                                           0,
+                                           message
+                );
+            }
+            catch ( const std::exception& error) 
+            {
+                cerr << program_name << " exception: " << error.what() << endl;
+            }
+
+            sleep ( 1 );
+            ++ messages_sent;
+        }
+
+        message.setData ( "That's all, folks!" );
+
+        /* MICK FIXME
+           session.messageTransfer ( arg::content=message,  
+           arg::destination="amq.direct"
+           ); 
+        */
+        session->messageTransfer ( "amq.direct",
+                                   1,
+                                   0,
+                                   message
+        ); 
+
+        session->sync();
+        connection.close();
+
+        // This will be incorrect if you killed more than one...
+        if ( broker_to_kill )
+        {
+            timersub ( & failover_complete_time, 
+                       & broker_killed_time, 
+                       & duration 
             );
-  }
+            fprintf ( stderr, 
+                      "Failover time: %ld.%.6ld\n",
+                      duration.tv_sec,
+                      duration.tv_usec
+            );
+        }
+        return 0;  
 
-  return 0;  
+    } catch(const std::exception& error) {
+        std::cout << error.what() << std::endl;
+    }
+    return 1;
 }
-
-
-

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp?rev=703532&r1=703531&r2=703532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp Fri Oct 10 10:44:50 2008
@@ -20,119 +20,119 @@
 
 struct Recorder 
 {
-  unsigned int max_messages;
-  unsigned int * messages_received;
+    unsigned int max_messages;
+    unsigned int * messages_received;
 
-  Recorder ( )
-  {
-    max_messages = 1000;
-    messages_received = new unsigned int [ max_messages ];
-    memset ( messages_received, 0, max_messages * sizeof(int) );
-  }
+    Recorder ( )
+    {
+        max_messages = 1000;
+        messages_received = new unsigned int [ max_messages ];
+        memset ( messages_received, 0, max_messages * sizeof(int) );
+    }
 
 
-  void
-  received ( int i )
-  {
-    messages_received[i] ++;
-  }
+    void
+    received ( int i )
+    {
+        messages_received[i] ++;
+    }
 
 
 
-  void
-  report ( )
-  {
-    int i;
+    void
+    report ( )
+    {
+        int i;
 
-    int last_received_message = 0;
+        int last_received_message = 0;
 
-    vector<unsigned int> missed_messages,
-                         multiple_messages;
+        vector<unsigned int> missed_messages,
+            multiple_messages;
 
-    /*----------------------------------------------------
-       Collect indices of missed and multiple messages.
-    ----------------------------------------------------*/
-    bool seen_first_message = false;
-    for ( i = max_messages - 1; i >= 0; -- i )
-    {
-      if ( ! seen_first_message )
-      {
-        if ( messages_received [i] > 0 )
+        /*----------------------------------------------------
+          Collect indices of missed and multiple messages.
+          ----------------------------------------------------*/
+        bool seen_first_message = false;
+        for ( i = max_messages - 1; i >= 0; -- i )
         {
-          seen_first_message = true;
-          last_received_message = i;
+            if ( ! seen_first_message )
+            {
+                if ( messages_received [i] > 0 )
+                {
+                    seen_first_message = true;
+                    last_received_message = i;
+                }
+            }
+            else
+            {
+                if ( messages_received [i] == 0 )
+                    missed_messages.push_back ( i );
+                else
+                    if ( messages_received [i] > 1 )
+                    {
+                        multiple_messages.push_back ( i );
+                    }
+            }
         }
-      }
-      else
-      {
-        if ( messages_received [i] == 0 )
-          missed_messages.push_back ( i );
-        else
-        if ( messages_received [i] > 1 )
+
+        /*--------------------------------------------
+          Report missed messages.
+          --------------------------------------------*/
+        char const * verb = ( missed_messages.size() == 1 ) 
+            ? " was "
+            : " were ";
+
+        char const * plural = ( missed_messages.size() == 1 ) 
+            ? "."
+            : "s.";
+
+        std::cerr << "Listener::shutdown: There"
+                  << verb
+                  << missed_messages.size()
+                  << " missed message"
+                  << plural
+                  << endl;
+
+        for ( i = 0; i < int(missed_messages.size()); ++ i )
         {
-          multiple_messages.push_back ( i );
+            std::cerr << "    " << i << " was missed.\n";
         }
-      }
-    }
 
-    /*--------------------------------------------
-       Report missed messages.
-    --------------------------------------------*/
-    char const * verb = ( missed_messages.size() == 1 ) 
-                        ? " was "
-                        : " were ";
-
-    char const * plural = ( missed_messages.size() == 1 ) 
-                          ? "."
-                          : "s.";
-
-    std::cerr << "Listener::shutdown: There"
-              << verb
-              << missed_messages.size()
-              << " missed message"
-              << plural
-              << endl;
-
-    for ( i = 0; i < int(missed_messages.size()); ++ i )
-    {
-      std::cerr << "    " << i << " was missed.\n";
-    }
 
+        /*--------------------------------------------
+          Report multiple messages.
+          --------------------------------------------*/
+        verb = ( multiple_messages.size() == 1 ) 
+            ? " was "
+            : " were ";
+
+        plural = ( multiple_messages.size() == 1 ) 
+            ? "."
+            : "s.";
+
+        std::cerr << "Listener::shutdown: There"
+                  << verb
+                  << multiple_messages.size()
+                  << " multiple message"
+                  << plural
+                  << endl;
 
-    /*--------------------------------------------
-       Report multiple messages.
-    --------------------------------------------*/
-    verb = ( multiple_messages.size() == 1 ) 
-           ? " was "
-           : " were ";
-
-    plural = ( multiple_messages.size() == 1 ) 
-           ? "."
-           : "s.";
-
-    std::cerr << "Listener::shutdown: There"
-              << verb
-              << multiple_messages.size()
-              << " multiple message"
-              << plural
-              << endl;
-
-    for ( i = 0; i < int(multiple_messages.size()); ++ i )
-    {
-      std::cerr << "    " 
-                << multiple_messages[i] 
-                << " was received " 
-                << messages_received [ multiple_messages[i] ]
-                << " times.\n";
-    }
+        for ( i = 0; i < int(multiple_messages.size()); ++ i )
+        {
+            std::cerr << "    " 
+                      << multiple_messages[i] 
+                      << " was received " 
+                      << messages_received [ multiple_messages[i] ]
+                      << " times.\n";
+        }
     
-    /*
-    for ( i = 0; i < last_received_message; ++ i )
-    {
-      std::cerr << "Message " << i << ": " << messages_received[i] << std::endl;
+        /*
+          for ( i = 0; i < last_received_message; ++ i )
+          {
+          std::cerr << "Message " << i << ": " << messages_received[i] << std::endl;
+          }
+        */
     }
-    */
-  }
 
 };
 
@@ -141,18 +141,18 @@
 
 struct Listener : public MessageListener
 {
-  FailoverSubscriptionManager & subscriptionManager;
-  Recorder & recorder;
+    FailoverSubscriptionManager & subscriptionManager;
+    Recorder & recorder;
 
 
-  Listener ( FailoverSubscriptionManager& subs, 
-             Recorder & recorder
-           );
+    Listener ( FailoverSubscriptionManager& subs, 
+               Recorder & recorder
+    );
 
-  void shutdown() { recorder.report(); }
-  void parse_message ( std::string const & msg );
+    void shutdown() { recorder.report(); }
+    void parse_message ( std::string const & msg );
   
-  virtual void received ( Message & message );
+    virtual void received ( Message & message );
 };
 
 
@@ -160,8 +160,8 @@
 
 
 Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) : 
-  subscriptionManager(s),
-  recorder(r)
+    subscriptionManager(s),
+    recorder(r)
 {
 }
 
@@ -172,19 +172,19 @@
 void 
 Listener::received ( Message & message ) 
 {
-  std::cerr << "Listener received: " << message.getData() << std::endl;
-  if (message.getData() == "That's all, folks!") 
-  {
-    std::cout << "Shutting down listener for " << message.getDestination()
-              << std::endl;
-    subscriptionManager.cancel(message.getDestination());
-
-    shutdown();
-  }
-  else
-  {
-    parse_message ( message.getData() );
-  }
+    std::cerr << "Listener received: " << message.getData() << std::endl;
+    if (message.getData() == "That's all, folks!") 
+    {
+        std::cout << "Shutting down listener for " << message.getDestination()
+                  << std::endl;
+        subscriptionManager.cancel(message.getDestination());
+
+        shutdown();
+    }
+    else
+    {
+        parse_message ( message.getData() );
+    }
 }
 
 
@@ -194,13 +194,13 @@
 void
 Listener::parse_message ( const std::string & msg )
 {
-  int msg_number;
-  if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) ) 
-  {
-    std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n";
-    return;
-  }
-  recorder.received ( msg_number );
+    int msg_number;
+    if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) ) 
+    {
+        std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n";
+        return;
+    }
+    recorder.received ( msg_number );
 }
 
 
@@ -211,37 +211,42 @@
 int 
 main ( int argc, char ** argv ) 
 {
-  string program_name = "LISTENER";
+    try {
+        string program_name = "LISTENER";
 
-  if ( argc < 3 )
-  {
-    std::cerr << "Usage: ./listener host cluster_port_file_name\n";
-    std::cerr << "i.e. for host: 127.0.0.1\n";
-    exit(1);
-  }
-
-  char const * host = argv[1];
-  int port = atoi(argv[2]);
-
-  FailoverConnection connection;
-  FailoverSession    * session;
-  Recorder recorder;
-
-  connection.name = program_name; 
+        if ( argc < 3 )
+        {
+            std::cerr << "Usage: ./listener host cluster_port_file_name\n";
+            std::cerr << "i.e. for host: 127.0.0.1\n";
+            exit(1);
+        }
 
-  connection.open ( host, port );
-  session = connection.newSession();
-  session->name = program_name; 
+        char const * host = argv[1];
+        int port = atoi(argv[2]);
 
-  FailoverSubscriptionManager subscriptions ( session );
-  subscriptions.name = program_name;
-  Listener listener ( subscriptions, recorder );
-  subscriptions.subscribe ( listener, "message_queue" );
-  subscriptions.run ( );
+        FailoverConnection connection;
+        FailoverSession    * session;
+        Recorder recorder;
+
+        connection.name = program_name; 
+
+        connection.open ( host, port );
+        session = connection.newSession();
+        session->name = program_name; 
+
+        FailoverSubscriptionManager subscriptions ( session );
+        subscriptions.name = program_name;
+        Listener listener ( subscriptions, recorder );
+        subscriptions.subscribe ( listener, "message_queue" );
+        subscriptions.run ( );
 
-  connection.close();
+        connection.close();
+        return 0;
 
-  return 1;   
+    } catch(const std::exception& error) {
+        std::cout << error.what() << std::endl;
+    }
+    return 1;
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=703532&r1=703531&r2=703532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Fri Oct 10 10:44:50 2008
@@ -139,7 +139,7 @@
 }
 
 std::vector<Url> Connection::getKnownBrokers() {
-    return isOpen() ? impl->getKnownBrokers() : std::vector<Url>();
+    return impl ? impl->getKnownBrokers() : std::vector<Url>();
 }
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=703532&r1=703531&r2=703532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Fri Oct 10 10:44:50 2008
@@ -151,11 +151,8 @@
 
 void ConnectionImpl::shutdown() {
     Mutex::ScopedLock l(lock);
-    if (handler.isClosed()) 
-    {
-    std::cerr << "MDEBUG  ConnectionImpl::shutdown -- returning w/o failure callback!\n";
-    return;
-    }
+    if (handler.isClosed()) return;
+
     // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
     // an appropriate close-code. connection-forced is not right.
     if (!handler.isClosing()) 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=703532&r1=703531&r2=703532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Fri Oct 10 10:44:50 2008
@@ -64,7 +64,10 @@
     subscriptions.reset();
 }    
 FailoverListener::~FailoverListener() {
-    stop();
+    try { stop(); }
+    catch (const std::exception& e) { 
+        QPID_LOG(warning, QPID_MSG("Ignoring exception in destructor" << e.what()));
+    }
 }
 
 void FailoverListener::received(Message& msg) {