You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/08/29 10:21:10 UTC
svn commit: r690144 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/Queue.java
test/java/org/apache/activemq/bugs/AMQ1917Test.java
Author: gtully
Date: Fri Aug 29 01:21:09 2008
New Revision: 690144
URL: http://svn.apache.org/viewvc?rev=690144&view=rev
Log:
fix AMQ-1917
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=690144&r1=690143&r2=690144&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Aug 29 01:21:09 2008
@@ -210,11 +210,13 @@
LinkedList<RecoveryDispatch> recoveries = new LinkedList<RecoveryDispatch>();
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
+ // synchronize with dispatch method so that no new messages are sent
+ // while setting up a subscription. avoid out of order messages,
+ // duplicates, etc.
dispatchLock.lock();
try {
sub.add(context, this);
destinationStatistics.getConsumers().increment();
-// MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
// needs to be synchronized - so no contention with dispatching
synchronized (consumers) {
@@ -229,32 +231,28 @@
dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
}
}
- // synchronize with dispatch method so that no new messages are sent
- // while
- // setting up a subscription. avoid out of order messages,
- // duplicates
- // etc.
+
+ // any newly paged in messages that are not dispatched are added to pagedInPending in iterate()
doPageIn(false);
-
+
synchronized (pagedInMessages) {
RecoveryDispatch rd = new RecoveryDispatch();
rd.messages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
rd.subscription = sub;
recoveries.addLast(rd);
}
-
if( sub instanceof QueueBrowserSubscription ) {
((QueueBrowserSubscription)sub).incrementQueueRef();
}
if (!this.optimizedDispatch) {
- wakeup();
+ wakeup();
}
}finally {
dispatchLock.unlock();
}
if (this.optimizedDispatch) {
- // Outside of dispatchLock() to maintain the lock hierarchy of
- // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
+ // Outside of dispatchLock() to maintain the lock hierarchy of
+ // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
wakeup();
}
}
@@ -262,11 +260,10 @@
public void removeSubscription(ConnectionContext context, Subscription sub)
throws Exception {
destinationStatistics.getConsumers().decrement();
+ // synchronize with dispatch method so that no new messages are sent
+ // while removing up a subscription.
dispatchLock.lock();
try {
- // synchronize with dispatch method so that no new messages are sent
- // while
- // removing up a subscription.
synchronized (consumers) {
removeFromConsumerList(sub);
if (sub.getConsumerInfo().isExclusive()) {
@@ -324,7 +321,6 @@
}
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
-// System.out.println(getName()+" send "+message.getMessageId());
final ConnectionContext context = producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
@@ -934,9 +930,17 @@
for (QueueMessageReference node : rd.messages) {
if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
msgContext.setMessageReference(node);
- if (rd.subscription.matches(node, msgContext)) {
- rd.subscription.add(node);
+ if (rd.subscription.matches(node, msgContext)) {
+ rd.subscription.add(node);
+ } else {
+ // make sure it gets queued for dispatched again
+ dispatchLock.lock();
+ try {
+ pagedInPendingDispatch.add(node);
+ } finally {
+ dispatchLock.unlock();
}
+ }
}
}
@@ -949,24 +953,24 @@
}
}
- boolean result = false;
+ boolean pageInMoreMessages = false;
synchronized (messages) {
- result = !messages.isEmpty();
+ pageInMoreMessages = !messages.isEmpty();
}
// Kinda ugly.. but I think dispatchLock is the only mutex protecting the
// pagedInPendingDispatch variable.
dispatchLock.lock();
try {
- result |= !pagedInPendingDispatch.isEmpty();
+ pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
} finally {
dispatchLock.unlock();
}
// Perhaps we should page always into the pagedInPendingDispatch list is
- // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
- // then we do a dispatch.
- if (result) {
+ // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
+ // then we do a dispatch.
+ if (pageInMoreMessages) {
try {
pageInMessages(false);
@@ -1116,8 +1120,8 @@
int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
toPageIn = Math.min(toPageIn,getMaxPageSize());
if (isLazyDispatch()&& !force) {
- // Only page in the minimum number of messages which can be dispatched immediately.
- toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
+ // Only page in the minimum number of messages which can be dispatched immediately.
+ toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
}
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn);
@@ -1158,21 +1162,17 @@
dispatchLock.lock();
try {
if(!pagedInPendingDispatch.isEmpty()) {
- // System.out.println(getName()+": dispatching from pending: "+pagedInPendingDispatch.size());
// Try to first dispatch anything that had not been dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
-// System.out.println(getName()+": new pending list1: "+pagedInPendingDispatch.size());
}
// and now see if we can dispatch the new stuff.. and append to the pending
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
-// System.out.println(getName()+": dispatching from paged in: "+list.size());
if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(doActualDispatch(list));
} else {
pagedInPendingDispatch.addAll(list);
}
-// System.out.println(getName()+": new pending list2: "+pagedInPendingDispatch.size());
}
} finally {
dispatchLock.unlock();
@@ -1200,7 +1200,6 @@
if (!s.isFull()) {
// Dispatch it.
s.add(node);
- //System.err.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
target = s;
break;
} else {
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java?rev=690144&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java Fri Aug 29 01:21:09 2008
@@ -0,0 +1,207 @@
+package org.apache.activemq.bugs;
+
+import junit.framework.TestCase;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+
+
+public class AMQ1917Test extends TestCase {
+
+ private static final int NUM_MESSAGES = 4000;
+ private static final int NUM_THREADS = 10;
+ public static final String REQUEST_QUEUE = "mock.in.queue";
+ public static final String REPLY_QUEUE = "mock.out.queue";
+
+ Destination requestDestination = ActiveMQDestination.createDestination(
+ REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE);
+ Destination replyDestination = ActiveMQDestination.createDestination(
+ REPLY_QUEUE, ActiveMQDestination.QUEUE_TYPE);
+
+ CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
+ CountDownLatch errorLatch = new CountDownLatch(1);
+ ThreadPoolExecutor tpe;
+ final String BROKER_URL = "tcp://localhost:61616";
+ BrokerService broker = null;
+ private boolean working = true;
+
+ // trival session/producer pool
+ final Session[] sessions = new Session[NUM_THREADS];
+ final MessageProducer[] producers = new MessageProducer[NUM_THREADS];
+
+ public void setUp() throws Exception {
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.addConnector(BROKER_URL);
+ broker.start();
+
+ BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
+ tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000,
+ TimeUnit.MILLISECONDS, queue);
+ ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory());
+ tpe.setThreadFactory(limitedthreadFactory);
+ }
+
+ public void tearDown() throws Exception {
+ broker.stop();
+ tpe.shutdown();
+ }
+
+ public void testLoadedSendRecieveWithCorrelationId() throws Exception {
+
+ ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
+ connectionFactory.setBrokerURL(BROKER_URL);
+ Connection connection = connectionFactory.createConnection();
+ setupReceiver(connection);
+
+ connection = connectionFactory.createConnection();
+ connection.start();
+
+ // trival session/producer pool
+ for (int i=0; i<NUM_THREADS; i++) {
+ sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producers[i] = sessions[i].createProducer(requestDestination);
+ }
+
+ for (int i = 0; i < NUM_MESSAGES; i++) {
+ MessageSenderReceiver msr = new MessageSenderReceiver(requestDestination,
+ replyDestination, "Test Message : " + i);
+ tpe.execute(msr);
+ }
+
+ while (!roundTripLatch.await(4000, TimeUnit.MILLISECONDS)) {
+ if (errorLatch.await(1000, TimeUnit.MILLISECONDS)) {
+ fail("there was an error, check the console for thread or thread allocation failure");
+ break;
+ }
+ }
+ working = false;
+ }
+
+ private void setupReceiver(final Connection connection) throws Exception {
+
+ final Session session = connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = session
+ .createConsumer(requestDestination);
+ final MessageProducer sender = session.createProducer(replyDestination);
+ connection.start();
+
+ new Thread() {
+ public void run() {
+ while (working) {
+ // wait for messages in infinitive loop
+ // time out is set to show the client is awaiting
+ try {
+ TextMessage msg = (TextMessage) consumer.receive(20000);
+ if (msg == null) {
+ errorLatch.countDown();
+ fail("Response timed out."
+ + " latchCount=" + roundTripLatch.getCount());
+ } else {
+ String result = msg.getText();
+ //System.out.println("Request:" + (i++)
+ // + ", msg=" + result + ", ID" + msg.getJMSMessageID());
+ TextMessage response = session.createTextMessage();
+ response.setJMSCorrelationID(msg.getJMSMessageID());
+ response.setText(result);
+ sender.send(response);
+ }
+ } catch (JMSException e) {
+ errorLatch.countDown();
+ fail("Unexpected exception:" + e);
+ }
+ }
+ }
+ }.start();
+ }
+
+ class MessageSenderReceiver implements Runnable {
+
+ Destination reqDest;
+ Destination replyDest;
+ String origMsg;
+
+ public MessageSenderReceiver(Destination reqDest,
+ Destination replyDest, String msg) throws Exception {
+ this.replyDest = replyDest;
+ this.reqDest = reqDest;
+ this.origMsg = msg;
+ }
+
+ private int getIndexFromCurrentThread() {
+ String name = Thread.currentThread().getName();
+ String num = name.substring(name.lastIndexOf('-') +1);
+ int idx = Integer.parseInt(num) -1;
+ assertTrue("idx is in range: idx=" + idx, idx < NUM_THREADS);
+ return idx;
+ }
+
+ public void run() {
+ try {
+ // get thread session and producer from pool
+ int threadIndex = getIndexFromCurrentThread();
+ Session session = sessions[threadIndex];
+ MessageProducer producer = producers[threadIndex];
+
+ final Message sendJmsMsg = session.createTextMessage(origMsg);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ producer.send(sendJmsMsg);
+
+ String jmsId = sendJmsMsg.getJMSMessageID();
+ String selector = "JMSCorrelationID='" + jmsId + "'";
+
+ MessageConsumer consumer = session.createConsumer(replyDest,
+ selector);
+ Message receiveJmsMsg = consumer.receive(2000);
+ consumer.close();
+ if (receiveJmsMsg == null) {
+ errorLatch.countDown();
+ fail("Unable to receive response for:" + origMsg
+ + ", with selector=" + selector);
+ } else {
+ //System.out.println("received response message :"
+ // + ((TextMessage) receiveJmsMsg).getText()
+ // + " with selector : " + selector);
+ roundTripLatch.countDown();
+ }
+ } catch (JMSException e) {
+ fail("unexpected exception:" + e);
+ }
+ }
+ }
+
+ public class LimitedThreadFactory implements ThreadFactory {
+ int threadCount;
+ private ThreadFactory factory;
+ public LimitedThreadFactory(ThreadFactory threadFactory) {
+ this.factory = threadFactory;
+ }
+
+ public Thread newThread(Runnable arg0) {
+ if (++threadCount > NUM_THREADS) {
+ errorLatch.countDown();
+ fail("too many threads requested");
+ }
+ return factory.newThread(arg0);
+ }
+ }
+ }
+
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
------------------------------------------------------------------------------
svn:keywords = Rev Date