You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/12/27 12:06:53 UTC

svn commit: r607038 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activemq/store/amq/ test/java/org/ap...

Author: rajdavies
Date: Thu Dec 27 03:06:50 2007
New Revision: 607038

URL: http://svn.apache.org/viewvc?rev=607038&view=rev
Log:
Reduce contention around Queues

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Dec 27 03:06:50 2007
@@ -39,7 +39,16 @@
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.broker.ft.MasterConnector;
-import org.apache.activemq.broker.jmx.*;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.jmx.ConnectorView;
+import org.apache.activemq.broker.jmx.ConnectorViewMBean;
+import org.apache.activemq.broker.jmx.FTConnectorView;
+import org.apache.activemq.broker.jmx.JmsConnectorView;
+import org.apache.activemq.broker.jmx.ManagedRegionBroker;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.broker.jmx.NetworkConnectorView;
+import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
+import org.apache.activemq.broker.jmx.ProxyConnectorView;
 import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFactory;
@@ -151,6 +160,7 @@
     private CountDownLatch stoppedLatch = new CountDownLatch(1);
     private boolean supportFailOver;
     private boolean clustered;
+   
 
     static {
         String localHostName = "localhost";
@@ -363,7 +373,7 @@
     /**
      * @return true if this Broker is a slave to a Master
      */
-    public synchronized boolean isSlave() {
+    public boolean isSlave() {
         return masterConnector != null && masterConnector.isSlave();
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Thu Dec 27 03:06:50 2007
@@ -56,18 +56,18 @@
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
     }
 
-    public synchronized boolean isActive() {
+    public boolean isActive() {
         return active;
     }
 
-    protected synchronized boolean isFull() {
+    protected boolean isFull() {
         return !active || super.isFull();
     }
 
-    public synchronized void gc() {
+    public void gc() {
     }
 
-    public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
+    public void add(ConnectionContext context, Destination destination) throws Exception {
         super.add(context, destination);
         destinations.put(destination.getActiveMQDestination(), destination);
         if (active || keepDurableSubsActive) {
@@ -77,38 +77,43 @@
                 topic.recoverRetroactiveMessages(context, this);
             }
         }
-        dispatchMatched();
+        dispatchPending();
     }
 
-    public synchronized void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info) throws Exception {
+    public void activate(SystemUsage memoryManager, ConnectionContext context,
+            ConsumerInfo info) throws Exception {
         LOG.debug("Activating " + this);
         if (!active) {
             this.active = true;
             this.context = context;
             this.info = info;
             if (!keepDurableSubsActive) {
-                for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
-                    Topic topic = (Topic)iter.next();
+                for (Iterator<Destination> iter = destinations.values()
+                        .iterator(); iter.hasNext();) {
+                    Topic topic = (Topic) iter.next();
                     topic.activate(context, this);
                 }
             }
-            pending.setSystemUsage(memoryManager);
-            pending.start();
+            synchronized (pending) {
+                pending.setSystemUsage(memoryManager);
+                pending.start();
 
-            // If nothing was in the persistent store, then try to use the
-            // recovery policy.
-            if (pending.isEmpty()) {
-                for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
-                    Topic topic = (Topic)iter.next();
-                    topic.recoverRetroactiveMessages(context, this);
+                // If nothing was in the persistent store, then try to use the
+                // recovery policy.
+                if (pending.isEmpty()) {
+                    for (Iterator<Destination> iter = destinations.values()
+                            .iterator(); iter.hasNext();) {
+                        Topic topic = (Topic) iter.next();
+                        topic.recoverRetroactiveMessages(context, this);
+                    }
                 }
             }
-            dispatchMatched();
+            dispatchPending();
             this.usageManager.getMemoryUsage().addUsageListener(this);
         }
     }
 
-    public synchronized void deactivate(boolean keepDurableSubsActive) throws Exception {
+    public void deactivate(boolean keepDurableSubsActive) throws Exception {
         active = false;
         this.usageManager.getMemoryUsage().removeUsageListener(this);
         synchronized (pending) {
@@ -136,7 +141,9 @@
                 node.decrementReferenceCount();
             }
         }
-        dispatched.clear();
+        synchronized(dispatched) {
+            dispatched.clear();
+        }
         if (!keepDurableSubsActive && pending.isTransient()) {
             synchronized (pending) {
                 try {
@@ -163,7 +170,7 @@
         return md;
     }
 
-    public synchronized void add(MessageReference node) throws Exception {
+    public void add(MessageReference node) throws Exception {
         if (!active && !keepDurableSubsActive) {
             return;
         }
@@ -171,11 +178,13 @@
         super.add(node);
     }
 
-    protected synchronized void doAddRecoveredMessage(MessageReference message) throws Exception {
+    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
+        synchronized(pending) {
         pending.addRecoveredMessage(message);
+        }
     }
 
-    public synchronized int getPendingQueueSize() {
+    public int getPendingQueueSize() {
         if (active || keepDurableSubsActive) {
             return super.getPendingQueueSize();
         }
@@ -187,7 +196,7 @@
         throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
     }
 
-    protected synchronized boolean canDispatch(MessageReference node) {
+    protected boolean canDispatch(MessageReference node) {
         return active;
     }
 
@@ -217,24 +226,28 @@
     /**
      * Release any references that we are holding.
      */
-    public synchronized void destroy() {
-        try {
-            synchronized (pending) {
+    public void destroy() {
+        synchronized (pending) {
+            try {
+
                 pending.reset();
                 while (pending.hasNext()) {
                     MessageReference node = pending.next();
                     node.decrementReferenceCount();
                 }
+
+            } finally {
+                pending.release();
+                pending.clear();
+            }
+        }
+        synchronized(dispatched) {
+            for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
+                MessageReference node = (MessageReference) iter.next();
+                node.decrementReferenceCount();
             }
-        } finally {
-            pending.release();
-            pending.clear();
-        }
-        for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-            MessageReference node = (MessageReference)iter.next();
-            node.decrementReferenceCount();
+            dispatched.clear();
         }
-        dispatched.clear();
     }
 
     /**
@@ -247,7 +260,7 @@
     public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
             try {
-                dispatchMatched();
+                dispatchPending();
             } catch (IOException e) {
                 LOG.warn("problem calling dispatchMatched", e);
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Dec 27 03:06:50 2007
@@ -64,6 +64,8 @@
     private int maxAuditDepth=2048;
     protected final SystemUsage usageManager;
     protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
+    private final Object pendingLock = new Object();
+    private final Object dispatchLock = new Object();
 
     public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
         super(broker, context, info);
@@ -87,14 +89,14 @@
         if (getPrefetchSize() == 0 && !isSlave()) {
             prefetchExtension++;
             final long dispatchCounterBeforePull = dispatchCounter;
-            dispatchMatched();
+            dispatchPending();
             // If there was nothing dispatched.. we may need to setup a timeout.
             if (dispatchCounterBeforePull == dispatchCounter) {
                 // imediate timeout used by receiveNoWait()
                 if (pull.getTimeout() == -1) {
                     // Send a NULL message.
                     add(QueueMessageReference.NULL_MESSAGE);
-                    dispatchMatched();
+                    dispatchPending();
                 }
                 if (pull.getTimeout() > 0) {
                     Scheduler.executeAfterDelay(new Runnable() {
@@ -117,216 +119,238 @@
         if (dispatchCounterBeforePull == dispatchCounter) {
             try {
                 add(QueueMessageReference.NULL_MESSAGE);
-                dispatchMatched();
+                dispatchPending();
             } catch (Exception e) {
                 context.getConnection().serviceException(e);
             }
         }
     }
 
-    public synchronized void add(MessageReference node) throws Exception {
+    public void add(MessageReference node) throws Exception {
         boolean pendingEmpty = false;
-        pendingEmpty = pending.isEmpty();
+        synchronized(pendingLock) {
+            pendingEmpty = pending.isEmpty();
+        }
         enqueueCounter++;
         if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
             dispatch(node);
         } else {
             optimizePrefetch();
-            synchronized (pending) {
+            synchronized(pendingLock) {
                 if (pending.isEmpty() && LOG.isDebugEnabled()) {
                     LOG.debug("Prefetch limit.");
                 }
                 pending.addMessageLast(node);
-                dispatchMatched();
+               
             }
+            dispatchPending();
         }
     }
 
-    public synchronized void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
-        try {
-            pending.reset();
-            while (pending.hasNext()) {
-                MessageReference node = pending.next();
-                if (node.getMessageId().equals(mdn.getMessageId())) {
-                    pending.remove();
-                    createMessageDispatch(node, node.getMessage());
-                    dispatched.add(node);
-                    return;
+    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
+        synchronized(pendingLock) {
+            try {
+                pending.reset();
+                while (pending.hasNext()) {
+                    MessageReference node = pending.next();
+                    if (node.getMessageId().equals(mdn.getMessageId())) {
+                        pending.remove();
+                        createMessageDispatch(node, node.getMessage());
+                        synchronized(dispatchLock) {
+                            dispatched.add(node);
+                        }
+                        return;
+                    }
                 }
+            } finally {
+                pending.release();
             }
-        } finally {
-            pending.release();
         }
-        throw new JMSException("Slave broker out of sync with master: Dispatched message (" + mdn.getMessageId() + ") was not in the pending list");
+        throw new JMSException(
+                "Slave broker out of sync with master: Dispatched message ("
+                        + mdn.getMessageId() + ") was not in the pending list");
     }
 
-    public synchronized void acknowledge(final ConnectionContext context,
-            final MessageAck ack) throws Exception {
+    public  void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
         // Handle the standard acknowledgment case.
         boolean callDispatchMatched = false;
-        if (ack.isStandardAck()) {
-            // Acknowledge all dispatched messages up till the message id of the
-            // acknowledgment.
-            int index = 0;
-            boolean inAckRange = false;
-            List<MessageReference> removeList = new ArrayList<MessageReference>();
-            for (final MessageReference node : dispatched) {
-                MessageId messageId = node.getMessageId();
-                if (ack.getFirstMessageId() == null
-                        || ack.getFirstMessageId().equals(messageId)) {
-                    inAckRange = true;
-                }
-                if (inAckRange) {
-                    // Don't remove the nodes until we are committed.
-                    if (!context.isInTransaction()) {
-                        dequeueCounter++;
-                        node.getRegionDestination().getDestinationStatistics()
-                                .getDequeues().increment();
-                        removeList.add(node);
-                    } else {
-                        // setup a Synchronization to remove nodes from the
-                        // dispatched list.
-                        context.getTransaction().addSynchronization(
-                                new Synchronization() {
-
-                                    public void afterCommit() throws Exception {
-                                        synchronized (PrefetchSubscription.this) {
-                                            dequeueCounter++;
-                                            dispatched.remove(node);
-                                            node.getRegionDestination()
-                                                    .getDestinationStatistics()
-                                                    .getDequeues().increment();
-                                            prefetchExtension--;
+        synchronized(dispatchLock) {
+            if (ack.isStandardAck()) {
+                // Acknowledge all dispatched messages up till the message id of
+                // the
+                // acknowledgment.
+                int index = 0;
+                boolean inAckRange = false;
+                List<MessageReference> removeList = new ArrayList<MessageReference>();
+                for (final MessageReference node : dispatched) {
+                    MessageId messageId = node.getMessageId();
+                    if (ack.getFirstMessageId() == null
+                            || ack.getFirstMessageId().equals(messageId)) {
+                        inAckRange = true;
+                    }
+                    if (inAckRange) {
+                        // Don't remove the nodes until we are committed.
+                        if (!context.isInTransaction()) {
+                            dequeueCounter++;
+                            node.getRegionDestination()
+                                    .getDestinationStatistics().getDequeues()
+                                    .increment();
+                            removeList.add(node);
+                        } else {
+                            // setup a Synchronization to remove nodes from the
+                            // dispatched list.
+                            context.getTransaction().addSynchronization(
+                                    new Synchronization() {
+
+                                        public void afterCommit()
+                                                throws Exception {
+                                            synchronized(dispatchLock) {
+                                            
+                                                dequeueCounter++;
+                                                dispatched.remove(node);
+                                                node
+                                                        .getRegionDestination()
+                                                        .getDestinationStatistics()
+                                                        .getDequeues()
+                                                        .increment();
+                                                prefetchExtension--;
+                                            }
                                         }
-                                    }
 
-                                    public void afterRollback()
-                                            throws Exception {
-                                        super.afterRollback();
-                                    }
-                                });
-                    }
-                    index++;
-                    acknowledge(context, ack, node);
-                    if (ack.getLastMessageId().equals(messageId)) {
-                        if (context.isInTransaction()) {
-                            // extend prefetch window only if not a pulling
-                            // consumer
-                            if (getPrefetchSize() != 0) {
-                                prefetchExtension = Math.max(prefetchExtension,
-                                        index + 1);
+                                        public void afterRollback()
+                                                throws Exception {
+                                            super.afterRollback();
+                                        }
+                                    });
+                        }
+                        index++;
+                        acknowledge(context, ack, node);
+                        if (ack.getLastMessageId().equals(messageId)) {
+                            if (context.isInTransaction()) {
+                                // extend prefetch window only if not a pulling
+                                // consumer
+                                if (getPrefetchSize() != 0) {
+                                    prefetchExtension = Math.max(
+                                            prefetchExtension, index + 1);
+                                }
+                            } else {
+                                prefetchExtension = Math.max(0,
+                                        prefetchExtension - (index + 1));
                             }
-                        } else {
-                            prefetchExtension = Math.max(0, prefetchExtension
-                                    - (index + 1));
+                            callDispatchMatched = true;
+                            break;
                         }
-                        callDispatchMatched = true;
-                        break;
                     }
                 }
-            }
-            for (final MessageReference node : removeList) {
-                dispatched.remove(node);
-            }
-            // this only happens after a reconnect - get an ack which is not
-            // valid
-            if (!callDispatchMatched) {
-                if (LOG.isDebugEnabled()) {
-                    LOG
-                            .debug("Could not correlate acknowledgment with dispatched message: "
-                                    + ack);
+                for (final MessageReference node : removeList) {
+                    dispatched.remove(node);
                 }
-            }
-        } else if (ack.isDeliveredAck()) {
-            // Message was delivered but not acknowledged: update pre-fetch
-            // counters.
-            // Acknowledge all dispatched messages up till the message id of the
-            // acknowledgment.
-            int index = 0;
-            for (Iterator<MessageReference> iter = dispatched.iterator(); iter
-                    .hasNext(); index++) {
-                final MessageReference node = iter.next();
-                if (ack.getLastMessageId().equals(node.getMessageId())) {
-                    prefetchExtension = Math.max(prefetchExtension, index + 1);
-                    callDispatchMatched = true;
-                    break;
+                // this only happens after a reconnect - get an ack which is not
+                // valid
+                if (!callDispatchMatched) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG
+                                .debug("Could not correlate acknowledgment with dispatched message: "
+                                        + ack);
+                    }
                 }
-            }
-            if (!callDispatchMatched) {
-                throw new JMSException(
-                        "Could not correlate acknowledgment with dispatched message: "
-                                + ack);
-            }
-        } else if (ack.isRedeliveredAck()) {
-            // Message was re-delivered but it was not yet considered to be a
-            // DLQ message.
-            // Acknowledge all dispatched messages up till the message id of the
-            // acknowledgment.
-            boolean inAckRange = false;
-            for (final MessageReference node : dispatched) {
-                MessageId messageId = node.getMessageId();
-                if (ack.getFirstMessageId() == null
-                        || ack.getFirstMessageId().equals(messageId)) {
-                    inAckRange = true;
-                }
-                if (inAckRange) {
-                    node.incrementRedeliveryCounter();
-                    if (ack.getLastMessageId().equals(messageId)) {
+            } else if (ack.isDeliveredAck()) {
+                // Message was delivered but not acknowledged: update pre-fetch
+                // counters.
+                // Acknowledge all dispatched messages up till the message id of
+                // the
+                // acknowledgment.
+                int index = 0;
+                for (Iterator<MessageReference> iter = dispatched.iterator(); iter
+                        .hasNext(); index++) {
+                    final MessageReference node = iter.next();
+                    if (ack.getLastMessageId().equals(node.getMessageId())) {
+                        prefetchExtension = Math.max(prefetchExtension,
+                                index + 1);
                         callDispatchMatched = true;
                         break;
                     }
                 }
-            }
-            if (!callDispatchMatched) {
-                throw new JMSException(
-                        "Could not correlate acknowledgment with dispatched message: "
-                                + ack);
-            }
-        } else if (ack.isPoisonAck()) {
-            // TODO: what if the message is already in a DLQ???
-            // Handle the poison ACK case: we need to send the message to a DLQ
-            if (ack.isInTransaction()) {
-                throw new JMSException("Poison ack cannot be transacted: "
-                        + ack);
-            }
-            // Acknowledge all dispatched messages up till the message id of the
-            // acknowledgment.
-            int index = 0;
-            boolean inAckRange = false;
-            List<MessageReference> removeList = new ArrayList<MessageReference>();
-            for (final MessageReference node : dispatched) {
-                MessageId messageId = node.getMessageId();
-                if (ack.getFirstMessageId() == null
-                        || ack.getFirstMessageId().equals(messageId)) {
-                    inAckRange = true;
-                }
-                if (inAckRange) {
-                    sendToDLQ(context, node);
-                    node.getRegionDestination().getDestinationStatistics()
-                            .getDequeues().increment();
-                    removeList.add(node);
-                    dequeueCounter++;
-                    index++;
-                    acknowledge(context, ack, node);
-                    if (ack.getLastMessageId().equals(messageId)) {
-                        prefetchExtension = Math.max(0, prefetchExtension
-                                - (index + 1));
-                        callDispatchMatched = true;
-                        break;
+                if (!callDispatchMatched) {
+                    throw new JMSException(
+                            "Could not correlate acknowledgment with dispatched message: "
+                                    + ack);
+                }
+            } else if (ack.isRedeliveredAck()) {
+                // Message was re-delivered but it was not yet considered to be
+                // a
+                // DLQ message.
+                // Acknowledge all dispatched messages up till the message id of
+                // the
+                // acknowledgment.
+                boolean inAckRange = false;
+                for (final MessageReference node : dispatched) {
+                    MessageId messageId = node.getMessageId();
+                    if (ack.getFirstMessageId() == null
+                            || ack.getFirstMessageId().equals(messageId)) {
+                        inAckRange = true;
+                    }
+                    if (inAckRange) {
+                        node.incrementRedeliveryCounter();
+                        if (ack.getLastMessageId().equals(messageId)) {
+                            callDispatchMatched = true;
+                            break;
+                        }
                     }
                 }
-            }
-            for (final MessageReference node : removeList) {
-                dispatched.remove(node);
-            }
-            if (!callDispatchMatched) {
-                throw new JMSException(
-                        "Could not correlate acknowledgment with dispatched message: "
-                                + ack);
+                if (!callDispatchMatched) {
+                    throw new JMSException(
+                            "Could not correlate acknowledgment with dispatched message: "
+                                    + ack);
+                }
+            } else if (ack.isPoisonAck()) {
+                // TODO: what if the message is already in a DLQ???
+                // Handle the poison ACK case: we need to send the message to a
+                // DLQ
+                if (ack.isInTransaction()) {
+                    throw new JMSException("Poison ack cannot be transacted: "
+                            + ack);
+                }
+                // Acknowledge all dispatched messages up till the message id of
+                // the
+                // acknowledgment.
+                int index = 0;
+                boolean inAckRange = false;
+                List<MessageReference> removeList = new ArrayList<MessageReference>();
+                for (final MessageReference node : dispatched) {
+                    MessageId messageId = node.getMessageId();
+                    if (ack.getFirstMessageId() == null
+                            || ack.getFirstMessageId().equals(messageId)) {
+                        inAckRange = true;
+                    }
+                    if (inAckRange) {
+                        sendToDLQ(context, node);
+                        node.getRegionDestination().getDestinationStatistics()
+                                .getDequeues().increment();
+                        removeList.add(node);
+                        dequeueCounter++;
+                        index++;
+                        acknowledge(context, ack, node);
+                        if (ack.getLastMessageId().equals(messageId)) {
+                            prefetchExtension = Math.max(0, prefetchExtension
+                                    - (index + 1));
+                            callDispatchMatched = true;
+                            break;
+                        }
+                    }
+                }
+                for (final MessageReference node : removeList) {
+                    dispatched.remove(node);
+                }
+                if (!callDispatchMatched) {
+                    throw new JMSException(
+                            "Could not correlate acknowledgment with dispatched message: "
+                                    + ack);
+                }
             }
         }
         if (callDispatchMatched) {
-            dispatchMatched();
+            dispatchPending();
         } else {
             if (isSlave()) {
                 throw new JMSException(
@@ -356,45 +380,45 @@
      * 
      * @return
      */
-    protected synchronized boolean isFull() {
+    protected boolean isFull() {
         return isSlave() || dispatched.size() - prefetchExtension >= info.getPrefetchSize();
     }
 
     /**
      * @return true when 60% or more room is left for dispatching messages
      */
-    public synchronized boolean isLowWaterMark() {
+    public boolean isLowWaterMark() {
         return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4);
     }
 
     /**
      * @return true when 10% or less room is left for dispatching messages
      */
-    public synchronized boolean isHighWaterMark() {
+    public boolean isHighWaterMark() {
         return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
     }
 
-    public synchronized int countBeforeFull() {
+    public int countBeforeFull() {
         return info.getPrefetchSize() + prefetchExtension - dispatched.size();
     }
 
-    public synchronized int getPendingQueueSize() {
+    public int getPendingQueueSize() {
         return pending.size();
     }
 
-    public synchronized int getDispatchedQueueSize() {
+    public int getDispatchedQueueSize() {
         return dispatched.size();
     }
 
-    public synchronized long getDequeueCounter() {
+    public long getDequeueCounter() {
         return dequeueCounter;
     }
 
-    public synchronized long getDispatchedCounter() {
+    public long getDispatchedCounter() {
         return dispatchCounter;
     }
 
-    public synchronized long getEnqueueCounter() {
+    public long getEnqueueCounter() {
         return enqueueCounter;
     }
 
@@ -402,11 +426,11 @@
         return pending.isRecoveryRequired();
     }
 
-    public synchronized PendingMessageCursor getPending() {
+    public PendingMessageCursor getPending() {
         return this.pending;
     }
 
-    public synchronized void setPending(PendingMessageCursor pending) {
+    public void setPending(PendingMessageCursor pending) {
         this.pending = pending;
         if (this.pending!=null) {
             this.pending.setSystemUsage(usageManager);
@@ -430,51 +454,60 @@
          */
     }
 
-    public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
-        super.add(context, destination);
-        pending.add(context, destination);
+    public void add(ConnectionContext context, Destination destination) throws Exception {
+        synchronized(pendingLock) {
+            super.add(context, destination);
+            pending.add(context, destination);
+        }
     }
 
-    public synchronized void remove(ConnectionContext context, Destination destination) throws Exception {
-        super.remove(context, destination);
-        pending.remove(context, destination);
+    public void remove(ConnectionContext context, Destination destination) throws Exception {
+        synchronized(pendingLock) {
+            super.remove(context, destination);
+            pending.remove(context, destination);
+        }
     }
 
-    protected synchronized void dispatchMatched() throws IOException {
+    protected void dispatchPending() throws IOException {
         if (!isSlave()) {
-            try {
-                int numberToDispatch = countBeforeFull();
-                if (numberToDispatch > 0) {
-                    pending.setMaxBatchSize(numberToDispatch);
-                    int count = 0;
-                    pending.reset();
-                    while (pending.hasNext() && !isFull() && count < numberToDispatch) {
-                        MessageReference node = pending.next();
-                        if (node == null) {
-                            break;
-                        }
-                        if (canDispatch(node)) {
-                            pending.remove();
-                            // Message may have been sitting in the pending list
-                            // a while
-                            // waiting for the consumer to ak the message.
-                            if (node != QueueMessageReference.NULL_MESSAGE && broker.isExpired(node)) {
-                                broker.messageExpired(getContext(), node);
-                                dequeueCounter++;
-                                continue;
+           synchronized(pendingLock) {
+                try {
+                    int numberToDispatch = countBeforeFull();
+                    if (numberToDispatch > 0) {
+                        pending.setMaxBatchSize(numberToDispatch);
+                        int count = 0;
+                        pending.reset();
+                        while (pending.hasNext() && !isFull()
+                                && count < numberToDispatch) {
+                            MessageReference node = pending.next();
+                            if (node == null) {
+                                break;
+                            }
+                            if (canDispatch(node)) {
+                                pending.remove();
+                                // Message may have been sitting in the pending
+                                // list
+                                // a while
+                                // waiting for the consumer to ak the message.
+                                if (node != QueueMessageReference.NULL_MESSAGE
+                                        && broker.isExpired(node)) {
+                                    broker.messageExpired(getContext(), node);
+                                    dequeueCounter++;
+                                    continue;
+                                }
+                                dispatch(node);
+                                count++;
                             }
-                            dispatch(node);
-                            count++;
                         }
                     }
+                } finally {
+                    pending.release();
                 }
-            } finally {
-                pending.release();
             }
         }
     }
 
-    protected synchronized boolean dispatch(final MessageReference node) throws IOException {
+    protected boolean dispatch(final MessageReference node) throws IOException {
         final Message message = node.getMessage();
         if (message == null) {
             return false;
@@ -488,7 +521,9 @@
                 dispatchCounter++;
                 dispatched.add(node);
                 if(pending != null) {
-                    pending.dispatched(message);
+                   synchronized(pendingLock) {
+                        pending.dispatched(message);
+                    }
                 }
             } else {
                 prefetchExtension = Math.max(0, prefetchExtension - 1);
@@ -523,7 +558,7 @@
         }
         if (info.isDispatchAsync()) {
             try {
-                dispatchMatched();
+                dispatchPending();
             } catch (IOException e) {
                 context.getConnection().serviceExceptionAsync(e);
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Dec 27 03:06:50 2007
@@ -94,8 +94,8 @@
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
     private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
-    private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
     private final Object exclusiveLockMutex = new Object();
+    private final Object sendLock = new Object();
     private final TaskRunner taskRunner;
     
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
@@ -204,149 +204,144 @@
         return true;
     }
 
-    public synchronized  void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
+    public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
         sub.add(context, this);
         destinationStatistics.getConsumers().increment();
         maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize();
 
-        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
-        try {
-        	
-        	//needs to be synchronized - so no contention with dispatching
-            synchronized (consumers) {
-                consumers.add(sub);
-                if (sub.getConsumerInfo().isExclusive()) {
-                    LockOwner owner = (LockOwner)sub;
-                    if (exclusiveOwner == null) {
+        MessageEvaluationContext msgContext = new MessageEvaluationContext();
+
+        // needs to be synchronized - so no contention with dispatching
+        synchronized (consumers) {
+            consumers.add(sub);
+            if (sub.getConsumerInfo().isExclusive()) {
+                LockOwner owner = (LockOwner) sub;
+                if (exclusiveOwner == null) {
+                    exclusiveOwner = owner;
+                } else {
+                    // switch the owner if the priority is higher.
+                    if (owner.getLockPriority() > exclusiveOwner
+                            .getLockPriority()) {
                         exclusiveOwner = owner;
-                    } else {
-                        // switch the owner if the priority is higher.
-                        if (owner.getLockPriority() > exclusiveOwner.getLockPriority()) {
-                            exclusiveOwner = owner;
-                        }
                     }
                 }
             }
-            
-            //we hold the lock on the dispatchValue - so lets build the paged in
-            //list directly;
-            buildList(false);
-           
-            // synchronize with dispatch method so that no new messages are sent
-            // while
-            // setting up a subscription. avoid out of order messages,
-            // duplicates
-            // etc.  
-            
-         
-            
-                msgContext.setDestination(destination);
-                synchronized (pagedInMessages) {
-                    // Add all the matching messages in the queue to the
-                    // subscription.
-                    for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
-                        QueueMessageReference node = (QueueMessageReference)i.next();
-                        if (node.isDropped() ||  (!sub.getConsumerInfo().isBrowser() && node.getLockOwner()!=null)) {
-                            continue;
-                        }
-                        try {
-                            msgContext.setMessageReference(node);
-                            if (sub.matches(node, msgContext)) {
-                                sub.add(node);
-                            }
-                        } catch (IOException e) {
-                            log.warn("Could not load message: " + e, e);
-                        }
+        }
+
+        // we hold the lock on the dispatchValue - so lets build the paged in
+        // list directly;
+        buildList(false);
+
+        // synchronize with dispatch method so that no new messages are sent
+        // while
+        // setting up a subscription. avoid out of order messages,
+        // duplicates
+        // etc.
+
+        msgContext.setDestination(destination);
+        synchronized (pagedInMessages) {
+            // Add all the matching messages in the queue to the
+            // subscription.
+            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i
+                    .hasNext();) {
+                QueueMessageReference node = (QueueMessageReference) i.next();
+                if (node.isDropped()
+                        || (!sub.getConsumerInfo().isBrowser() && node
+                                .getLockOwner() != null)) {
+                    continue;
+                }
+                try {
+                    msgContext.setMessageReference(node);
+                    if (sub.matches(node, msgContext)) {
+                        sub.add(node);
                     }
+                } catch (IOException e) {
+                    log.warn("Could not load message: " + e, e);
                 }
-          
-            
-            
-        } finally {
-            msgContext.clear();
-        }
-    }
-
-    public synchronized void removeSubscription(ConnectionContext context,
-	        Subscription sub) throws Exception{
-		destinationStatistics.getConsumers().decrement();
-		maximumPagedInMessages-=sub.getConsumerInfo().getPrefetchSize();
-		// synchronize with dispatch method so that no new messages are sent
-		// while
-		// removing up a subscription.
-		synchronized(consumers){
-			consumers.remove(sub);
-			if(sub.getConsumerInfo().isExclusive()){
-				LockOwner owner=(LockOwner)sub;
-				// Did we loose the exclusive owner??
-				if(exclusiveOwner==owner){
-					// Find the exclusive consumer with the higest Lock
-					// Priority.
-					exclusiveOwner=null;
-					for(Iterator<Subscription> iter=consumers.iterator();iter
-					        .hasNext();){
-						Subscription s=iter.next();
-						LockOwner so=(LockOwner)s;
-						if(s.getConsumerInfo().isExclusive()
-						        &&(exclusiveOwner==null||so.getLockPriority()>exclusiveOwner
-						                .getLockPriority())){
-							exclusiveOwner=so;
-						}
-					}
-				}
-			}
-			if(consumers.isEmpty()){
-				messages.gc();
-			}
-		}
-		sub.remove(context,this);
-		boolean wasExclusiveOwner=false;
-		if(exclusiveOwner==sub){
-			exclusiveOwner=null;
-			wasExclusiveOwner=true;
-		}
-		ConsumerId consumerId=sub.getConsumerInfo().getConsumerId();
-		MessageGroupSet ownedGroups=getMessageGroupOwners().removeConsumer(
-		        consumerId);
-		if(!sub.getConsumerInfo().isBrowser()){
-			MessageEvaluationContext msgContext=context
-			        .getMessageEvaluationContext();
-			try{
-				msgContext.setDestination(destination);
-				// lets copy the messages to dispatch to avoid deadlock
-				List<QueueMessageReference> messagesToDispatch=new ArrayList<QueueMessageReference>();
-				synchronized(pagedInMessages){
-					for(Iterator<MessageReference> i=pagedInMessages.iterator();i
-					        .hasNext();){
-						QueueMessageReference node=(QueueMessageReference)i
-						        .next();
-						if(node.isDropped()){
-							continue;
-						}
-						String groupID=node.getGroupID();
-						// Re-deliver all messages that the sub locked
-						if(node.getLockOwner()==sub
-						        ||wasExclusiveOwner
-						        ||(groupID!=null&&ownedGroups.contains(groupID))){
-							messagesToDispatch.add(node);
-						}
-					}
-				}
-				// now lets dispatch from the copy of the collection to
-				// avoid deadlocks
-				for(Iterator<QueueMessageReference> iter=messagesToDispatch
-				        .iterator();iter.hasNext();){
-					QueueMessageReference node=iter.next();
-					node.incrementRedeliveryCounter();
-					node.unlock();
-					msgContext.setMessageReference(node);
-					dispatchPolicy.dispatch(node,msgContext,consumers);
-				}
-			}finally{
-				msgContext.clear();
-			}
-		}
-	}
+            }
+        }
+
+    }
+
+    public void removeSubscription(ConnectionContext context, Subscription sub)
+            throws Exception {
+        destinationStatistics.getConsumers().decrement();
+        maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
+        // synchronize with dispatch method so that no new messages are sent
+        // while
+        // removing up a subscription.
+        synchronized (consumers) {
+            consumers.remove(sub);
+            if (sub.getConsumerInfo().isExclusive()) {
+                LockOwner owner = (LockOwner) sub;
+                // Did we loose the exclusive owner??
+                if (exclusiveOwner == owner) {
+                    // Find the exclusive consumer with the higest Lock
+                    // Priority.
+                    exclusiveOwner = null;
+                    for (Iterator<Subscription> iter = consumers.iterator(); iter
+                            .hasNext();) {
+                        Subscription s = iter.next();
+                        LockOwner so = (LockOwner) s;
+                        if (s.getConsumerInfo().isExclusive()
+                                && (exclusiveOwner == null || so
+                                        .getLockPriority() > exclusiveOwner
+                                        .getLockPriority())) {
+                            exclusiveOwner = so;
+                        }
+                    }
+                }
+            }
+            if (consumers.isEmpty()) {
+                messages.gc();
+            }
+        }
+        sub.remove(context, this);
+        boolean wasExclusiveOwner = false;
+        if (exclusiveOwner == sub) {
+            exclusiveOwner = null;
+            wasExclusiveOwner = true;
+        }
+        ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
+        MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(
+                consumerId);
+        if (!sub.getConsumerInfo().isBrowser()) {
+            MessageEvaluationContext msgContext = new MessageEvaluationContext();
+
+            msgContext.setDestination(destination);
+            // lets copy the messages to dispatch to avoid deadlock
+            List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
+            synchronized (pagedInMessages) {
+                for (Iterator<MessageReference> i = pagedInMessages.iterator(); i
+                        .hasNext();) {
+                    QueueMessageReference node = (QueueMessageReference) i
+                            .next();
+                    if (node.isDropped()) {
+                        continue;
+                    }
+                    String groupID = node.getGroupID();
+                    // Re-deliver all messages that the sub locked
+                    if (node.getLockOwner() == sub
+                            || wasExclusiveOwner
+                            || (groupID != null && ownedGroups
+                                    .contains(groupID))) {
+                        messagesToDispatch.add(node);
+                    }
+                }
+            }
+            // now lets dispatch from the copy of the collection to
+            // avoid deadlocks
+            for (Iterator<QueueMessageReference> iter = messagesToDispatch
+                    .iterator(); iter.hasNext();) {
+                QueueMessageReference node = iter.next();
+                node.incrementRedeliveryCounter();
+                node.unlock();
+                msgContext.setMessageReference(node);
+                dispatchPolicy.dispatch(node, msgContext, consumers);
+            }
+
+        }
+    }
 
     public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
@@ -445,16 +440,21 @@
         }
     }
 
-    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
+    void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
-        message.setRegionDestination(this);
-        if (store != null && message.isPersistent()) {
-            while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
-                if (context.getStopping().get()) {
-                    throw new IOException("Connection closed, send aborted.");
+        synchronized (sendLock) {
+            message.setRegionDestination(this);
+            if (store != null && message.isPersistent()) {
+                while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
+                    if (context.getStopping().get()) {
+                        throw new IOException(
+                                "Connection closed, send aborted.");
+                    }
                 }
+
+                store.addMessage(context, message);
+
             }
-            store.addMessage(context, message);
         }
         if (context.isInTransaction()) {
             // If this is a transacted message.. increase the usage now so that
@@ -1010,57 +1010,51 @@
     	return  result;
     }
 
-    private   synchronized List<MessageReference> buildList(boolean force) throws Exception {
-
+    private List<MessageReference> buildList(boolean force) throws Exception {
         final int toPageIn = maximumPagedInMessages - pagedInMessages.size();
         List<MessageReference> result = null;
         if ((force || !consumers.isEmpty()) && toPageIn > 0) {
             messages.setMaxBatchSize(toPageIn);
-            try {
-                int count = 0;
-                result = new ArrayList<MessageReference>(toPageIn);
-                synchronized (messages) {
+            int count = 0;
+            result = new ArrayList<MessageReference>(toPageIn);
+            synchronized (messages) {
 
-                    try {
-                        messages.reset();
-                        while (messages.hasNext() && count < toPageIn) {
-                            MessageReference node = messages.next();
-                            messages.remove();
-                            if (!broker.isExpired(node)) {
-                                node = createMessageReference(node.getMessage());
-                                result.add(node);
-                                count++;
-                            } else {
-                                broker.messageExpired(createConnectionContext(), node);
-                                destinationStatistics.getMessages().decrement();
-                            }
+                try {
+                    messages.reset();
+                    while (messages.hasNext() && count < toPageIn) {
+                        MessageReference node = messages.next();
+                        messages.remove();
+                        if (!broker.isExpired(node)) {
+                            node = createMessageReference(node.getMessage());
+                            result.add(node);
+                            count++;
+                        } else {
+                            broker.messageExpired(createConnectionContext(),
+                                    node);
+                            destinationStatistics.getMessages().decrement();
                         }
-                    } finally {
-                        messages.release();
                     }
+                } finally {
+                    messages.release();
                 }
-                synchronized (pagedInMessages) {
-                    pagedInMessages.addAll(result);
-                }
-            } finally {
-                queueMsgConext.clear();
+            }
+            synchronized (pagedInMessages) {
+                pagedInMessages.addAll(result);
             }
         }
         return result;
     }
 
-    private  synchronized void doDispatch(List<MessageReference> list) throws Exception {
+    private synchronized void doDispatch(List<MessageReference> list) throws Exception {
         if (list != null && !list.isEmpty()) {
-            try {
-                for (int i = 0; i < list.size(); i++) {
-                    MessageReference node = list.get(i);
-                    queueMsgConext.setDestination(destination);
-                    queueMsgConext.setMessageReference(node);
-                    dispatchPolicy.dispatch(node, queueMsgConext, consumers);
-                }
-            } finally {
-                queueMsgConext.clear();
+            MessageEvaluationContext msgContext = new MessageEvaluationContext();
+            for (int i = 0; i < list.size(); i++) {
+                MessageReference node = list.get(i);
+                msgContext.setDestination(destination);
+                msgContext.setMessageReference(node);
+                dispatchPolicy.dispatch(node, msgContext, consumers);
             }
+
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java Thu Dec 27 03:06:50 2007
@@ -44,13 +44,6 @@
      *      org.apache.activemq.filter.MessageEvaluationContext, java.util.List)
      */
     public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers) throws Exception {
-
-        // Big synch here so that only 1 message gets dispatched at a time.
-        // Ensures
-        // Everyone sees the same order and that the consumer list is not used
-        // while
-        // it's being rotated.
-        synchronized (consumers) {
             int count = 0;
 
             Subscription firstMatchingConsumer = null;
@@ -79,7 +72,5 @@
                 }
             }
             return count > 0;
-        }
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Thu Dec 27 03:06:50 2007
@@ -25,8 +25,8 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -47,7 +47,6 @@
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.TransactionTemplate;
 import org.apache.commons.logging.Log;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java Thu Dec 27 03:06:50 2007
@@ -31,15 +31,15 @@
         answer.setDeleteAllMessagesOnStartup(true);
         AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
         adaptor.setArchiveDataLogs(true);
-        adaptor.setMaxFileLength(1024 * 64);
+        //adaptor.setMaxFileLength(1024 * 64);
         answer.setDataDirectoryFile(dataFileDir);
         answer.setPersistenceAdapter(adaptor);
         answer.addConnector(uri);
     }
     
     protected void setUp() throws Exception {
-        numberofProducers=6;
-        numberOfConsumers=6;
+        numberofProducers=2;
+        numberOfConsumers=10;
         this.consumerSleepDuration=0;
         super.setUp();
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java Thu Dec 27 03:06:50 2007
@@ -79,7 +79,7 @@
     public void onMessage(Message msg) {
         rate.increment();
         try {
-            if (!this.audit.isInOrder(msg.getJMSMessageID())) {
+            if (msg.getJMSDestination() instanceof Topic && !this.audit.isInOrder(msg.getJMSMessageID())) {
                 LOG.error("Message out of order!!" + msg);
             }
             if (this.audit.isDuplicate(msg)){

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java Thu Dec 27 03:06:50 2007
@@ -30,8 +30,8 @@
     protected void setUp() throws Exception {
         numberofProducers=6;
         numberOfConsumers=6;
-        samepleCount=100;
-        playloadSize = 1;
+        samepleCount=1000;
+        playloadSize = 1024;
         super.setUp();
     }
     

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java Thu Dec 27 03:06:50 2007
@@ -30,7 +30,9 @@
     }
     
     protected void setUp() throws Exception {
-        this.consumerSleepDuration=2000;
+        numberOfConsumers = 50;
+        numberofProducers = 50;
+        this.consumerSleepDuration=10;
         super.setUp();
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java?rev=607038&r1=607037&r2=607038&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java Thu Dec 27 03:06:50 2007
@@ -66,8 +66,10 @@
         private ActiveMQConnection connection;
         private AtomicBoolean stop = new AtomicBoolean(false);
         private Throwable error;
+        private String name;
 
-        public Worker() throws URISyntaxException, JMSException {
+        public Worker(String name) throws URISyntaxException, JMSException {
+            this.name=name;
             URI uri = new URI("failover://(mock://(" + tcpUri + "))");
             ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
             connection = (ActiveMQConnection)factory.createConnection();
@@ -115,7 +117,7 @@
 
         public void run() {
             try {
-                ActiveMQQueue queue = new ActiveMQQueue("FOO");
+                ActiveMQQueue queue = new ActiveMQQueue("FOO_"+name);
                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                 MessageConsumer consumer = session.createConsumer(queue);
                 MessageProducer producer = session.createProducer(queue);
@@ -213,7 +215,7 @@
 
         workers = new Worker[WORKER_COUNT];
         for (int i = 0; i < WORKER_COUNT; i++) {
-            workers[i] = new Worker();
+            workers[i] = new Worker(""+i);
             workers[i].start();
         }