You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/03/03 11:31:19 UTC

svn commit: r918386 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/ test/java/org/apache/activemq/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Wed Mar  3 10:31:19 2010
New Revision: 918386

URL: http://svn.apache.org/viewvc?rev=918386&view=rev
Log:
merge -c 918384 https://svn.apache.org/repos/asf/activemq/trunk - resolve https://issues.apache.org/activemq/browse/AMQ-2635

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=918386&r1=918385&r2=918386&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Wed Mar  3 10:31:19 2010
@@ -115,6 +115,7 @@
     private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
     private boolean useDedicatedTaskRunner;
     private long consumerFailoverRedeliveryWaitPeriod = 0;
+    private ClientInternalExceptionListener clientInternalExceptionListener;
 
     // /////////////////////////////////////////////
     //
@@ -323,6 +324,9 @@
         if (exceptionListener != null) {
         	connection.setExceptionListener(exceptionListener);
         }
+        if (clientInternalExceptionListener != null) {
+            connection.setClientInternalExceptionListener(clientInternalExceptionListener);
+        }
     }
 
     // /////////////////////////////////////////////
@@ -923,4 +927,22 @@
     public long getConsumerFailoverRedeliveryWaitPeriod() {
         return consumerFailoverRedeliveryWaitPeriod;
     }
+
+    public ClientInternalExceptionListener getClientInternalExceptionListener() {
+        return clientInternalExceptionListener;
+    }
+    
+    /**
+     * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
+     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
+     * an exception listener.
+     * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
+     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
+     * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
+     * created by this factory
+     */
+    public void setClientInternalExceptionListener(
+            ClientInternalExceptionListener clientInternalExceptionListener) {
+        this.clientInternalExceptionListener = clientInternalExceptionListener;
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=918386&r1=918385&r2=918386&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Mar  3 10:31:19 2010
@@ -1207,6 +1207,7 @@
                             } catch (RuntimeException e) {
                                 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
                                     // Redeliver the message
+                                    unconsumedMessages.enqueueFirst(md);
                                 } else {
                                     // Transacted or Client ack: Deliver the
                                     // next message.

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java?rev=918386&r1=918385&r2=918386&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java Wed Mar  3 10:31:19 2010
@@ -30,7 +30,6 @@
 import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -184,6 +183,29 @@
         
     }
 
+    
+    public void testSetClientInternalExceptionListener() throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        connection = (ActiveMQConnection)cf.createConnection();
+        assertNull(connection.getClientInternalExceptionListener());
+        
+        ClientInternalExceptionListener listener = new ClientInternalExceptionListener() {
+            public void onException(Throwable exception) {
+            }
+        };
+        connection.setClientInternalExceptionListener(listener);
+        cf.setClientInternalExceptionListener(listener);
+        
+        connection = (ActiveMQConnection)cf.createConnection();
+        assertNotNull(connection.getClientInternalExceptionListener());
+        assertEquals(listener, connection.getClientInternalExceptionListener());
+        
+        connection = (ActiveMQConnection)cf.createConnection();
+        assertEquals(listener, connection.getClientInternalExceptionListener());   
+        assertEquals(listener, cf.getClientInternalExceptionListener());
+        
+    }
+
     protected void assertCreateConnection(String uri) throws Exception {
         // Start up a broker with a tcp connector.
         broker = new BrokerService();

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java?rev=918386&r1=918385&r2=918386&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java Wed Mar  3 10:31:19 2010
@@ -16,24 +16,34 @@
  */
 package org.apache.activemq.usecases;
 
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.activemq.memory.list.MessageList;
 import org.apache.activemq.test.TestSupport;
 import org.apache.activemq.util.IdGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision: 1.1.1.1 $
  */
 public class TopicRedeliverTest extends TestSupport {
 
+    private static final Log LOG = LogFactory.getLog(TopicRedeliverTest.class);
     private static final int RECEIVE_TIMEOUT = 10000;
 
     protected int deliveryMode = DeliveryMode.PERSISTENT;
@@ -224,4 +234,50 @@
         connection.close();
     }
 
+    
+    public void testRedeliveryOnListenerException() throws Exception {
+        Destination destination = createDestination(getClass().getName());
+        Connection connection = createConnection();
+        connection.setClientID(idGen.generateId());
+        connection.start();
+        Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = consumerSession.createConsumer(destination);
+        
+        final ArrayList<Message> receivedMessages = new ArrayList<Message>();
+        final CountDownLatch received = new CountDownLatch(6);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                LOG.info("got: " + message);
+                receivedMessages.add(message);
+                received.countDown();
+                if (received.getCount() == 5) {
+                    throw new RuntimeException("force redelivery on first message");
+                }
+            }
+        });
+        Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(destination);
+        producer.setDeliveryMode(deliveryMode);
+
+        TextMessage sentMsg = producerSession.createTextMessage();
+        sentMsg.setText("msg1");
+        producer.send(sentMsg);
+        producerSession.commit();
+
+        sentMsg = producerSession.createTextMessage();
+        sentMsg.setText("msg2");
+        producer.send(sentMsg);
+        producerSession.commit();
+
+        TimeUnit.SECONDS.sleep(2);
+        //assertTrue("got our redeliveries", received.await(20, TimeUnit.SECONDS));
+        assertEquals("got message one", "msg1", ((TextMessage)receivedMessages.get(0)).getText());
+        // retries
+        for (int i=1; i< 6; i++) {
+            assertEquals("got message one", "msg2", ((TextMessage)receivedMessages.get(i)).getText());
+        }
+        
+        connection.close();
+    }
+
 }