You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/03/03 11:31:19 UTC
svn commit: r918386 - in /activemq/branches/activemq-5.3/activemq-core/src:
main/java/org/apache/activemq/ test/java/org/apache/activemq/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Wed Mar 3 10:31:19 2010
New Revision: 918386
URL: http://svn.apache.org/viewvc?rev=918386&view=rev
Log:
merge -c 918384 https://svn.apache.org/repos/asf/activemq/trunk - resolve https://issues.apache.org/activemq/browse/AMQ-2635
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=918386&r1=918385&r2=918386&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Wed Mar 3 10:31:19 2010
@@ -115,6 +115,7 @@
private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
private boolean useDedicatedTaskRunner;
private long consumerFailoverRedeliveryWaitPeriod = 0;
+ private ClientInternalExceptionListener clientInternalExceptionListener;
// /////////////////////////////////////////////
//
@@ -323,6 +324,9 @@
if (exceptionListener != null) {
connection.setExceptionListener(exceptionListener);
}
+ if (clientInternalExceptionListener != null) {
+ connection.setClientInternalExceptionListener(clientInternalExceptionListener);
+ }
}
// /////////////////////////////////////////////
@@ -923,4 +927,22 @@
/**
 * Returns the consumerFailoverRedeliveryWaitPeriod currently held by this factory.
 * NOTE(review): the time unit is not stated anywhere in this hunk -- confirm before relying on it.
 */
public long getConsumerFailoverRedeliveryWaitPeriod() {
return consumerFailoverRedeliveryWaitPeriod;
}
+
+ public ClientInternalExceptionListener getClientInternalExceptionListener() { // accessor for the listener this factory hands to connections it configures
+ return clientInternalExceptionListener; // null unless a listener was set: the field is declared without an initializer
+ }
+
+ /**
+ * Allows a {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
+ * is used by frameworks which don't expose the Connection, such as Spring JmsTemplate, you can still register
+ * a listener for client-internal exceptions.
+ * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
+ * one connection (as it will be if more than one connection is subsequently created by this connection factory).
+ * @param clientInternalExceptionListener the listener to be registered on all connections
+ * created by this factory
+ */
+ public void setClientInternalExceptionListener(
+ ClientInternalExceptionListener clientInternalExceptionListener) {
+ this.clientInternalExceptionListener = clientInternalExceptionListener;
+ }
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=918386&r1=918385&r2=918386&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Mar 3 10:31:19 2010
@@ -1207,6 +1207,7 @@
} catch (RuntimeException e) {
if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
// Redeliver the message
+ unconsumedMessages.enqueueFirst(md);
} else {
// Transacted or Client ack: Deliver the
// next message.
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java?rev=918386&r1=918385&r2=918386&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java Wed Mar 3 10:31:19 2010
@@ -30,7 +30,6 @@
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -184,6 +183,29 @@
}
+
+ public void testSetClientInternalExceptionListener() throws Exception { // checks that a listener set on the factory shows up on connections created afterwards
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ connection = (ActiveMQConnection)cf.createConnection();
+ assertNull(connection.getClientInternalExceptionListener()); // nothing has been configured on the factory yet
+
+ ClientInternalExceptionListener listener = new ClientInternalExceptionListener() {
+ public void onException(Throwable exception) {
+ } // intentionally empty: only the listener's identity is compared below
+ };
+ connection.setClientInternalExceptionListener(listener); // NOTE(review): set on the first connection, but that connection is never asserted on afterwards
+ cf.setClientInternalExceptionListener(listener);
+
+ connection = (ActiveMQConnection)cf.createConnection(); // NOTE(review): overwrites the previous reference; nothing visible in this hunk closes the earlier connection
+ assertNotNull(connection.getClientInternalExceptionListener());
+ assertEquals(listener, connection.getClientInternalExceptionListener());
+
+ connection = (ActiveMQConnection)cf.createConnection(); // a further connection from the same factory must carry the same listener
+ assertEquals(listener, connection.getClientInternalExceptionListener());
+ assertEquals(listener, cf.getClientInternalExceptionListener()); // the factory getter returns what the setter stored
+
+ }
+
protected void assertCreateConnection(String uri) throws Exception {
// Start up a broker with a tcp connector.
broker = new BrokerService();
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java?rev=918386&r1=918385&r2=918386&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java Wed Mar 3 10:31:19 2010
@@ -16,24 +16,34 @@
*/
package org.apache.activemq.usecases;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import org.apache.activemq.memory.list.MessageList;
import org.apache.activemq.test.TestSupport;
import org.apache.activemq.util.IdGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.1.1.1 $
*/
public class TopicRedeliverTest extends TestSupport {
+ private static final Log LOG = LogFactory.getLog(TopicRedeliverTest.class);
private static final int RECEIVE_TIMEOUT = 10000;
protected int deliveryMode = DeliveryMode.PERSISTENT;
@@ -224,4 +234,50 @@
connection.close();
}
+
+ public void testRedeliveryOnListenerException() throws Exception { // a listener on an AUTO_ACKNOWLEDGE session throws once; the test then inspects what was delivered
+ Destination destination = createDestination(getClass().getName());
+ Connection connection = createConnection();
+ connection.setClientID(idGen.generateId());
+ connection.start();
+ Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+ final ArrayList<Message> receivedMessages = new ArrayList<Message>(); // NOTE(review): appended inside the listener and read by the test method with no synchronization -- presumably different threads; confirm
+ final CountDownLatch received = new CountDownLatch(6); // counted down once per onMessage call
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ LOG.info("got: " + message);
+ receivedMessages.add(message);
+ received.countDown();
+ if (received.getCount() == 5) { // the count is 5 only right after the first countDown, so the throw happens on the first delivery only
+ throw new RuntimeException("force redelivery on first message");
+ }
+ }
+ });
+ Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // first argument true: transacted session, hence the commit() calls below
+ MessageProducer producer = producerSession.createProducer(destination);
+ producer.setDeliveryMode(deliveryMode);
+
+ TextMessage sentMsg = producerSession.createTextMessage();
+ sentMsg.setText("msg1");
+ producer.send(sentMsg);
+ producerSession.commit();
+
+ sentMsg = producerSession.createTextMessage();
+ sentMsg.setText("msg2");
+ producer.send(sentMsg);
+ producerSession.commit();
+
+ TimeUnit.SECONDS.sleep(2); // NOTE(review): fixed sleep stands in for the latch await commented out on the next line; the outcome is timing-dependent
+ //assertTrue("got our redeliveries", received.await(20, TimeUnit.SECONDS));
+ assertEquals("got message one", "msg1", ((TextMessage)receivedMessages.get(0)).getText());
+ // retries
+ for (int i=1; i< 6; i++) { // NOTE(review): expects "msg2" at indices 1..5 although only two messages are sent and the listener throws once, on the first delivery -- confirm the intended redelivery semantics
+ assertEquals("got message one", "msg2", ((TextMessage)receivedMessages.get(i)).getText()); // NOTE(review): assertion label says "message one" while the expected text is "msg2"
+ }
+
+ connection.close();
+ }
+
}