You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Daniel (JIRA)" <ji...@apache.org> on 2009/06/11 09:27:35 UTC

[jira] Updated: (AMQCPP-246) Failover transport doesn't detect network failures

     [ https://issues.apache.org/activemq/browse/AMQCPP-246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel updated AMQCPP-246:
--------------------------

    Description: 
I tested the CMS SimpleAsyncConsumer example to check the failover transport funcionality and it doesn't detect the network failueres, and after network restauration the consumer never receives any message.

These are the steps:
1. The network has a failure. 
2. The Producer sends a message. 
3.  After a minute the network is up. (The consumer doesn't detect the failure)
4. The consumer doesn't receive the message.
5. The producer sends other message.
6. The consumer never receives old and new messages


  was:
I tested the CMS SimpleAsyncConsumer example to check the failover transport funcionality and it doesn't detect the network failueres, and after network restauration the consumer never receives any message.

This is the SimpleAsyncConsumer.cpp code (The same as the sample included in the las CMS 3.1 RC version, but modifying the brokerURI variable ):

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/transport/failover/FailoverTransport.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <iostream>

using namespace activemq;
using namespace activemq::core;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace activemq::transport::failover;
using namespace cms;
using namespace std;

////////////////////////////////////////////////////////////////////////////////
class SimpleAsyncConsumer : public ExceptionListener,
                            public MessageListener {
private:

	Connection* connection;
	    Session* session;
	    Destination* destination;
	    MessageConsumer* consumer;
	    bool useTopic;
	    bool clientAck;
	    std::string brokerURI;
	    std::string destURI;

public:

    SimpleAsyncConsumer( const std::string& brokerURI,
                         const std::string& destURI,
                         bool useTopic = false,
                         bool clientAck = false ) {
        connection = NULL;
        session = NULL;
        destination = NULL;
        consumer = NULL;
        this->useTopic = useTopic;
        this->brokerURI = brokerURI;
        this->destURI = destURI;
        this->clientAck = clientAck;
    }

    virtual ~SimpleAsyncConsumer(){
        cleanup();
    }

    void runConsumer() {

        try {

            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory =
                new ActiveMQConnectionFactory( brokerURI );

            // Create a Connection
            connection = connectionFactory->createConnection();
            delete connectionFactory;
            connection->start();

            connection->setExceptionListener(this);

            // Create a Session
            if( clientAck ) {
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }

            // Create the destination (Topic or Queue)
            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }

            // Create a MessageConsumer from the Session to the Topic or Queue
            consumer = session->createConsumer( destination );
            consumer->setMessageListener( this );

        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    
    // Called from the consumer since this class is a registered MessageListener.
    virtual void onMessage( const Message* message ){

        static int count = 0;

        try
        {
            count++;
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );
            string text = "";

            if( textMessage != NULL ) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }

            if( clientAck ) {
                message->acknowledge();
            }

            printf( "Message #%d Received: %s\n", count, text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    // If something bad happens you see it here as this class is also been
    // registered as an ExceptionListener with the connection.
    virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
        printf("CMS Exception occurred.  Shutting down client.\n");
        exit(1);
    }

private:

    void cleanup(){

        //*************************************************
        // Always close destination, consumers and producers before
        // you destroy their sessions and connection.
        //*************************************************

        // Destroy resources.
        try{
            if( destination != NULL ) delete destination;
        }catch (CMSException& e) { e.printStackTrace(); }
        destination = NULL;

        try{
            if( consumer != NULL ) delete consumer;
        }catch (CMSException& e) { e.printStackTrace(); }
        consumer = NULL;

        // Close open resources.
        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch (CMSException& e) { e.printStackTrace(); }

        // Now Destroy them
        try{
            if( session != NULL ) delete session;
        }catch (CMSException& e) { e.printStackTrace(); }
        session = NULL;

        try{
            if( connection != NULL ) delete connection;
        }catch (CMSException& e) { e.printStackTrace(); }
        connection = NULL;
    }
};

////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

    activemq::library::ActiveMQCPP::initializeLibrary();

    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc.  See the CMS web site for
    // a full list of configuration options.
    //
    //  http://activemq.apache.org/cms/
    //
    // Wire Format Options:
    // =====================
    // Use either stomp or openwire, the default ports are different for each
    //
    // Examples:
    //    tcp://127.0.0.1:61616                      default to openwire
    //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above
    //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
    //
    std::string brokerURI = "failover:(tcp://192.168.240.211:61616)";
       
    //============================================================
    // This is the Destination Name and URI options.  Use this to
    // customize where the consumer listens, to have the consumer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    std::string destURI = "Monitor"; //?consumer.prefetchSize=1";

    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the consumer.
    //============================================================
    bool useTopics = false;

    //============================================================
    // set to true if you want the consumer to use client ack mode
    // instead of the default auto ack mode.
    //============================================================
    bool clientAck = false;

    // Create the consumer
    SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );

    // Start it up and it will listen forever.
    consumer.runConsumer();

    // Wait to exit.
    std::cout << "Press 'q' to quit" << std::endl;
    while( std::cin.get() != 'q') {
    }

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";

    activemq::library::ActiveMQCPP::shutdownLibrary();
}






> Failover transport doesn't detect network failures
> --------------------------------------------------
>
>                 Key: AMQCPP-246
>                 URL: https://issues.apache.org/activemq/browse/AMQCPP-246
>             Project: ActiveMQ C++ Client
>          Issue Type: Bug
>          Components: CMS Impl
>    Affects Versions: 3.1
>         Environment: CMS Consumer running under  Linux (Fedora Core 10)
> Broker and JMS producer running under Linux (Redhat 7)
>            Reporter: Daniel
>            Assignee: Timothy Bish
>
> I tested the CMS SimpleAsyncConsumer example to check the failover transport funcionality and it doesn't detect the network failueres, and after network restauration the consumer never receives any message.
> These are the steps:
> 1. The network has a failure. 
> 2. The Producer sends a message. 
> 3.  After a minute the network is up. (The consumer doesn't detect the failure)
> 4. The consumer doesn't receive the message.
> 5. The producer sends other message.
> 6. The consumer never receives old and new messages

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.