You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/12/27 12:06:53 UTC
svn commit: r607038 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/policy/
main/java/org/apache/activemq/store/amq/ test/java/org/ap...
Author: rajdavies
Date: Thu Dec 27 03:06:50 2007
New Revision: 607038
URL: http://svn.apache.org/viewvc?rev=607038&view=rev
Log:
Reduce contention around Queues
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Dec 27 03:06:50 2007
@@ -39,7 +39,16 @@
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.ft.MasterConnector;
-import org.apache.activemq.broker.jmx.*;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.jmx.ConnectorView;
+import org.apache.activemq.broker.jmx.ConnectorViewMBean;
+import org.apache.activemq.broker.jmx.FTConnectorView;
+import org.apache.activemq.broker.jmx.JmsConnectorView;
+import org.apache.activemq.broker.jmx.ManagedRegionBroker;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.broker.jmx.NetworkConnectorView;
+import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
+import org.apache.activemq.broker.jmx.ProxyConnectorView;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
@@ -151,6 +160,7 @@
private CountDownLatch stoppedLatch = new CountDownLatch(1);
private boolean supportFailOver;
private boolean clustered;
+
static {
String localHostName = "localhost";
@@ -363,7 +373,7 @@
/**
* @return true if this Broker is a slave to a Master
*/
- public synchronized boolean isSlave() {
+ public boolean isSlave() {
return masterConnector != null && masterConnector.isSlave();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Thu Dec 27 03:06:50 2007
@@ -56,18 +56,18 @@
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
}
- public synchronized boolean isActive() {
+ public boolean isActive() {
return active;
}
- protected synchronized boolean isFull() {
+ protected boolean isFull() {
return !active || super.isFull();
}
- public synchronized void gc() {
+ public void gc() {
}
- public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
+ public void add(ConnectionContext context, Destination destination) throws Exception {
super.add(context, destination);
destinations.put(destination.getActiveMQDestination(), destination);
if (active || keepDurableSubsActive) {
@@ -77,38 +77,43 @@
topic.recoverRetroactiveMessages(context, this);
}
}
- dispatchMatched();
+ dispatchPending();
}
- public synchronized void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info) throws Exception {
+ public void activate(SystemUsage memoryManager, ConnectionContext context,
+ ConsumerInfo info) throws Exception {
LOG.debug("Activating " + this);
if (!active) {
this.active = true;
this.context = context;
this.info = info;
if (!keepDurableSubsActive) {
- for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
- Topic topic = (Topic)iter.next();
+ for (Iterator<Destination> iter = destinations.values()
+ .iterator(); iter.hasNext();) {
+ Topic topic = (Topic) iter.next();
topic.activate(context, this);
}
}
- pending.setSystemUsage(memoryManager);
- pending.start();
+ synchronized (pending) {
+ pending.setSystemUsage(memoryManager);
+ pending.start();
- // If nothing was in the persistent store, then try to use the
- // recovery policy.
- if (pending.isEmpty()) {
- for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
- Topic topic = (Topic)iter.next();
- topic.recoverRetroactiveMessages(context, this);
+ // If nothing was in the persistent store, then try to use the
+ // recovery policy.
+ if (pending.isEmpty()) {
+ for (Iterator<Destination> iter = destinations.values()
+ .iterator(); iter.hasNext();) {
+ Topic topic = (Topic) iter.next();
+ topic.recoverRetroactiveMessages(context, this);
+ }
}
}
- dispatchMatched();
+ dispatchPending();
this.usageManager.getMemoryUsage().addUsageListener(this);
}
}
- public synchronized void deactivate(boolean keepDurableSubsActive) throws Exception {
+ public void deactivate(boolean keepDurableSubsActive) throws Exception {
active = false;
this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (pending) {
@@ -136,7 +141,9 @@
node.decrementReferenceCount();
}
}
- dispatched.clear();
+ synchronized(dispatched) {
+ dispatched.clear();
+ }
if (!keepDurableSubsActive && pending.isTransient()) {
synchronized (pending) {
try {
@@ -163,7 +170,7 @@
return md;
}
- public synchronized void add(MessageReference node) throws Exception {
+ public void add(MessageReference node) throws Exception {
if (!active && !keepDurableSubsActive) {
return;
}
@@ -171,11 +178,13 @@
super.add(node);
}
- protected synchronized void doAddRecoveredMessage(MessageReference message) throws Exception {
+ protected void doAddRecoveredMessage(MessageReference message) throws Exception {
+ synchronized(pending) {
pending.addRecoveredMessage(message);
+ }
}
- public synchronized int getPendingQueueSize() {
+ public int getPendingQueueSize() {
if (active || keepDurableSubsActive) {
return super.getPendingQueueSize();
}
@@ -187,7 +196,7 @@
throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
}
- protected synchronized boolean canDispatch(MessageReference node) {
+ protected boolean canDispatch(MessageReference node) {
return active;
}
@@ -217,24 +226,28 @@
/**
* Release any references that we are holding.
*/
- public synchronized void destroy() {
- try {
- synchronized (pending) {
+ public void destroy() {
+ synchronized (pending) {
+ try {
+
pending.reset();
while (pending.hasNext()) {
MessageReference node = pending.next();
node.decrementReferenceCount();
}
+
+ } finally {
+ pending.release();
+ pending.clear();
+ }
+ }
+ synchronized(dispatched) {
+ for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
+ MessageReference node = (MessageReference) iter.next();
+ node.decrementReferenceCount();
}
- } finally {
- pending.release();
- pending.clear();
- }
- for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
- MessageReference node = (MessageReference)iter.next();
- node.decrementReferenceCount();
+ dispatched.clear();
}
- dispatched.clear();
}
/**
@@ -247,7 +260,7 @@
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
try {
- dispatchMatched();
+ dispatchPending();
} catch (IOException e) {
LOG.warn("problem calling dispatchMatched", e);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Dec 27 03:06:50 2007
@@ -64,6 +64,8 @@
private int maxAuditDepth=2048;
protected final SystemUsage usageManager;
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
+ private final Object pendingLock = new Object();
+ private final Object dispatchLock = new Object();
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
super(broker, context, info);
@@ -87,14 +89,14 @@
if (getPrefetchSize() == 0 && !isSlave()) {
prefetchExtension++;
final long dispatchCounterBeforePull = dispatchCounter;
- dispatchMatched();
+ dispatchPending();
// If there was nothing dispatched.. we may need to setup a timeout.
if (dispatchCounterBeforePull == dispatchCounter) {
// immediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) {
// Send a NULL message.
add(QueueMessageReference.NULL_MESSAGE);
- dispatchMatched();
+ dispatchPending();
}
if (pull.getTimeout() > 0) {
Scheduler.executeAfterDelay(new Runnable() {
@@ -117,216 +119,238 @@
if (dispatchCounterBeforePull == dispatchCounter) {
try {
add(QueueMessageReference.NULL_MESSAGE);
- dispatchMatched();
+ dispatchPending();
} catch (Exception e) {
context.getConnection().serviceException(e);
}
}
}
- public synchronized void add(MessageReference node) throws Exception {
+ public void add(MessageReference node) throws Exception {
boolean pendingEmpty = false;
- pendingEmpty = pending.isEmpty();
+ synchronized(pendingLock) {
+ pendingEmpty = pending.isEmpty();
+ }
enqueueCounter++;
if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
dispatch(node);
} else {
optimizePrefetch();
- synchronized (pending) {
+ synchronized(pendingLock) {
if (pending.isEmpty() && LOG.isDebugEnabled()) {
LOG.debug("Prefetch limit.");
}
pending.addMessageLast(node);
- dispatchMatched();
+
}
+ dispatchPending();
}
}
- public synchronized void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
- try {
- pending.reset();
- while (pending.hasNext()) {
- MessageReference node = pending.next();
- if (node.getMessageId().equals(mdn.getMessageId())) {
- pending.remove();
- createMessageDispatch(node, node.getMessage());
- dispatched.add(node);
- return;
+ public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
+ synchronized(pendingLock) {
+ try {
+ pending.reset();
+ while (pending.hasNext()) {
+ MessageReference node = pending.next();
+ if (node.getMessageId().equals(mdn.getMessageId())) {
+ pending.remove();
+ createMessageDispatch(node, node.getMessage());
+ synchronized(dispatchLock) {
+ dispatched.add(node);
+ }
+ return;
+ }
}
+ } finally {
+ pending.release();
}
- } finally {
- pending.release();
}
- throw new JMSException("Slave broker out of sync with master: Dispatched message (" + mdn.getMessageId() + ") was not in the pending list");
+ throw new JMSException(
+ "Slave broker out of sync with master: Dispatched message ("
+ + mdn.getMessageId() + ") was not in the pending list");
}
- public synchronized void acknowledge(final ConnectionContext context,
- final MessageAck ack) throws Exception {
+ public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
boolean callDispatchMatched = false;
- if (ack.isStandardAck()) {
- // Acknowledge all dispatched messages up till the message id of the
- // acknowledgment.
- int index = 0;
- boolean inAckRange = false;
- List<MessageReference> removeList = new ArrayList<MessageReference>();
- for (final MessageReference node : dispatched) {
- MessageId messageId = node.getMessageId();
- if (ack.getFirstMessageId() == null
- || ack.getFirstMessageId().equals(messageId)) {
- inAckRange = true;
- }
- if (inAckRange) {
- // Don't remove the nodes until we are committed.
- if (!context.isInTransaction()) {
- dequeueCounter++;
- node.getRegionDestination().getDestinationStatistics()
- .getDequeues().increment();
- removeList.add(node);
- } else {
- // setup a Synchronization to remove nodes from the
- // dispatched list.
- context.getTransaction().addSynchronization(
- new Synchronization() {
-
- public void afterCommit() throws Exception {
- synchronized (PrefetchSubscription.this) {
- dequeueCounter++;
- dispatched.remove(node);
- node.getRegionDestination()
- .getDestinationStatistics()
- .getDequeues().increment();
- prefetchExtension--;
+ synchronized(dispatchLock) {
+ if (ack.isStandardAck()) {
+ // Acknowledge all dispatched messages up till the message id of
+ // the
+ // acknowledgment.
+ int index = 0;
+ boolean inAckRange = false;
+ List<MessageReference> removeList = new ArrayList<MessageReference>();
+ for (final MessageReference node : dispatched) {
+ MessageId messageId = node.getMessageId();
+ if (ack.getFirstMessageId() == null
+ || ack.getFirstMessageId().equals(messageId)) {
+ inAckRange = true;
+ }
+ if (inAckRange) {
+ // Don't remove the nodes until we are committed.
+ if (!context.isInTransaction()) {
+ dequeueCounter++;
+ node.getRegionDestination()
+ .getDestinationStatistics().getDequeues()
+ .increment();
+ removeList.add(node);
+ } else {
+ // setup a Synchronization to remove nodes from the
+ // dispatched list.
+ context.getTransaction().addSynchronization(
+ new Synchronization() {
+
+ public void afterCommit()
+ throws Exception {
+ synchronized(dispatchLock) {
+
+ dequeueCounter++;
+ dispatched.remove(node);
+ node
+ .getRegionDestination()
+ .getDestinationStatistics()
+ .getDequeues()
+ .increment();
+ prefetchExtension--;
+ }
}
- }
- public void afterRollback()
- throws Exception {
- super.afterRollback();
- }
- });
- }
- index++;
- acknowledge(context, ack, node);
- if (ack.getLastMessageId().equals(messageId)) {
- if (context.isInTransaction()) {
- // extend prefetch window only if not a pulling
- // consumer
- if (getPrefetchSize() != 0) {
- prefetchExtension = Math.max(prefetchExtension,
- index + 1);
+ public void afterRollback()
+ throws Exception {
+ super.afterRollback();
+ }
+ });
+ }
+ index++;
+ acknowledge(context, ack, node);
+ if (ack.getLastMessageId().equals(messageId)) {
+ if (context.isInTransaction()) {
+ // extend prefetch window only if not a pulling
+ // consumer
+ if (getPrefetchSize() != 0) {
+ prefetchExtension = Math.max(
+ prefetchExtension, index + 1);
+ }
+ } else {
+ prefetchExtension = Math.max(0,
+ prefetchExtension - (index + 1));
}
- } else {
- prefetchExtension = Math.max(0, prefetchExtension
- - (index + 1));
+ callDispatchMatched = true;
+ break;
}
- callDispatchMatched = true;
- break;
}
}
- }
- for (final MessageReference node : removeList) {
- dispatched.remove(node);
- }
- // this only happens after a reconnect - get an ack which is not
- // valid
- if (!callDispatchMatched) {
- if (LOG.isDebugEnabled()) {
- LOG
- .debug("Could not correlate acknowledgment with dispatched message: "
- + ack);
+ for (final MessageReference node : removeList) {
+ dispatched.remove(node);
}
- }
- } else if (ack.isDeliveredAck()) {
- // Message was delivered but not acknowledged: update pre-fetch
- // counters.
- // Acknowledge all dispatched messages up till the message id of the
- // acknowledgment.
- int index = 0;
- for (Iterator<MessageReference> iter = dispatched.iterator(); iter
- .hasNext(); index++) {
- final MessageReference node = iter.next();
- if (ack.getLastMessageId().equals(node.getMessageId())) {
- prefetchExtension = Math.max(prefetchExtension, index + 1);
- callDispatchMatched = true;
- break;
+ // this only happens after a reconnect - get an ack which is not
+ // valid
+ if (!callDispatchMatched) {
+ if (LOG.isDebugEnabled()) {
+ LOG
+ .debug("Could not correlate acknowledgment with dispatched message: "
+ + ack);
+ }
}
- }
- if (!callDispatchMatched) {
- throw new JMSException(
- "Could not correlate acknowledgment with dispatched message: "
- + ack);
- }
- } else if (ack.isRedeliveredAck()) {
- // Message was re-delivered but it was not yet considered to be a
- // DLQ message.
- // Acknowledge all dispatched messages up till the message id of the
- // acknowledgment.
- boolean inAckRange = false;
- for (final MessageReference node : dispatched) {
- MessageId messageId = node.getMessageId();
- if (ack.getFirstMessageId() == null
- || ack.getFirstMessageId().equals(messageId)) {
- inAckRange = true;
- }
- if (inAckRange) {
- node.incrementRedeliveryCounter();
- if (ack.getLastMessageId().equals(messageId)) {
+ } else if (ack.isDeliveredAck()) {
+ // Message was delivered but not acknowledged: update pre-fetch
+ // counters.
+ // Acknowledge all dispatched messages up till the message id of
+ // the
+ // acknowledgment.
+ int index = 0;
+ for (Iterator<MessageReference> iter = dispatched.iterator(); iter
+ .hasNext(); index++) {
+ final MessageReference node = iter.next();
+ if (ack.getLastMessageId().equals(node.getMessageId())) {
+ prefetchExtension = Math.max(prefetchExtension,
+ index + 1);
callDispatchMatched = true;
break;
}
}
- }
- if (!callDispatchMatched) {
- throw new JMSException(
- "Could not correlate acknowledgment with dispatched message: "
- + ack);
- }
- } else if (ack.isPoisonAck()) {
- // TODO: what if the message is already in a DLQ???
- // Handle the poison ACK case: we need to send the message to a DLQ
- if (ack.isInTransaction()) {
- throw new JMSException("Poison ack cannot be transacted: "
- + ack);
- }
- // Acknowledge all dispatched messages up till the message id of the
- // acknowledgment.
- int index = 0;
- boolean inAckRange = false;
- List<MessageReference> removeList = new ArrayList<MessageReference>();
- for (final MessageReference node : dispatched) {
- MessageId messageId = node.getMessageId();
- if (ack.getFirstMessageId() == null
- || ack.getFirstMessageId().equals(messageId)) {
- inAckRange = true;
- }
- if (inAckRange) {
- sendToDLQ(context, node);
- node.getRegionDestination().getDestinationStatistics()
- .getDequeues().increment();
- removeList.add(node);
- dequeueCounter++;
- index++;
- acknowledge(context, ack, node);
- if (ack.getLastMessageId().equals(messageId)) {
- prefetchExtension = Math.max(0, prefetchExtension
- - (index + 1));
- callDispatchMatched = true;
- break;
+ if (!callDispatchMatched) {
+ throw new JMSException(
+ "Could not correlate acknowledgment with dispatched message: "
+ + ack);
+ }
+ } else if (ack.isRedeliveredAck()) {
+ // Message was re-delivered but it was not yet considered to be
+ // a
+ // DLQ message.
+ // Acknowledge all dispatched messages up till the message id of
+ // the
+ // acknowledgment.
+ boolean inAckRange = false;
+ for (final MessageReference node : dispatched) {
+ MessageId messageId = node.getMessageId();
+ if (ack.getFirstMessageId() == null
+ || ack.getFirstMessageId().equals(messageId)) {
+ inAckRange = true;
+ }
+ if (inAckRange) {
+ node.incrementRedeliveryCounter();
+ if (ack.getLastMessageId().equals(messageId)) {
+ callDispatchMatched = true;
+ break;
+ }
}
}
- }
- for (final MessageReference node : removeList) {
- dispatched.remove(node);
- }
- if (!callDispatchMatched) {
- throw new JMSException(
- "Could not correlate acknowledgment with dispatched message: "
- + ack);
+ if (!callDispatchMatched) {
+ throw new JMSException(
+ "Could not correlate acknowledgment with dispatched message: "
+ + ack);
+ }
+ } else if (ack.isPoisonAck()) {
+ // TODO: what if the message is already in a DLQ???
+ // Handle the poison ACK case: we need to send the message to a
+ // DLQ
+ if (ack.isInTransaction()) {
+ throw new JMSException("Poison ack cannot be transacted: "
+ + ack);
+ }
+ // Acknowledge all dispatched messages up till the message id of
+ // the
+ // acknowledgment.
+ int index = 0;
+ boolean inAckRange = false;
+ List<MessageReference> removeList = new ArrayList<MessageReference>();
+ for (final MessageReference node : dispatched) {
+ MessageId messageId = node.getMessageId();
+ if (ack.getFirstMessageId() == null
+ || ack.getFirstMessageId().equals(messageId)) {
+ inAckRange = true;
+ }
+ if (inAckRange) {
+ sendToDLQ(context, node);
+ node.getRegionDestination().getDestinationStatistics()
+ .getDequeues().increment();
+ removeList.add(node);
+ dequeueCounter++;
+ index++;
+ acknowledge(context, ack, node);
+ if (ack.getLastMessageId().equals(messageId)) {
+ prefetchExtension = Math.max(0, prefetchExtension
+ - (index + 1));
+ callDispatchMatched = true;
+ break;
+ }
+ }
+ }
+ for (final MessageReference node : removeList) {
+ dispatched.remove(node);
+ }
+ if (!callDispatchMatched) {
+ throw new JMSException(
+ "Could not correlate acknowledgment with dispatched message: "
+ + ack);
+ }
}
}
if (callDispatchMatched) {
- dispatchMatched();
+ dispatchPending();
} else {
if (isSlave()) {
throw new JMSException(
@@ -356,45 +380,45 @@
*
* @return
*/
- protected synchronized boolean isFull() {
+ protected boolean isFull() {
return isSlave() || dispatched.size() - prefetchExtension >= info.getPrefetchSize();
}
/**
* @return true when 60% or more room is left for dispatching messages
*/
- public synchronized boolean isLowWaterMark() {
+ public boolean isLowWaterMark() {
return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4);
}
/**
* @return true when 10% or less room is left for dispatching messages
*/
- public synchronized boolean isHighWaterMark() {
+ public boolean isHighWaterMark() {
return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
}
- public synchronized int countBeforeFull() {
+ public int countBeforeFull() {
return info.getPrefetchSize() + prefetchExtension - dispatched.size();
}
- public synchronized int getPendingQueueSize() {
+ public int getPendingQueueSize() {
return pending.size();
}
- public synchronized int getDispatchedQueueSize() {
+ public int getDispatchedQueueSize() {
return dispatched.size();
}
- public synchronized long getDequeueCounter() {
+ public long getDequeueCounter() {
return dequeueCounter;
}
- public synchronized long getDispatchedCounter() {
+ public long getDispatchedCounter() {
return dispatchCounter;
}
- public synchronized long getEnqueueCounter() {
+ public long getEnqueueCounter() {
return enqueueCounter;
}
@@ -402,11 +426,11 @@
return pending.isRecoveryRequired();
}
- public synchronized PendingMessageCursor getPending() {
+ public PendingMessageCursor getPending() {
return this.pending;
}
- public synchronized void setPending(PendingMessageCursor pending) {
+ public void setPending(PendingMessageCursor pending) {
this.pending = pending;
if (this.pending!=null) {
this.pending.setSystemUsage(usageManager);
@@ -430,51 +454,60 @@
*/
}
- public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
- super.add(context, destination);
- pending.add(context, destination);
+ public void add(ConnectionContext context, Destination destination) throws Exception {
+ synchronized(pendingLock) {
+ super.add(context, destination);
+ pending.add(context, destination);
+ }
}
- public synchronized void remove(ConnectionContext context, Destination destination) throws Exception {
- super.remove(context, destination);
- pending.remove(context, destination);
+ public void remove(ConnectionContext context, Destination destination) throws Exception {
+ synchronized(pendingLock) {
+ super.remove(context, destination);
+ pending.remove(context, destination);
+ }
}
- protected synchronized void dispatchMatched() throws IOException {
+ protected void dispatchPending() throws IOException {
if (!isSlave()) {
- try {
- int numberToDispatch = countBeforeFull();
- if (numberToDispatch > 0) {
- pending.setMaxBatchSize(numberToDispatch);
- int count = 0;
- pending.reset();
- while (pending.hasNext() && !isFull() && count < numberToDispatch) {
- MessageReference node = pending.next();
- if (node == null) {
- break;
- }
- if (canDispatch(node)) {
- pending.remove();
- // Message may have been sitting in the pending list
- // a while
- // waiting for the consumer to ak the message.
- if (node != QueueMessageReference.NULL_MESSAGE && broker.isExpired(node)) {
- broker.messageExpired(getContext(), node);
- dequeueCounter++;
- continue;
+ synchronized(pendingLock) {
+ try {
+ int numberToDispatch = countBeforeFull();
+ if (numberToDispatch > 0) {
+ pending.setMaxBatchSize(numberToDispatch);
+ int count = 0;
+ pending.reset();
+ while (pending.hasNext() && !isFull()
+ && count < numberToDispatch) {
+ MessageReference node = pending.next();
+ if (node == null) {
+ break;
+ }
+ if (canDispatch(node)) {
+ pending.remove();
+ // Message may have been sitting in the pending
+ // list
+ // a while
+ // waiting for the consumer to ak the message.
+ if (node != QueueMessageReference.NULL_MESSAGE
+ && broker.isExpired(node)) {
+ broker.messageExpired(getContext(), node);
+ dequeueCounter++;
+ continue;
+ }
+ dispatch(node);
+ count++;
}
- dispatch(node);
- count++;
}
}
+ } finally {
+ pending.release();
}
- } finally {
- pending.release();
}
}
}
- protected synchronized boolean dispatch(final MessageReference node) throws IOException {
+ protected boolean dispatch(final MessageReference node) throws IOException {
final Message message = node.getMessage();
if (message == null) {
return false;
@@ -488,7 +521,9 @@
dispatchCounter++;
dispatched.add(node);
if(pending != null) {
- pending.dispatched(message);
+ synchronized(pendingLock) {
+ pending.dispatched(message);
+ }
}
} else {
prefetchExtension = Math.max(0, prefetchExtension - 1);
@@ -523,7 +558,7 @@
}
if (info.isDispatchAsync()) {
try {
- dispatchMatched();
+ dispatchPending();
} catch (IOException e) {
context.getConnection().serviceExceptionAsync(e);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Dec 27 03:06:50 2007
@@ -94,8 +94,8 @@
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
- private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
private final Object exclusiveLockMutex = new Object();
+ private final Object sendLock = new Object();
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
@@ -204,149 +204,144 @@
return true;
}
- public synchronized void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
+ public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
sub.add(context, this);
destinationStatistics.getConsumers().increment();
maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize();
- MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
- try {
-
- //needs to be synchronized - so no contention with dispatching
- synchronized (consumers) {
- consumers.add(sub);
- if (sub.getConsumerInfo().isExclusive()) {
- LockOwner owner = (LockOwner)sub;
- if (exclusiveOwner == null) {
+ MessageEvaluationContext msgContext = new MessageEvaluationContext();
+
+ // needs to be synchronized - so no contention with dispatching
+ synchronized (consumers) {
+ consumers.add(sub);
+ if (sub.getConsumerInfo().isExclusive()) {
+ LockOwner owner = (LockOwner) sub;
+ if (exclusiveOwner == null) {
+ exclusiveOwner = owner;
+ } else {
+ // switch the owner if the priority is higher.
+ if (owner.getLockPriority() > exclusiveOwner
+ .getLockPriority()) {
exclusiveOwner = owner;
- } else {
- // switch the owner if the priority is higher.
- if (owner.getLockPriority() > exclusiveOwner.getLockPriority()) {
- exclusiveOwner = owner;
- }
}
}
}
-
- //we hold the lock on the dispatchValue - so lets build the paged in
- //list directly;
- buildList(false);
-
- // synchronize with dispatch method so that no new messages are sent
- // while
- // setting up a subscription. avoid out of order messages,
- // duplicates
- // etc.
-
-
-
- msgContext.setDestination(destination);
- synchronized (pagedInMessages) {
- // Add all the matching messages in the queue to the
- // subscription.
- for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
- QueueMessageReference node = (QueueMessageReference)i.next();
- if (node.isDropped() || (!sub.getConsumerInfo().isBrowser() && node.getLockOwner()!=null)) {
- continue;
- }
- try {
- msgContext.setMessageReference(node);
- if (sub.matches(node, msgContext)) {
- sub.add(node);
- }
- } catch (IOException e) {
- log.warn("Could not load message: " + e, e);
- }
+ }
+
+ // we hold the lock on the dispatchValue - so lets build the paged in
+ // list directly;
+ buildList(false);
+
+ // synchronize with dispatch method so that no new messages are sent
+ // while
+ // setting up a subscription. avoid out of order messages,
+ // duplicates
+ // etc.
+
+ msgContext.setDestination(destination);
+ synchronized (pagedInMessages) {
+ // Add all the matching messages in the queue to the
+ // subscription.
+ for (Iterator<MessageReference> i = pagedInMessages.iterator(); i
+ .hasNext();) {
+ QueueMessageReference node = (QueueMessageReference) i.next();
+ if (node.isDropped()
+ || (!sub.getConsumerInfo().isBrowser() && node
+ .getLockOwner() != null)) {
+ continue;
+ }
+ try {
+ msgContext.setMessageReference(node);
+ if (sub.matches(node, msgContext)) {
+ sub.add(node);
}
+ } catch (IOException e) {
+ log.warn("Could not load message: " + e, e);
}
-
-
-
- } finally {
- msgContext.clear();
- }
- }
-
- public synchronized void removeSubscription(ConnectionContext context,
- Subscription sub) throws Exception{
- destinationStatistics.getConsumers().decrement();
- maximumPagedInMessages-=sub.getConsumerInfo().getPrefetchSize();
- // synchronize with dispatch method so that no new messages are sent
- // while
- // removing up a subscription.
- synchronized(consumers){
- consumers.remove(sub);
- if(sub.getConsumerInfo().isExclusive()){
- LockOwner owner=(LockOwner)sub;
- // Did we loose the exclusive owner??
- if(exclusiveOwner==owner){
- // Find the exclusive consumer with the higest Lock
- // Priority.
- exclusiveOwner=null;
- for(Iterator<Subscription> iter=consumers.iterator();iter
- .hasNext();){
- Subscription s=iter.next();
- LockOwner so=(LockOwner)s;
- if(s.getConsumerInfo().isExclusive()
- &&(exclusiveOwner==null||so.getLockPriority()>exclusiveOwner
- .getLockPriority())){
- exclusiveOwner=so;
- }
- }
- }
- }
- if(consumers.isEmpty()){
- messages.gc();
- }
- }
- sub.remove(context,this);
- boolean wasExclusiveOwner=false;
- if(exclusiveOwner==sub){
- exclusiveOwner=null;
- wasExclusiveOwner=true;
- }
- ConsumerId consumerId=sub.getConsumerInfo().getConsumerId();
- MessageGroupSet ownedGroups=getMessageGroupOwners().removeConsumer(
- consumerId);
- if(!sub.getConsumerInfo().isBrowser()){
- MessageEvaluationContext msgContext=context
- .getMessageEvaluationContext();
- try{
- msgContext.setDestination(destination);
- // lets copy the messages to dispatch to avoid deadlock
- List<QueueMessageReference> messagesToDispatch=new ArrayList<QueueMessageReference>();
- synchronized(pagedInMessages){
- for(Iterator<MessageReference> i=pagedInMessages.iterator();i
- .hasNext();){
- QueueMessageReference node=(QueueMessageReference)i
- .next();
- if(node.isDropped()){
- continue;
- }
- String groupID=node.getGroupID();
- // Re-deliver all messages that the sub locked
- if(node.getLockOwner()==sub
- ||wasExclusiveOwner
- ||(groupID!=null&&ownedGroups.contains(groupID))){
- messagesToDispatch.add(node);
- }
- }
- }
- // now lets dispatch from the copy of the collection to
- // avoid deadlocks
- for(Iterator<QueueMessageReference> iter=messagesToDispatch
- .iterator();iter.hasNext();){
- QueueMessageReference node=iter.next();
- node.incrementRedeliveryCounter();
- node.unlock();
- msgContext.setMessageReference(node);
- dispatchPolicy.dispatch(node,msgContext,consumers);
- }
- }finally{
- msgContext.clear();
- }
- }
- }
+ }
+ }
+
+ }
+
+ public void removeSubscription(ConnectionContext context, Subscription sub)
+ throws Exception {
+ destinationStatistics.getConsumers().decrement();
+ maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
+ // synchronize with dispatch method so that no new messages are sent
+ // while
+ // removing up a subscription.
+ synchronized (consumers) {
+ consumers.remove(sub);
+ if (sub.getConsumerInfo().isExclusive()) {
+ LockOwner owner = (LockOwner) sub;
+ // Did we loose the exclusive owner??
+ if (exclusiveOwner == owner) {
+ // Find the exclusive consumer with the higest Lock
+ // Priority.
+ exclusiveOwner = null;
+ for (Iterator<Subscription> iter = consumers.iterator(); iter
+ .hasNext();) {
+ Subscription s = iter.next();
+ LockOwner so = (LockOwner) s;
+ if (s.getConsumerInfo().isExclusive()
+ && (exclusiveOwner == null || so
+ .getLockPriority() > exclusiveOwner
+ .getLockPriority())) {
+ exclusiveOwner = so;
+ }
+ }
+ }
+ }
+ if (consumers.isEmpty()) {
+ messages.gc();
+ }
+ }
+ sub.remove(context, this);
+ boolean wasExclusiveOwner = false;
+ if (exclusiveOwner == sub) {
+ exclusiveOwner = null;
+ wasExclusiveOwner = true;
+ }
+ ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
+ MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(
+ consumerId);
+ if (!sub.getConsumerInfo().isBrowser()) {
+ MessageEvaluationContext msgContext = new MessageEvaluationContext();
+
+ msgContext.setDestination(destination);
+ // lets copy the messages to dispatch to avoid deadlock
+ List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
+ synchronized (pagedInMessages) {
+ for (Iterator<MessageReference> i = pagedInMessages.iterator(); i
+ .hasNext();) {
+ QueueMessageReference node = (QueueMessageReference) i
+ .next();
+ if (node.isDropped()) {
+ continue;
+ }
+ String groupID = node.getGroupID();
+ // Re-deliver all messages that the sub locked
+ if (node.getLockOwner() == sub
+ || wasExclusiveOwner
+ || (groupID != null && ownedGroups
+ .contains(groupID))) {
+ messagesToDispatch.add(node);
+ }
+ }
+ }
+ // now lets dispatch from the copy of the collection to
+ // avoid deadlocks
+ for (Iterator<QueueMessageReference> iter = messagesToDispatch
+ .iterator(); iter.hasNext();) {
+ QueueMessageReference node = iter.next();
+ node.incrementRedeliveryCounter();
+ node.unlock();
+ msgContext.setMessageReference(node);
+ dispatchPolicy.dispatch(node, msgContext, consumers);
+ }
+
+ }
+ }
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
@@ -445,16 +440,21 @@
}
}
- synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
+ void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
- message.setRegionDestination(this);
- if (store != null && message.isPersistent()) {
- while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
- if (context.getStopping().get()) {
- throw new IOException("Connection closed, send aborted.");
+ synchronized (sendLock) {
+ message.setRegionDestination(this);
+ if (store != null && message.isPersistent()) {
+ while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
+ if (context.getStopping().get()) {
+ throw new IOException(
+ "Connection closed, send aborted.");
+ }
}
+
+ store.addMessage(context, message);
+
}
- store.addMessage(context, message);
}
if (context.isInTransaction()) {
// If this is a transacted message.. increase the usage now so that
@@ -1010,57 +1010,51 @@
return result;
}
- private synchronized List<MessageReference> buildList(boolean force) throws Exception {
-
+ private List<MessageReference> buildList(boolean force) throws Exception {
final int toPageIn = maximumPagedInMessages - pagedInMessages.size();
List<MessageReference> result = null;
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn);
- try {
- int count = 0;
- result = new ArrayList<MessageReference>(toPageIn);
- synchronized (messages) {
+ int count = 0;
+ result = new ArrayList<MessageReference>(toPageIn);
+ synchronized (messages) {
- try {
- messages.reset();
- while (messages.hasNext() && count < toPageIn) {
- MessageReference node = messages.next();
- messages.remove();
- if (!broker.isExpired(node)) {
- node = createMessageReference(node.getMessage());
- result.add(node);
- count++;
- } else {
- broker.messageExpired(createConnectionContext(), node);
- destinationStatistics.getMessages().decrement();
- }
+ try {
+ messages.reset();
+ while (messages.hasNext() && count < toPageIn) {
+ MessageReference node = messages.next();
+ messages.remove();
+ if (!broker.isExpired(node)) {
+ node = createMessageReference(node.getMessage());
+ result.add(node);
+ count++;
+ } else {
+ broker.messageExpired(createConnectionContext(),
+ node);
+ destinationStatistics.getMessages().decrement();
}
- } finally {
- messages.release();
}
+ } finally {
+ messages.release();
}
- synchronized (pagedInMessages) {
- pagedInMessages.addAll(result);
- }
- } finally {
- queueMsgConext.clear();
+ }
+ synchronized (pagedInMessages) {
+ pagedInMessages.addAll(result);
}
}
return result;
}
- private synchronized void doDispatch(List<MessageReference> list) throws Exception {
+ private synchronized void doDispatch(List<MessageReference> list) throws Exception {
if (list != null && !list.isEmpty()) {
- try {
- for (int i = 0; i < list.size(); i++) {
- MessageReference node = list.get(i);
- queueMsgConext.setDestination(destination);
- queueMsgConext.setMessageReference(node);
- dispatchPolicy.dispatch(node, queueMsgConext, consumers);
- }
- } finally {
- queueMsgConext.clear();
+ MessageEvaluationContext msgContext = new MessageEvaluationContext();
+ for (int i = 0; i < list.size(); i++) {
+ MessageReference node = list.get(i);
+ msgContext.setDestination(destination);
+ msgContext.setMessageReference(node);
+ dispatchPolicy.dispatch(node, msgContext, consumers);
}
+
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java Thu Dec 27 03:06:50 2007
@@ -44,13 +44,6 @@
* org.apache.activemq.filter.MessageEvaluationContext, java.util.List)
*/
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers) throws Exception {
-
- // Big synch here so that only 1 message gets dispatched at a time.
- // Ensures
- // Everyone sees the same order and that the consumer list is not used
- // while
- // it's being rotated.
- synchronized (consumers) {
int count = 0;
Subscription firstMatchingConsumer = null;
@@ -79,7 +72,5 @@
}
}
return count > 0;
- }
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Thu Dec 27 03:06:50 2007
@@ -25,8 +25,8 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@@ -47,7 +47,6 @@
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java Thu Dec 27 03:06:50 2007
@@ -31,15 +31,15 @@
answer.setDeleteAllMessagesOnStartup(true);
AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
adaptor.setArchiveDataLogs(true);
- adaptor.setMaxFileLength(1024 * 64);
+ //adaptor.setMaxFileLength(1024 * 64);
answer.setDataDirectoryFile(dataFileDir);
answer.setPersistenceAdapter(adaptor);
answer.addConnector(uri);
}
protected void setUp() throws Exception {
- numberofProducers=6;
- numberOfConsumers=6;
+ numberofProducers=2;
+ numberOfConsumers=10;
this.consumerSleepDuration=0;
super.setUp();
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java Thu Dec 27 03:06:50 2007
@@ -79,7 +79,7 @@
public void onMessage(Message msg) {
rate.increment();
try {
- if (!this.audit.isInOrder(msg.getJMSMessageID())) {
+ if (msg.getJMSDestination() instanceof Topic && !this.audit.isInOrder(msg.getJMSMessageID())) {
LOG.error("Message out of order!!" + msg);
}
if (this.audit.isDuplicate(msg)){
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java Thu Dec 27 03:06:50 2007
@@ -30,8 +30,8 @@
protected void setUp() throws Exception {
numberofProducers=6;
numberOfConsumers=6;
- samepleCount=100;
- playloadSize = 1;
+ samepleCount=1000;
+ playloadSize = 1024;
super.setUp();
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java Thu Dec 27 03:06:50 2007
@@ -30,7 +30,9 @@
}
protected void setUp() throws Exception {
- this.consumerSleepDuration=2000;
+ numberOfConsumers = 50;
+ numberofProducers = 50;
+ this.consumerSleepDuration=10;
super.setUp();
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java Thu Dec 27 03:06:50 2007
@@ -66,8 +66,10 @@
private ActiveMQConnection connection;
private AtomicBoolean stop = new AtomicBoolean(false);
private Throwable error;
+ private String name;
- public Worker() throws URISyntaxException, JMSException {
+ public Worker(String name) throws URISyntaxException, JMSException {
+ this.name=name;
URI uri = new URI("failover://(mock://(" + tcpUri + "))");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
connection = (ActiveMQConnection)factory.createConnection();
@@ -115,7 +117,7 @@
public void run() {
try {
- ActiveMQQueue queue = new ActiveMQQueue("FOO");
+ ActiveMQQueue queue = new ActiveMQQueue("FOO_"+name);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
@@ -213,7 +215,7 @@
workers = new Worker[WORKER_COUNT];
for (int i = 0; i < WORKER_COUNT; i++) {
- workers[i] = new Worker();
+ workers[i] = new Worker(""+i);
workers[i].start();
}