You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/03/18 20:49:39 UTC
svn commit: r755715 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/store/kahadaptor/
test/java/org/apache/activemq/bugs/ test/resou...
Author: gtully
Date: Wed Mar 18 19:49:39 2009
New Revision: 755715
URL: http://svn.apache.org/viewvc?rev=755715&view=rev
Log:
partial fix for AMQ2149|http://issues.apache.org/activemq/browse/AMQ-2149 , contention when usage limit reached can lead to out or order mesage dispatch
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=755715&r1=755714&r2=755715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Mar 18 19:49:39 2009
@@ -70,6 +70,8 @@
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -81,7 +83,7 @@
*
* @version $Revision: 1.28 $
*/
-public class Queue extends BaseDestination implements Task {
+public class Queue extends BaseDestination implements Task, UsageListener {
protected static final Log LOG = LogFactory.getLog(Queue.class);
protected TaskRunnerFactory taskFactory;
protected TaskRunner taskRunner;
@@ -99,7 +101,7 @@
private final ReentrantLock dispatchLock = new ReentrantLock();
private boolean useConsumerPriority=true;
private boolean strictOrderDispatch=false;
- private QueueDispatchSelector dispatchSelector;
+ private QueueDispatchSelector dispatchSelector;
private boolean optimizedDispatch=false;
private boolean firstConsumer = false;
private int timeBeforeDispatchStarts = 0;
@@ -133,6 +135,16 @@
}
}
+ // make the queue easily visible in the debugger from its task runner threads
+ final class QueueThread extends Thread {
+ final Queue queue;
+ public QueueThread(Runnable runnable, String name,
+ Queue queue) {
+ super(runnable, name);
+ this.queue = queue;
+ }
+ }
+
public void initialize() throws Exception {
if (this.messages == null) {
if (destination.isTemporary() || broker == null || store == null) {
@@ -153,9 +165,10 @@
if (isOptimizedDispatch()) {
this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName());
}else {
+ final Queue queue = this;
this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "QueueThread:"+destination);
+ Thread thread = new QueueThread(runnable, "QueueThread:"+destination, queue);
thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
@@ -565,6 +578,7 @@
if (memoryUsage != null) {
memoryUsage.start();
}
+ systemUsage.getMemoryUsage().addUsageListener(this);
messages.start();
doPageIn(false);
}
@@ -579,6 +593,8 @@
if (messages != null) {
messages.stop();
}
+
+ systemUsage.getMemoryUsage().removeUsageListener(this);
if (memoryUsage != null) {
memoryUsage.stop();
}
@@ -1000,6 +1016,15 @@
public boolean iterate() {
boolean pageInMoreMessages = false;
synchronized(iteratingMutex) {
+
+ // do early to allow dispatch of these waiting messages
+ synchronized(messagesWaitingForSpace) {
+ while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
+ Runnable op = messagesWaitingForSpace.removeFirst();
+ op.run();
+ }
+ }
+
BrowserDispatch rd;
while ((rd = getNextBrowserDispatch()) != null) {
pageInMoreMessages = true;
@@ -1078,13 +1103,7 @@
LOG.error("Failed to page in more queue messages ", e);
}
}
- synchronized(messagesWaitingForSpace) {
- while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
- Runnable op = messagesWaitingForSpace.removeFirst();
- op.run();
- }
- }
- return false;
+ return !messagesWaitingForSpace.isEmpty();
}
}
@@ -1520,4 +1539,18 @@
}
return sub;
}
+
+ public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
+ if (oldPercentUsage > newPercentUsage) {
+ synchronized(messagesWaitingForSpace) {
+ if (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
+ try {
+ this.taskRunner.wakeup();
+ } catch (InterruptedException e) {
+ LOG.warn(getName() + " failed to wakeup task runner on usageChange: " + e);
+ }
+ }
+ }
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=755715&r1=755714&r2=755715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Wed Mar 18 19:49:39 2009
@@ -25,8 +25,6 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.usage.Usage;
-import org.apache.activemq.usage.UsageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,7 +32,7 @@
* Store based cursor
*
*/
-public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
+public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
protected final Destination regionDestination;
private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
@@ -60,11 +58,9 @@
cacheEnabled=true;
}
}
- getSystemUsage().getMemoryUsage().addUsageListener(this);
}
public final synchronized void stop() throws Exception {
- getSystemUsage().getMemoryUsage().removeUsageListener(this);
resetBatch();
super.stop();
gc();
@@ -160,7 +156,9 @@
if (cacheEnabled) {
cacheEnabled=false;
// sync with store on disabling the cache
- setBatch(lastCachedId);
+ if (lastCachedId != null) {
+ setBatch(lastCachedId);
+ }
}
}
size++;
@@ -190,20 +188,6 @@
batchList.remove(node.getMessageId());
}
-
- public final synchronized void onUsageChanged(Usage usage, int oldPercentUsage,
- int newPercentUsage) {
- if (oldPercentUsage > newPercentUsage && oldPercentUsage >= memoryUsageHighWaterMark) {
- storeHasMessages = true;
- try {
- fillBatch();
- } catch (Exception e) {
- LOG.error("Failed to fill batch ", e);
- }
- }
-
- }
-
public final synchronized void clear() {
gc();
}
@@ -229,7 +213,6 @@
resetBatch();
this.batchResetNeeded = false;
}
-
if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
this.storeHasMessages = false;
try {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=755715&r1=755714&r2=755715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Mar 18 19:49:39 2009
@@ -32,7 +32,8 @@
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.AbstractMessageStore;
-import org.apache.activemq.usage.MemoryUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @author rajdavies
@@ -40,6 +41,7 @@
*/
public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore {
+ private static final Log LOG = LogFactory.getLog(KahaReferenceStore.class);
protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
protected KahaReferenceStoreAdapter adapter;
private StoreEntry batchEntry;
@@ -120,6 +122,11 @@
if ( recoverReference(listener, msg)) {
count++;
lastBatchId = msg.getMessageId();
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(destination.getQualifiedName() + " did not recover:" + msg.getMessageId());
+ }
+ break;
}
} else {
lastBatchId = null;
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=755715&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Wed Mar 18 19:49:39 2009
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.util.Vector;
+
+import junit.framework.TestCase;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class AMQ2149Test extends TestCase {
+
+ private static final Log log = LogFactory.getLog(AMQ2149Test.class);
+
+ private String BROKER_URL;
+ private final String SEQ_NUM_PROPERTY = "seqNum";
+
+ final int MESSAGE_LENGTH_BYTES = 75000;
+ final int MAX_TO_SEND = 2000;
+ final long SLEEP_BETWEEN_SEND_MS = 5;
+ final int NUM_SENDERS_AND_RECEIVERS = 10;
+
+ BrokerService broker;
+ Vector<Throwable> exceptions = new Vector<Throwable>();
+
+ public void setUp() throws Exception {
+ broker = new BrokerService();
+ broker.addConnector("tcp://localhost:0");
+ broker.deleteAllMessages();
+
+ SystemUsage usage = new SystemUsage();
+ MemoryUsage memoryUsage = new MemoryUsage();
+ memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS);
+ usage.setMemoryUsage(memoryUsage);
+ broker.setSystemUsage(usage);
+ broker.start();
+
+ BROKER_URL = "failover:("
+ + broker.getTransportConnectors().get(0).getUri()
+ +")?maxReconnectDelay=1000&useExponentialBackOff=false";
+ }
+
+ public void tearDown() throws Exception {
+ broker.stop();
+ }
+
+ private String buildLongString() {
+ final StringBuilder stringBuilder = new StringBuilder(
+ MESSAGE_LENGTH_BYTES);
+ for (int i = 0; i < MESSAGE_LENGTH_BYTES; ++i) {
+ stringBuilder.append((int) (Math.random() * 10));
+ }
+ return stringBuilder.toString();
+ }
+
+ private class Receiver implements MessageListener {
+
+ private final String queueName;
+
+ private final Connection connection;
+
+ private final Session session;
+
+ private final MessageConsumer messageConsumer;
+
+ private volatile long nextExpectedSeqNum = 0;
+
+ public Receiver(String queueName) throws JMSException {
+ this.queueName = queueName;
+ connection = new ActiveMQConnectionFactory(BROKER_URL)
+ .createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ messageConsumer = session.createConsumer(new ActiveMQQueue(
+ queueName));
+ messageConsumer.setMessageListener(this);
+ connection.start();
+ }
+
+ public void onMessage(Message message) {
+ try {
+ final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
+ if ((seqNum % 100) == 0) {
+ log.info(queueName + " received " + seqNum);
+ }
+ if (seqNum != nextExpectedSeqNum) {
+ log.warn(queueName + " received " + seqNum + " expected "
+ + nextExpectedSeqNum);
+ fail(queueName + " received " + seqNum + " expected "
+ + nextExpectedSeqNum);
+ }
+ ++nextExpectedSeqNum;
+ } catch (Throwable e) {
+ log.error(queueName + " onMessage error", e);
+ exceptions.add(e);
+ }
+ }
+
+ }
+
+ private class Sender implements Runnable {
+
+ private final String queueName;
+
+ private final Connection connection;
+
+ private final Session session;
+
+ private final MessageProducer messageProducer;
+
+ private volatile long nextSequenceNumber = 0;
+
+ public Sender(String queueName) throws JMSException {
+ this.queueName = queueName;
+ connection = new ActiveMQConnectionFactory(BROKER_URL)
+ .createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ messageProducer = session.createProducer(new ActiveMQQueue(
+ queueName));
+ messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ connection.start();
+ }
+
+ public void run() {
+ final String longString = buildLongString();
+ while (nextSequenceNumber <= MAX_TO_SEND) {
+ try {
+ final Message message = session
+ .createTextMessage(longString);
+ message.setLongProperty(SEQ_NUM_PROPERTY,
+ nextSequenceNumber);
+ ++nextSequenceNumber;
+ messageProducer.send(message);
+ } catch (Exception e) {
+ log.error(queueName + " send error", e);
+ exceptions.add(e);
+ }
+ try {
+ Thread.sleep(SLEEP_BETWEEN_SEND_MS);
+ } catch (InterruptedException e) {
+ log.warn(queueName + " sleep interrupted", e);
+ }
+ }
+ }
+ }
+
+ public void testOutOfOrderWithMemeUsageLimit() throws Exception {
+ Vector<Thread> threads = new Vector<Thread>();
+
+ for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) {
+ final String queueName = "test.queue." + i;
+ new Receiver(queueName);
+ Thread thread = new Thread(new Sender(queueName));
+ thread.start();
+ threads.add(thread);
+ }
+
+ final long expiry = System.currentTimeMillis() + 1000 * 60 * 5;
+ while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
+ Thread sendThread = threads.firstElement();
+ sendThread.join(1000*10);
+ if (!sendThread.isAlive()) {
+ threads.remove(sendThread);
+ }
+ }
+ assertTrue("No timeout waiting for senders to complete", System.currentTimeMillis() < expiry);
+ assertTrue("No exceptions", exceptions.isEmpty());
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml?rev=755715&r1=755714&r2=755715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml Wed Mar 18 19:49:39 2009
@@ -18,7 +18,7 @@
<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
- <broker brokerName="master" persistent="false" useJmx="false" deleteAllMessagesOnStartup="true" xmlns="http://activemq.apache.org/schema/core">
+ <broker brokerName="master" useJmx="false" deleteAllMessagesOnStartup="true" xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
<transportConnector uri="tcp://localhost:62001"/>
</transportConnectors>
Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml?rev=755715&r1=755714&r2=755715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml Wed Mar 18 19:49:39 2009
@@ -18,7 +18,7 @@
<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
- <broker brokerName="slave" useJmx="false" masterConnectorURI="tcp://localhost:62001" xmlns="http://activemq.apache.org/schema/core">
+ <broker brokerName="slave" deleteAllMessagesOnStartup="true" useJmx="false" masterConnectorURI="tcp://localhost:62001" xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
<transportConnector uri="tcp://localhost:62002"/>
</transportConnectors>