You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Javier Leyba <xl...@gmail.com> on 2006/05/02 13:09:28 UTC

Master/slave problem

Hi

I've configured two servers. A master:

--------------------

<beans xmlns="http://activemq.org/config/1.0">

  <broker useJmx="true">

        <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
        <managementContext>
           <managementContext connectorPort="2199"
jmxDomainName="org.apache.activemq"/>
        </managementContext>


    <persistenceAdapter>
        <!--
      <journaledJDBC journalLogFiles="5"
dataDirectory="/home/jcm/jl/activemq-data"/>
        -->
        <journaledJDBC journalLogFiles="5"
dataDirectory="/home/jcm/jl/activemq-data" dataSource="#mysql-ds"/>
    </persistenceAdapter>

    <transportConnectors>
       <transportConnector name="default" uri="tcp://172.31.112.9:62002" />
    </transportConnectors>

        <networkConnectors>
                <networkConnector name="default"
uri="static://(tcp://172.31.112.9:62003,tcp://172.30.27.1:62002)"
failover="true"/>
    </networkConnectors>

  </broker>

        <bean id="mysql-ds"
class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
            <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
            <property name="url"
value="jdbc:mysql://172.31.112.16/activeMQ?relaxAutoCommit=true"/>
            <property name="username" value="activeMQ"/>
            <property name="password" value="activeMQ"/>
            <property name="poolPreparedStatements" value="true"/>
        </bean>

</beans>

---------------------

and a slave

---------------

<beans xmlns="http://activemq.org/config/1.0">

  <broker useJmx="true" masterConnectorURI="tcp://172.31.112.9:62002"
shutdownOnMasterFailure="false">

        <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
        <managementContext>
           <managementContext connectorPort="2199"
jmxDomainName="org.apache.activemq"/>
        </managementContext>


    <persistenceAdapter>
        <!--
      <journaledJDBC journalLogFiles="5"
dataDirectory="/home/jcm/jl/activemq-data"/>
        -->
        <journaledJDBC journalLogFiles="5"
dataDirectory="/home/arqweb/jl/activemq-data" dataSource="#mysql-ds"
/>
    </persistenceAdapter>

    <transportConnectors>
       <transportConnector name="default" uri="tcp://172.30.27.1:62002" />
    </transportConnectors>

    <networkConnectors>
        <networkConnector name="default"
uri="static://(tcp://172.30.27.1:62003,tcp://172.31.112.9:62002)"
failover="true"/>
    </networkConnectors>


  </broker>

        <bean id="mysql-ds"
class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
            <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
            <property name="url"
value="jdbc:mysql://172.31.112.16/activeMQ1?relaxAutoCommit=true"/>
            <property name="username" value="activeMQ1"/>
            <property name="password" value="activeMQ1"/>
            <property name="poolPreparedStatements" value="true"/>
        </bean>

</beans>

-----------


Also, I have a small application that will subscribe to a topic:

----------------
/**
 * Connects to the brokers listed in the failover URL, registers one durable
 * subscriber (each with its own {@link TextListener}) per entry in the local
 * topic list, then blocks reading standard input until 'q' or 'Q' is typed,
 * standard input reaches end-of-stream, or reading it fails. The connection
 * is closed before the constructor returns.
 */
public SimpleConsumer() {
        Connection connection = null;
        String user = "javier";
        String[] topics = {"nuevo1"};

        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory("failover://(tcp://172.31.112.9:62002,tcp://172.30.27.1:62002)?randomize=false&connectionTimeout=20000&soTimeout=10000&wireFormat.maxInactivityDuration=20000");
            // A client ID must be set before durable subscriptions are created.
            connectionFactory.setClientID("javier");

            connection = connectionFactory.createConnection();
            connection.setExceptionListener(new TextListener());
            connection.start();
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);

            // One durable subscription per topic, named "<user>_<index>".
            for (int x = 0; x < topics.length; x++) {
                Topic topic = session.createTopic(topics[x]);
                System.out.println("Created destination " + topics[x]);
                MessageConsumer mc =
                        session.createDurableSubscriber(topic, user + "_" + x);
                mc.setMessageListener(new TextListener());
                System.out.println("Listening...");
            }

            // Keep the constructor (and therefore the connection) alive until
            // the user asks to quit.
            InputStreamReader inputStreamReader = new InputStreamReader(System.in);
            boolean quit = false;
            while (!quit) {
                try {
                    int read = inputStreamReader.read();
                    // read() returns -1 at end-of-stream; without this check a
                    // closed or redirected stdin would make the loop spin forever.
                    quit = (read == -1) || (read == 'q') || (read == 'Q');
                } catch (IOException e) {
                    System.out.println("I/O exception: "
                            + e.toString());
                    // Stop waiting rather than retrying in a tight loop on a
                    // stream that has already failed.
                    quit = true;
                }
            }

            System.out.println("End of listener setting");

        } catch (JMSException e) {
            System.out.println("Exception occurred: " +
                    e.toString());
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    // Nothing more can be done at this point, but report the
                    // failure instead of silently discarding it.
                    System.out.println("Error closing connection: "
                            + e.toString());
                }
            }
        }
    }


    /**
     * Program entry point. Constructs a {@code SimpleConsumer}; the constructor
     * performs the whole connect / subscribe / wait-for-quit sequence.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // The instance is not kept: the constructor only returns once the
        // connection has been closed.
        new SimpleConsumer();
    }

    class TextListener implements MessageListener, ExceptionListener {
        private int x = 0;

        /**
         * Announces every incoming message on standard output. For a
         * {@code TextMessage} the text is printed followed by this listener's
         * running counter, which is then incremented; for any other message
         * type the message's class name is printed instead.
         *
         * @param message the incoming message
         */
        public void onMessage(Message message) {
            System.out.println("Ha llegado un nuevo mensaje !!!");

            try {
                // Guard clause: anything that is not a TextMessage is only reported.
                if (!(message instanceof TextMessage)) {
                    System.out.println("Message of wrong type: " +
                            message.getClass().getName());
                    return;
                }

                TextMessage textMessage = (TextMessage) message;
                // Fetch the text first so the counter is left untouched when
                // getText() throws.
                String text = textMessage.getText();
                System.out.println("Reading message: " + text + " - " + x++);
            } catch (JMSException e) {
                System.out.println("JMSException in onMessage(): " +
                        e.toString());
            } catch (Throwable t) {
                System.out.println("Exception in onMessage():" +
                        t.getMessage());
            }
        }

------------------------------


I've started master server, then started slave and then started my
subscriber client.

I've sent a message and my subscriber received it ok.

Then I stopped the master (while my subscriber was still running). The
slave took control but showed some error messages like this:

2006-05-02 12:54:25,382 :INFO :[tcp:///172.31.112.96:2116]:AbstractConnection  : Sync error occurred: javax.jms.InvalidClientIDException: Broker: localhost - Client: javier already connected
javax.jms.InvalidClientIDException: Broker: localhost - Client: javier already connected
    at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:154)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:65)
    at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:67)



or


2006-05-02 12:54:25,409 :ERROR :[tcp:///172.31.112.96:2116]:ManagedRegionBroker : Failed to register subscription TopicSubscription: consumer=ID:nt93233-2110-1146567148853-1:0:-1:1, destinations=0, dispatched=0, delivered=0, matched=0, discarded=0
javax.management.InstanceAlreadyExistsException: org.apache.activemq:BrokerName=localhost,Type=Subscription,active=true,name=ID_nt93233-2110-1146567148853-1_0_-1_1 already registered.
    at org.jboss.mx.server.registry.BasicMBeanRegistry.add(BasicMBeanRegistry.java:742)
    at org.jboss.mx.server.registry.BasicMBeanRegistry.registerMBean(BasicMBeanRegistry.java:210)


I sent a new message but my subscriber didn't receive it!

What is wrong with this scenario? Why are these errors being reported?

I've turned on DEBUG and attached the log file in case it is useful for
detecting the errors.

Thanks in advance

J