You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/12/23 18:02:35 UTC

svn commit: r729038 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/RegionBroker.java test/java/org/apache/activemq/bugs/AMQ2021Test.java

Author: gtully
Date: Tue Dec 23 09:02:35 2008
New Revision: 729038

URL: http://svn.apache.org/viewvc?rev=729038&view=rev
Log:
fix AMQ-2021 with test, seems to be the same issue in AMQ-2035

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=729038&r1=729037&r2=729038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Dec 23 09:02:35 2008
@@ -677,18 +677,21 @@
 					        .getRegionDestination().getDeadLetterStrategy();
 					if(deadLetterStrategy!=null){
 						if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
+						    if (node.getRegionDestination().getActiveMQDestination().isTopic()) {
+						        // message may be inflight to other subscriptions so do not modify
+						        message = message.copy();
+						    }
 							long expiration=message.getExpiration();
 							message.setExpiration(0);
 							message.setProperty("originalExpiration",new Long(
 							        expiration));
 							if(!message.isPersistent()){
-								message.setPersistent(true);
-								message.setProperty("originalDeliveryMode",
+							    message.setPersistent(true);
+							    message.setProperty("originalDeliveryMode",
 								        "NON_PERSISTENT");
 							}
 							// The original destination and transaction id do
-							// not get filled when the message is first
-							// sent,
+							// not get filled when the message is first sent,
 							// it is only populated if the message is routed to
 							// another destination like the DLQ
 							ActiveMQDestination deadLetterDestination=deadLetterStrategy

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java?rev=729038&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java Tue Dec 23 09:02:35 2008
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This is a test case for the issue reported at:
+ * https://issues.apache.org/activemq/browse/AMQ-2021 
+ * Bug is modification of inflight message properties so the failure can manifest itself in a bunch
+ * or ways, from message receipt with null properties to marshall errors
+ */
+public class AMQ2021Test extends TestCase implements ExceptionListener, UncaughtExceptionHandler {
+
+    private static final Log log = LogFactory.getLog(AMQ2021Test.class);
+    BrokerService brokerService;
+    ArrayList<Thread> threads = new ArrayList<Thread>();
+    Vector<Throwable> exceptions;
+    
+    AMQ2021Test testCase;
+    
+    String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616";    
+    String ACTIVEMQ_BROKER_URL = ACTIVEMQ_BROKER_BIND + "?jms.redeliveryPolicy.maximumRedeliveries=1";
+    
+    private int numMessages = 1000;
+    private int numConsumers = 2;
+    private int dlqMessages = numMessages/2;
+    
+    CountDownLatch receivedLatch;
+    private ActiveMQTopic destination;
+    public CountDownLatch started;
+
+    @Override
+    protected void setUp() throws Exception {
+        Thread.setDefaultUncaughtExceptionHandler(this);
+        testCase = this;
+        
+        // Start an embedded broker up.
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
+        brokerService.start();
+        destination = new ActiveMQTopic(getName());
+        exceptions = new Vector<Throwable>();
+        
+        receivedLatch = 
+            new CountDownLatch(numConsumers * (numMessages + dlqMessages));
+        started = new CountDownLatch(1);
+    }
+    
+    @Override
+    protected void tearDown() throws Exception {
+        for (Thread t : threads) {
+            t.interrupt();
+            t.join();
+        }        
+        brokerService.stop();
+    }
+
+    public void testConcurrentTopicResendToDLQ() throws Exception {
+
+        for (int i=0; i<numConsumers;i++) {
+            ConsumerThread c1 = new ConsumerThread("Consumer-" + i);
+            threads.add(c1);
+            c1.start();
+        }
+        
+        assertTrue(started.await(5, TimeUnit.SECONDS));
+        
+        Thread producer = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    produce(numMessages);
+                } catch (Exception e) {
+                }
+            }
+        };
+        threads.add(producer);
+        producer.start();
+        
+        boolean allGood = receivedLatch.await(30, TimeUnit.SECONDS);
+        for (Throwable t: exceptions) {
+            log.error("failing test with first exception", t);
+            fail("exception during test : " + t);
+        }        
+        assertTrue("excepted messages received within time limit", allGood);
+        
+        assertEquals(0, exceptions.size());
+    
+        for (int i=0; i<numConsumers; i++) {
+            // last recovery sends message to deq so is not received again
+            assertEquals(dlqMessages*2, ((ConsumerThread)threads.get(i)).recoveries);
+            assertEquals(numMessages + dlqMessages, ((ConsumerThread)threads.get(i)).counter);
+        }
+       
+        // half of the messages for each consumer should go to the dlq but duplicates will
+        // be suppressed
+        consumeFromDLQ(dlqMessages);
+        
+    }    
+    
+    private void consumeFromDLQ( int messageCount) throws Exception {
+        ActiveMQConnectionFactory connectionFactory = 
+            new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URL);
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+                    
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+        int count = 0;
+        for (int i=0; i< messageCount; i++) {
+            if (dlqConsumer.receive(1000) == null) {
+                break;
+            }
+            count++;
+        }
+        assertEquals(messageCount, count);
+    }
+
+    public void produce(int count) throws Exception {
+        Connection connection=null;
+        try {
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND);
+            connection = factory.createConnection();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setTimeToLive(0);
+            connection.start();
+            
+            for (int i=0 ; i< count; i++) {
+                int id = i+1;
+                TextMessage message = session.createTextMessage(getName()+" Message "+ id);
+                message.setIntProperty("MsgNumber", id);
+                producer.send(message);
+                
+                if (id % 500 == 0) {
+                    log.info("sent " + id + ", ith " + message);
+                }
+            }
+        } catch (JMSException e) {
+            log.error("unexpected ex on produce", e);
+            exceptions.add(e);
+        } finally {
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            } catch (Throwable e) {
+            }
+        }
+    }
+    
+    public class ConsumerThread extends Thread implements MessageListener   {
+        public long counter = 0;
+        public long recoveries = 0;
+        private Session session;
+
+        public ConsumerThread(String threadId) {
+            super(threadId);
+        }
+
+        public void run() {
+            try {
+                ActiveMQConnectionFactory connectionFactory = 
+                    new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URL);
+                Connection connection = connectionFactory.createConnection();
+                connection.setExceptionListener(testCase);
+                connection.setClientID(getName());            
+                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                MessageConsumer consumer = session.createDurableSubscriber(destination, getName());                            
+                consumer.setMessageListener(this);
+                connection.start();
+                
+                started .countDown();
+            
+            } catch (JMSException exception) {
+                log.error("unexpected ex in consumer run", exception);
+                exceptions.add(exception);
+            }
+        }
+        
+        public void onMessage(Message message) {
+            try {
+                counter++;
+                int messageNumber=message.getIntProperty("MsgNumber");
+                if(messageNumber%2==0){
+                    session.recover();
+                    recoveries++;
+                } else {
+                    message.acknowledge();
+                }     
+                
+                if (counter % 200 == 0) {
+                    log.info("recoveries:" + recoveries + ", Received " + counter + ", counter'th " + message);
+                }
+                receivedLatch.countDown();
+            }catch (Exception e) {
+                log.error("unexpected ex on onMessage", e);
+                exceptions.add(e);
+            }
+        }
+        
+    }
+
+    public void onException(JMSException exception) {
+        log.info("Unexpected JMSException", exception);
+        exceptions.add(exception);
+    }
+
+    public void uncaughtException(Thread thread, Throwable exception) {
+        log.info("Unexpected exception from thread " + thread + ", ex: " + exception);
+        exceptions.add(exception);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date