You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/03/05 14:25:44 UTC

svn commit: r750455 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ft/MasterBroker.java test/java/org/apache/activemq/bugs/AMQ2102Test.java

Author: gtully
Date: Thu Mar  5 13:25:44 2009
New Revision: 750455

URL: http://svn.apache.org/viewvc?rev=750455&view=rev
Log:
fix AMQ-2102, apply patch from ying with test from dan, the test shows stack traces on console re slave out of sync, patch resolved this

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?rev=750455&r1=750454&r2=750455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Thu Mar  5 13:25:44 2009
@@ -58,6 +58,7 @@
     private static final Log LOG = LogFactory.getLog(MasterBroker.class);
     private Transport slave;
     private AtomicBoolean started = new AtomicBoolean(false);
+    private final Object addConsumerLock = new Object();
 
     /**
      * Constructor
@@ -196,9 +197,12 @@
      * @throws Exception
      */
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
-        sendAsyncToSlave(info);
-        Subscription answer = super.addConsumer(context, info);
-        return answer;
+        // as master and slave do independent dispatch, the consumer add order between master and slave
+        // needs to be maintained
+        synchronized (addConsumerLock) {
+    	    sendSyncToSlave(info);
+    	    return super.addConsumer(context, info);
+        }
     }
 
     /**
@@ -313,7 +317,7 @@
         if (messageDispatch.getMessage() != null) {
             Message msg = messageDispatch.getMessage();
             mdn.setMessageId(msg.getMessageId());
-            sendAsyncToSlave(mdn);
+            sendSyncToSlave(mdn);
         }
     }
 

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java?rev=750455&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java Thu Mar  5 13:25:44 2009
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
+    
+    
+    final static int MESSAGE_COUNT = 5120;
+    final static int NUM_CONSUMERS = 20;
+    
+    private static final Log LOG = LogFactory.getLog(AMQ2102Test.class);
+    
+    private final Map<Thread, Throwable> exceptions = new ConcurrentHashMap<Thread, Throwable>();
+    
+    private class Consumer implements Runnable, ExceptionListener {
+        private ActiveMQConnectionFactory connectionFactory;
+        private String name;
+        private String queueName;
+        private boolean running;
+        private org.omg.CORBA.IntHolder startup;
+        private Thread thread;
+
+        Consumer(ActiveMQConnectionFactory connectionFactory, String queueName, org.omg.CORBA.IntHolder startup, int id) {
+            this.connectionFactory = connectionFactory;
+            this.queueName = queueName;
+            this.startup = startup;
+            name = "Consumer-" + queueName + "-" + id;
+            thread = new Thread(this, name);
+        }
+
+        private String getClientId() {
+            try {
+                return InetAddress.getLocalHost().getHostName() + ":" + name;
+            } catch (UnknownHostException e) {
+                return "localhost:" + name;
+            }
+        }
+
+        synchronized boolean isRunning() {
+            return running;
+        }
+
+        void join() {
+            try {
+                thread.join(30000);
+            } catch (InterruptedException e) {
+                error("Interrupted waiting for " + name + " to stop", e);
+            }
+        }
+
+        public void onException(JMSException e) {
+            error("JMS exception: ", e);
+        }
+
+        private void processMessage(Session session, MessageProducer producer, Message message) throws Exception {
+            if (message instanceof TextMessage) {
+                TextMessage textMessage = (TextMessage) message;
+
+                Destination replyQueue = textMessage.getJMSReplyTo();
+                if (replyQueue != null) {
+                    TextMessage reply = session.createTextMessage("reply-" + textMessage.getText());
+                    
+                    reply.setJMSCorrelationID(textMessage.getJMSCorrelationID());
+                    
+                    producer.send(replyQueue, reply);
+                    debug("replied via " + replyQueue + " for message => " + textMessage.getText());
+                } else {
+                    debug("no reply to message => " + textMessage.getText());
+                }
+            } else {
+                error("Consumer cannot process " + message.getClass().getSimpleName());
+            }
+        }
+
+        private void processMessages() throws JMSException {
+            ActiveMQConnection connection = null;
+            try {
+                connection = (ActiveMQConnection) connectionFactory.createConnection();
+                RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+
+                policy.setMaximumRedeliveries(6);
+                policy.setInitialRedeliveryDelay(1000);
+                policy.setUseCollisionAvoidance(false);
+                policy.setCollisionAvoidancePercent((short) 15);
+                policy.setUseExponentialBackOff(false);
+                policy.setBackOffMultiplier((short) 5);
+
+                connection.setClientID(getClientId());
+                connection.setExceptionListener(this);
+                connection.start();
+
+                processMessages(connection);
+            } finally {
+                // connection is still null if createConnection() threw; closing it blindly would mask that failure with an NPE
+                if (connection != null) {
+                    connection.close();
+                }
+            }
+        }
+
+        private void processMessages(Connection connection) throws JMSException {
+            Session session = null;
+            try {
+                session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                processMessages(session);
+            } finally {
+                if (session != null) {
+                    session.close();
+                }
+            }
+        }
+        private void processMessages(Session session) throws JMSException {
+            MessageConsumer consumer = null;
+
+            try {
+                consumer = session.createConsumer(session.createQueue(queueName), null);
+                processMessages(session, consumer);
+            } finally {
+                if (consumer != null) {
+                    consumer.close();
+                }
+            }
+        }
+
+        private void processMessages(Session session, MessageConsumer consumer) throws JMSException {
+            MessageProducer producer = null;
+
+            try {
+                producer = session.createProducer(null);
+                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                processMessages(session, consumer, producer);
+            } finally {
+                if (producer != null) {
+                    producer.close();
+                }
+            }
+        }
+
+        private void processMessages(Session session, MessageConsumer consumer, MessageProducer producer) throws JMSException {
+            debug("waiting for messages...");
+            if (startup != null) {
+                synchronized (startup) {
+                    startup.value--;
+                    startup.notify();
+                }
+                startup = null;
+            }
+            while (isRunning()) {
+                Message message = consumer.receive(5000);
+
+                if (message != null) {
+                    try {
+                        processMessage(session, producer, message);
+                        session.commit();
+                    } catch (Throwable t) {
+                        error("message=" + message + " failure", t);
+                        session.rollback();
+                    }
+                }
+            }
+        }
+
+        public void run() {
+            setRunning(true);
+            
+            while (isRunning()) {
+                try {
+                    processMessages();
+                } catch (Throwable t) {
+                    error("Unexpected consumer problem: ", t);
+                }
+            }
+        }
+        synchronized void setRunning(boolean running) {
+            this.running = running;
+        }
+
+        void start() {
+            thread.start();
+        }
+    }
+    
+    private class Producer {
+        private ActiveMQConnectionFactory connectionFactory;
+        private String queueName;
+        
+        Producer(ActiveMQConnectionFactory connectionFactory, String queueName) {
+            this.connectionFactory = connectionFactory;
+            this.queueName = queueName;
+        }
+        
+        void execute(String[] args) {
+            try {
+                sendMessages();
+            } catch (Exception e) {
+                error("Producer failed", e);
+            }
+        }
+
+        private void sendMessages() throws JMSException {
+            ActiveMQConnection connection = null;
+
+            try {
+                connection = (ActiveMQConnection) connectionFactory.createConnection();
+                connection.start();
+
+                sendMessages(connection);
+            } finally {
+                if (connection != null) {
+                    try {
+                        connection.close();
+                    } catch (JMSException e) {
+                        error("Problem closing connection", e);
+                    }
+                }
+            }
+        }
+
+        private void sendMessages(ActiveMQConnection connection) throws JMSException {
+            Session session = null;
+
+            try {
+                // transacted session: the acknowledge mode is ignored, so name it SESSION_TRANSACTED like the consumer side does
+                session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                sendMessages(session);
+            } catch (JMSException e) {
+                e.printStackTrace();
+                exceptions.put(Thread.currentThread(), e);
+                if (session != null) {
+                    session.rollback();
+                }
+            } finally {
+                if (session != null) {
+                    session.close();
+                }
+            }
+        }
+
+        private void sendMessages(Session session) throws JMSException {
+            TemporaryQueue replyQueue = null;
+            
+            try {
+                replyQueue = session.createTemporaryQueue();
+                
+                sendMessages(session, replyQueue);
+            } finally {
+                if (replyQueue != null) {
+                    replyQueue.delete();
+                }
+            }
+        }
+
+        private void sendMessages(Session session, Destination replyQueue) throws JMSException {
+            // Create the consumer before entering the try block: if creation fails there is
+            // nothing to close, and the finally clause can no longer dereference a null consumer.
+            MessageConsumer consumer = session.createConsumer(replyQueue);
+            try {
+                sendMessages(session, replyQueue, consumer);
+            } finally {
+                consumer.close();
+            }
+        }
+
+        private void sendMessages(Session session, Destination replyQueue, int messageCount) throws JMSException {
+            MessageProducer producer = null;
+
+            try {
+                producer = session.createProducer(session.createQueue(queueName));
+                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+                producer.setTimeToLive(0);
+                producer.setPriority(Message.DEFAULT_PRIORITY);
+
+                for (int i = 0; i < messageCount; i++) {
+                    TextMessage message = session.createTextMessage("message#" + i);
+                    message.setJMSReplyTo(replyQueue);
+                    producer.send(message);
+                }
+            } finally {
+                if (producer != null) {
+                    producer.close();
+                }
+            }
+        }
+
+        private void sendMessages(Session session, Destination replyQueue, MessageConsumer consumer) throws JMSException {
+            final org.omg.CORBA.IntHolder messageCount = new org.omg.CORBA.IntHolder(MESSAGE_COUNT);
+            // messageCount holds the number of replies still expected and is also the monitor used for wait/notify below
+            consumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message reply) {
+                    if (reply instanceof TextMessage) {
+                        TextMessage textReply = (TextMessage) reply;
+                        synchronized (messageCount) {
+                            try {
+                                debug("receive reply#" + messageCount.value + " " + textReply.getText());
+                            } catch (JMSException e) {
+                                error("Problem processing reply", e);
+                            }
+                            messageCount.value--;
+                            messageCount.notify();
+                        }
+                    } else {
+                        error("Producer cannot process " + reply.getClass().getSimpleName());
+                    }
+                }});
+            // send one request per expected reply, then commit the session
+            sendMessages(session, replyQueue, messageCount.value);
+
+            session.commit();
+            // wait until the listener has counted every reply down to zero
+            synchronized (messageCount) {
+                while (messageCount.value > 0) {
+                    
+                    if (messageCount.value % 100 == 0) {
+                        // ack a batch of replies
+                        debug("acking via session commit: messageCount=" + messageCount.value);
+                        session.commit();
+                    }
+                    try {
+                        messageCount.wait();
+                    } catch (InterruptedException e) {
+                        error("Interrupted waiting for replies", e);
+                    }
+                }
+            }
+            // commit any outstanding replies
+            session.commit();
+            debug("All replies received...");
+        }
+    }
+
+    private static void debug(String message) {
+        LOG.debug(message);
+    }
+
+    private static void error(String message) {
+        LOG.error(message);
+    }
+
+    private static void error(String message, Throwable t) {
+        t.printStackTrace();
+        String msg = message + ": " + (t.getMessage() != null ? t.getMessage() : t.toString());
+        LOG.error(msg, t);
+        fail(msg);
+    }
+
+    private ArrayList<Consumer> createConsumers(ActiveMQConnectionFactory connectionFactory, String queueName, int max) {
+        ArrayList<Consumer> consumers = new ArrayList<Consumer>(max);
+        org.omg.CORBA.IntHolder startup = new org.omg.CORBA.IntHolder(max);
+
+        for (int id = 0; id < max; id++) {
+            consumers.add(new Consumer(connectionFactory, queueName, startup, id));
+        }
+        for (Consumer consumer : consumers) {
+            consumer.start();
+        }
+        synchronized (startup) {
+            while (startup.value > 0) {
+                try {
+                    startup.wait();
+                } catch (InterruptedException e) {
+                    error("Interrupted waiting for consumers to start", e);
+                }
+            }
+        }
+        return consumers;
+    }
+
+    final BrokerService master = new BrokerService();
+    BrokerService slave = new BrokerService();
+    String masterUrl;
+
+    public void setUp() throws Exception {
+        
+        master.setBrokerName("Master");
+        master.addConnector("tcp://localhost:0");
+        master.deleteAllMessages();
+        master.setWaitForSlave(true);
+        
+        Thread t = new Thread() {
+            public void run() {
+                try {
+                    master.start();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    exceptions.put(Thread.currentThread(), e);
+                }
+            }
+        };
+        t.start();
+        Thread.sleep(2000);
+        masterUrl = master.getTransportConnectors().get(0).getConnectUri().toString(); 
+        
+        debug("masterUrl: " + masterUrl);
+        slave.setBrokerName("Slave");
+        slave.deleteAllMessages();
+        slave.addConnector("tcp://localhost:0");
+        slave.setMasterConnectorURI(masterUrl);
+        slave.start();
+    }
+    
+    public void tearDown() throws Exception {
+        master.stop();
+        slave.stop();
+    }
+    
+    public void testMasterSlaveBug() throws Exception {
+        
+        Thread.setDefaultUncaughtExceptionHandler(this);
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + 
+                masterUrl + ")?randomize=false");
+        String queueName = "MasterSlaveBug";
+        ArrayList<Consumer> consumers = createConsumers(connectionFactory, queueName, NUM_CONSUMERS);
+        
+        Producer producer = new Producer(connectionFactory, queueName);
+        producer.execute(new String[]{});
+
+        for (Consumer consumer : consumers) {
+            consumer.setRunning(false);
+        }
+        
+        for (Consumer consumer : consumers) {
+            consumer.join();
+        }
+        assertTrue(exceptions.isEmpty());
+    }
+
+    public void uncaughtException(Thread t, Throwable e) {
+        error("" + t + e);
+        exceptions.put(t,e);
+        
+        
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date