You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Gary Tully (JIRA)" <ji...@apache.org> on 2008/09/08 16:55:55 UTC

[jira] Updated: (AMQ-1585) Problems with pure master/slave configuration

     [ https://issues.apache.org/activemq/browse/AMQ-1585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Tully updated AMQ-1585:
----------------------------

    Fix Version/s: 5.3.0
                       (was: 5.2.0)

> Problems with pure master/slave configuration
> ---------------------------------------------
>
>                 Key: AMQ-1585
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1585
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 4.1.1, 5.0.0, 5.1.0
>         Environment: Ubuntu 6.04, JDK 1.5.0_011, Spring 2.0.x
>            Reporter: Thomas Buckel
>            Assignee: Rob Davies
>             Fix For: 5.3.0
>
>         Attachments: AMQ-1585.patch, AMQ-1585.transacted.patch
>
>
> As posted in the AMQ user forum:
> http://www.nabble.com/Problems-with-Pure-Master-Slave-in-AMQ-5.0.0-to15471491s2354.html#a15474769
> -------------------
> Hi all,
> I am having trouble setting up a *stable* ActiveMQ Pure Master/Slave topology.
> Initially I have tried v4.1.1 which failed with an exception. I found an AMQ JIRA ticket which said that Pure/Master slave didn't work in v4.1.1.
> Ok, so I switched to AMQ 5.0.0, created 2 configs (master/slave, see end of message) and ran two AMQ instances (on the same box) and most of the times my test (see below) worked, but more often I get various error messages like:
> - On the slave:
> ERROR Service                        - Async error occurred: javax.jms.JMSException: Slave broker out of sync with master: Dispatched message (ID:tbuckel-desktop-41814-1202886136210-0:0:565:1:1) was not in the pending list
> javax.jms.JMSException: Slave broker out of sync with master: Dispatched message (ID:tbuckel-desktop-41814-1202886136210-0:0:565:1:1) was not in the pending list
>         at org.apache.activemq.broker.region.PrefetchSubscription.processMessageDispatchNotification(PrefetchSubscription.java:160)
>         at org.apache.activemq.broker.region.AbstractRegion.processDispatchNotification(AbstractRegion.java:381)
>         at org.apache.activemq.broker.region.RegionBroker.processDispatchNotification(RegionBroker.java:550)
>         at org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201)
>         at org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201)
>         at org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201)
>         at org.apache.activemq.broker.MutableBrokerFilter.processDispatchNotification(MutableBrokerFilter.java:211)
>         at org.apache.activemq.broker.TransportConnection.processMessageDispatchNotification(TransportConnection.java:450)
>         at org.apache.activemq.command.MessageDispatchNotification.visit(MessageDispatchNotification.java:77)
>         at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281)
>         at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178)
>         at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
>         at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
>         at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:202)
>         at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
>         at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
> - After having killed the master, stopped the slave, copied the slave's data into the master's data directory various error message came up (as described in the Master/Slave recovery section), e.g. (internal) ActiveMQ topics were not available, the admin webApp showed exceptions and errors on the client.
> The test I've created uses Spring 2.0.x and pumps 1000 MapMessages in a queue through Spring's JmsTempate, each message is created within its own transaction, using JmsTransactionManager and TransactionTemplate.
> The created messages are consumed by an initially instantiated transactional DefaultMessageListenerContainer. The AMQ JARs in the test's classpath are activemq-core-5.0.0.jar, geronimo-jms_1.1_spec-1.0.jar, geronimo-jta_1.0.1B_spec-1.0.jar as I've noticed a really bad performance when only using the activemq-all-5.0.0.jar (maybe this is the problem?).
> The test code work's without problems with OpenMQ, but I'd prefer using the nice Pure Master/Active ActiveMQ if I can get it running in a *stable* config ;)
> I would highly appreciate any help or suggestions. Maybe my config is wrong or I miss something essential. I've also tried a recent AMQ 5.1 SNAPSHOT which wasn't better...
> See below for the small program i used to test (no unit test, behaviour appeared to be non deterministic to me and it's not so nice as i've changed it quite often)
> Thanks in advance,
> Thomas
> <!-- MASTER config -->
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.org/config/1.0"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd
>   http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
>   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
>   <broker xmlns="http://activemq.org/config/1.0" brokerName="master" dataDirectory="${activemq.base}/data">
>     <destinationPolicy>
>       <policyMap>
>         <policyEntries>
>           <policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="1mb">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>         </policyEntries>
>       </policyMap> 
>     </destinationPolicy>
>     <transportConnectors>
>        <transportConnector name="openwire" uri="tcp://tbuckel-desktop:7778" />
>     </transportConnectors>
>     <networkConnectors/>
>     <managementContext>
>        <managementContext connectorPort="1100" jmxDomainName="org.apache.activemq"/>
>     </managementContext>
>   </broker>
>   <commandAgent xmlns="http://activemq.org/config/1.0"/>
>   <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
>     <connectors>
>       <nioConnector port="8161" />
>     </connectors>
>     <handlers>
>       <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" />
>     </handlers>
>   </jetty>
> </beans>
> <!-- SLAVE config -->
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.org/config/1.0"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd
>   http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
>   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
>   
>   <broker xmlns="http://activemq.org/config/1.0" brokerName="slave" dataDirectory="${activemq.base}/data-slave"
>           masterConnectorURI="tcp://tbuckel-desktop:7778">
>   
>     <destinationPolicy>
>       <policyMap>
>         <policyEntries>
>           <policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="1mb">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>         </policyEntries>
>       </policyMap>
>     </destinationPolicy>
>     <transportConnectors>
>        <transportConnector name="openwire" uri="tcp://localhost:7779"/>
>     </transportConnectors>
>     <networkConnectors/>
>     <managementContext>
>        <managementContext connectorPort="1101" jmxDomainName="org.apache.activemq"/>
>     </managementContext>
>   </broker>
>   <commandAgent xmlns="http://activemq.org/config/1.0"/>
>   <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
>     <connectors>
>       <nioConnector port="8162" />
>     </connectors>
>     <handlers>
>       <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" />
>     </handlers>
>   </jetty>
> </beans>
> ------------
> Test code:
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.springframework.jms.connection.JmsTransactionManager;
> import org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.MessageCreator;
> import org.springframework.jms.listener.DefaultMessageListenerContainer;
> import org.springframework.transaction.TransactionStatus;
> import org.springframework.transaction.support.TransactionCallbackWithoutResult;
> import org.springframework.transaction.support.TransactionTemplate;
> import javax.jms.*;
> import java.math.BigInteger;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.TimeUnit;
> public class AnotherFailoverTest {
>     public static final int MESSAGES = 1000;
>     private final static List<BigInteger> notConsumedMessages = new ArrayList<BigInteger>(MESSAGES);
>     private static ConnectionFactory createCF() throws Exception {
>         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
>         cf.setBrokerURL("failover://(tcp://localhost:7778,tcp://localhost:7779)?randomize=false");
>         return new TransactionAwareConnectionFactoryProxy(cf);
>     }
>     private static void send() throws Exception {
>         JmsTransactionManager transactionManager = new JmsTransactionManager();
>         transactionManager.setConnectionFactory(createCF());
>         transactionManager.afterPropertiesSet();
>         int i=0;
>         do {
>             i++;
>             final int number = i;
>             try {
>                 final BigInteger v = new BigInteger(Integer.toString(number));
>                 TransactionTemplate tt = new TransactionTemplate(transactionManager);
>                 tt.execute(new TransactionCallbackWithoutResult() {
>                     protected void doInTransactionWithoutResult(TransactionStatus status) {
>                         final JmsTemplate template = new JmsTemplate(pcf);
>                         template.setSessionTransacted(true);
>                         template.afterPropertiesSet();
>                         template.send("testqueue", new MessageCreator() {
>                             public Message createMessage(Session session) throws JMSException {
>                                 ObjectMessage dummyMessage = session.createObjectMessage();
>                                 dummyMessage.setObject(v);
>                                 synchronized (notConsumedMessages) {
>                                     notConsumedMessages.add(v);
>                                 }
> //                                System.out.println("Created message " + number + "(" + notConsumedMessages.size() + ")");
>                                 return dummyMessage;
>                             }
>                         });
>                     }
>                 });
>             } catch (Exception e) {
>                 e.printStackTrace();
>                 System.out.println("Error creating message " + number);
>             }
>         } while (i < MESSAGES);
>     }
>     private static void setupReceiver() throws Exception {
>         JmsTransactionManager transactionManager = new JmsTransactionManager();
>         transactionManager.setConnectionFactory(createCF());
>         transactionManager.afterPropertiesSet();
>         final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
>         container.setConnectionFactory(pcf);
>         container.setTransactionManager(transactionManager);
>         container.setMessageListener(new MessageListener() {
>             public void onMessage(Message message) {
>                 try {
>                     ObjectMessage msg = (ObjectMessage) message;
>                     BigInteger number = (BigInteger) msg.getObject();
>                     synchronized (notConsumedMessages) {
>                         if (!notConsumedMessages.remove(number)) {
>                            System.err.println("Message " + number + " not found in list!");
>                         } else {
>    //                        System.out.println("Consumed message " + number);
>                        }
>                    }
>                 } catch (JMSException e) {
> //                    e.printStackTrace();
>                     System.out.println("Error consuming message!");
>                 }
>             }
>         });
>         container.setSessionTransacted(true);
>         container.setDestinationName("testqueue");
>         container.setExceptionListener(new ExceptionListener() {
>             public void onException(JMSException jmsException) {
>                 System.err.println(jmsException);
>             }
>         });
>         container.afterPropertiesSet();
>         container.initialize();
>         TimeUnit.SECONDS.sleep(1);
>     }
>     public static void main(String[] args) throws Exception {
>         long start = System.currentTimeMillis();
>         setupReceiver();
>         send();
>         int remainingSize = 0;
>         do {
>             Thread.sleep(500);
>             synchronized (notConsumedMessages) {
>                 remainingSize = notConsumedMessages.size();
>             }
>             System.out.println("Unconsumed " + remainingSize + ": " + sb);
>         } while (remainingSize > 0);
>         System.out.println("All messages consumed.");
>         long end = System.currentTimeMillis();
>         System.out.println((end-start));
>         System.exit(0);
>     }
> }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.