You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/15 16:21:23 UTC

svn commit: r1503263 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-unit-tests/src/test/java/org/apache/activemq/bugs/

Author: tabish
Date: Mon Jul 15 14:21:23 2013
New Revision: 1503263

URL: http://svn.apache.org/r1503263
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4595

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java   (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1503263&r1=1503262&r2=1503263&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Jul 15 14:21:23 2013
@@ -108,8 +108,7 @@ public class Queue extends BaseDestinati
     protected PendingMessageCursor messages;
     private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
     private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
-    // Messages that are paged in but have not yet been targeted at a
-    // subscription
+    // Messages that are paged in but have not yet been targeted at a subscription
     private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
     protected PendingList pagedInPendingDispatch = new OrderedPendingList();
     protected PendingList redeliveredWaitingDispatch = new OrderedPendingList();
@@ -177,7 +176,6 @@ public class Queue extends BaseDestinati
             }
             return returnValue;
         }
-
     }
 
     DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
@@ -246,7 +244,7 @@ public class Queue extends BaseDestinati
         consumersLock.readLock().lock();
         try {
             return new ArrayList<Subscription>(consumers);
-        }finally {
+        } finally {
             consumersLock.readLock().unlock();
         }
     }
@@ -447,10 +445,9 @@ public class Queue extends BaseDestinati
             sub.add(context, this);
 
             // needs to be synchronized - so no contention with dispatching
-           // consumersLock.
+            // consumersLock.
             consumersLock.writeLock().lock();
             try {
-
                 // set a flag if this is a first consumer
                 if (consumers.size() == 0) {
                     firstConsumer = true;
@@ -474,7 +471,7 @@ public class Queue extends BaseDestinati
                     }
                     dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                 }
-            }finally {
+            } finally {
                 consumersLock.writeLock().unlock();
             }
 
@@ -488,7 +485,7 @@ public class Queue extends BaseDestinati
             if (!this.optimizedDispatch) {
                 wakeup();
             }
-        }finally {
+        } finally {
             pagedInPendingDispatchLock.writeLock().unlock();
         }
         if (this.optimizedDispatch) {
@@ -593,13 +590,13 @@ public class Queue extends BaseDestinati
                 if (!redeliveredWaitingDispatch.isEmpty()) {
                     doDispatch(new OrderedPendingList());
                 }
-            }finally {
+            } finally {
                 consumersLock.writeLock().unlock();
             }
             if (!this.optimizedDispatch) {
                 wakeup();
             }
-        }finally {
+        } finally {
             pagedInPendingDispatchLock.writeLock().unlock();
         }
         if (this.optimizedDispatch) {
@@ -639,13 +636,12 @@ public class Queue extends BaseDestinati
             if (isProducerFlowControl() && context.isProducerFlowControl()) {
                 if (warnOnProducerFlowControl) {
                     warnOnProducerFlowControl = false;
-                    LOG
-                            .info("Usage Manager Memory Limit ("
-                                    + memoryUsage.getLimit()
-                                    + ") reached on "
-                                    + getActiveMQDestination().getQualifiedName()
-                                    + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
-                                    + " See http://activemq.apache.org/producer-flow-control.html for more info");
+                    LOG.info("Usage Manager Memory Limit ("
+                             + memoryUsage.getLimit()
+                             + ") reached on "
+                             + getActiveMQDestination().getQualifiedName()
+                             + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+                             + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
 
                 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
@@ -979,9 +975,9 @@ public class Queue extends BaseDestinati
     public String toString() {
         int size = 0;
         messagesLock.readLock().lock();
-        try{
+        try {
             size = messages.size();
-        }finally {
+        } finally {
             messagesLock.readLock().unlock();
         }
         return destination.getQualifiedName() + ", subscriptions=" + consumers.size()
@@ -1120,7 +1116,6 @@ public class Queue extends BaseDestinati
         return allConsumersExclusiveByDefault;
     }
 
-
     // Implementation methods
     // -------------------------------------------------------------------------
     private QueueMessageReference createMessageReference(Message message) {
@@ -1209,7 +1204,6 @@ public class Queue extends BaseDestinati
                     messagesLock.writeLock().unlock();
                 }
             }
-
         } catch (Exception e) {
             LOG.error("Problem retrieving message for browse", e);
         }
@@ -1230,12 +1224,12 @@ public class Queue extends BaseDestinati
     public QueueMessageReference getMessage(String id) {
         MessageId msgId = new MessageId(id);
         pagedInMessagesLock.readLock().lock();
-        try{
+        try {
             QueueMessageReference ref = this.pagedInMessages.get(msgId);
             if (ref != null) {
                 return ref;
             }
-        }finally {
+        } finally {
             pagedInMessagesLock.readLock().unlock();
         }
         messagesLock.readLock().lock();
@@ -1282,6 +1276,7 @@ public class Queue extends BaseDestinati
             // don't spin/hang if stats are out and there is nothing left in the
             // store
         } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
+
         if (this.destinationStatistics.getMessages().getCount() > 0) {
             LOG.warn(getActiveMQDestination().getQualifiedName()
                     + " after purge complete, message count stats report: "
@@ -1346,9 +1341,9 @@ public class Queue extends BaseDestinati
         do {
             doPageIn(true);
             pagedInMessagesLock.readLock().lock();
-            try{
+            try {
                 set.addAll(pagedInMessages.values());
-            }finally {
+            } finally {
                 pagedInMessagesLock.readLock().unlock();
             }
             List<MessageReference> list = new ArrayList<MessageReference>(set);
@@ -1415,7 +1410,7 @@ public class Queue extends BaseDestinati
             pagedInMessagesLock.readLock().lock();
             try {
                 set.addAll(pagedInMessages.values());
-            }finally {
+            } finally {
                 pagedInMessagesLock.readLock().unlock();
             }
             List<MessageReference> list = new ArrayList<MessageReference>(set);
@@ -1505,9 +1500,9 @@ public class Queue extends BaseDestinati
         do {
             doPageIn(true);
             pagedInMessagesLock.readLock().lock();
-            try{
+            try {
                 set.addAll(pagedInMessages.values());
-            }finally {
+            } finally {
                 pagedInMessagesLock.readLock().unlock();
             }
             List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
@@ -1534,9 +1529,9 @@ public class Queue extends BaseDestinati
         do {
             doPageIn(true);
             pagedInMessagesLock.readLock().lock();
-            try{
+            try {
                 set.addAll(pagedInMessages.values());
-            }finally {
+            } finally {
                 pagedInMessagesLock.readLock().unlock();
             }
             List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
@@ -1615,7 +1610,6 @@ public class Queue extends BaseDestinati
                 }
             }
 
-
             messagesLock.readLock().lock();
             try{
                 pageInMoreMessages |= !messages.isEmpty();
@@ -1640,7 +1634,6 @@ public class Queue extends BaseDestinati
             if (pageInMoreMessages || hasBrowsers || !redeliveredWaitingDispatch.isEmpty()) {
                 try {
                     pageInMessages(hasBrowsers);
-
                 } catch (Throwable e) {
                     LOG.error("Failed to page in more queue messages ", e);
                 }
@@ -1670,7 +1663,7 @@ public class Queue extends BaseDestinati
                         }
                         boolean added = false;
                         for (QueueMessageReference node : alreadyDispatchedMessages) {
-                            if (!node.isAcked() && !browser.getPending().getMessageAudit().isDuplicate(node.getMessageId())) {
+                            if (!node.isAcked() && !browser.isDuplicate(node.getMessageId())) {
                                 msgContext.setMessageReference(node);
                                 if (browser.matches(node, msgContext)) {
                                     browser.add(node);
@@ -1750,7 +1743,6 @@ public class Queue extends BaseDestinati
         } finally {
             pagedInPendingDispatchLock.writeLock().unlock();
         }
-
     }
 
     protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
@@ -1793,9 +1785,9 @@ public class Queue extends BaseDestinati
         if (ack.isPoisonAck() || (sub != null && sub.getConsumerInfo().isNetworkSubscription())) {
             // message gone to DLQ, is ok to allow redelivery
             messagesLock.writeLock().lock();
-            try{
+            try {
                 messages.rollback(reference.getMessageId());
-            }finally {
+            } finally {
                 messagesLock.writeLock().unlock();
             }
         }
@@ -1841,9 +1833,9 @@ public class Queue extends BaseDestinati
 
     final void sendMessage(final Message msg) throws Exception {
         messagesLock.writeLock().lock();
-        try{
+        try {
             messages.addMessageLast(msg);
-        }finally {
+        } finally {
             messagesLock.writeLock().unlock();
         }
     }
@@ -2044,7 +2036,7 @@ public class Queue extends BaseDestinati
                 return list;
             }
             consumers = new ArrayList<Subscription>(this.consumers);
-        }finally {
+        } finally {
             consumersLock.writeLock().unlock();
         }
 
@@ -2104,7 +2096,7 @@ public class Queue extends BaseDestinati
                         addToConsumerList(target);
                         consumers = new ArrayList<Subscription>(this.consumers);
                     }
-                }finally {
+                } finally {
                     consumersLock.writeLock().unlock();
                 }
             }
@@ -2149,7 +2141,6 @@ public class Queue extends BaseDestinati
         }
 
         return result;
-
     }
 
     protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
@@ -2187,13 +2178,13 @@ public class Queue extends BaseDestinati
         int total = 0;
         boolean zeroPrefetch = false;
         consumersLock.readLock().lock();
-        try{
+        try {
             for (Subscription s : consumers) {
                 zeroPrefetch |= s.getPrefetchSize() == 0;
                 int countBeforeFull = s.countBeforeFull();
                 total += countBeforeFull;
             }
-        }finally {
+        } finally {
             consumersLock.readLock().unlock();
         }
         if (total == 0 && zeroPrefetch) {
@@ -2306,7 +2297,7 @@ public class Queue extends BaseDestinati
                     break;
                 }
             }
-        }finally {
+        } finally {
             consumersLock.readLock().unlock();
         }
         return sub;
@@ -2346,7 +2337,7 @@ public class Queue extends BaseDestinati
                         }
                     }
                 }
-            }finally {
+            } finally {
                 consumersLock.readLock().unlock();
             }
         }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=1503263&r1=1503262&r2=1503263&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Mon Jul 15 14:21:23 2013
@@ -18,7 +18,9 @@ package org.apache.activemq.broker.regio
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import javax.jms.JMSException;
 
@@ -26,15 +28,22 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.usage.SystemUsage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class QueueBrowserSubscription extends QueueSubscription {
 
+    protected static final Logger LOG = LoggerFactory.getLogger(QueueBrowserSubscription.class);
+
     int queueRefs;
     boolean browseDone;
     boolean destinationsAdded;
 
+    private final Map<MessageId, Object> audit = new HashMap<MessageId, Object>();
+
     public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
         super(broker, usageManager, context, info);
     }
@@ -56,6 +65,16 @@ public class QueueBrowserSubscription ex
         checkDone();
     }
 
+    /**
+     * Records {@code messageId} in this browser's audit map and reports whether it was already there.
+     *
+     * @param messageId id of the message being considered for this browser
+     * @return {@code false} the first time an id is offered, {@code true} on every later call
+     */
+    public boolean isDuplicate(MessageId messageId) {
+        return audit.put(messageId, Boolean.TRUE) != null;
+    }
+
     private void checkDone() throws Exception {
         if (!browseDone && queueRefs == 0 && destinationsAdded) {
             browseDone = true;

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java?rev=1503263&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java Mon Jul 15 14:21:23 2013
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.Date;
+import java.util.Enumeration;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4595Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4595Test.class);
+
+    private BrokerService broker;
+    private URI connectUri;
+    private ActiveMQConnectionFactory factory;
+
+    /** Starts a clean broker on the vm transport; no destination policy is configured. */
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        TransportConnector connector = broker.addConnector("vm://localhost");
+        broker.deleteAllMessages();
+        broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
+        broker.start();
+        broker.waitUntilStarted();
+        connectUri = connector.getConnectUri();
+        factory = new ActiveMQConnectionFactory(connectUri);
+    }
+
+    /** Stops the broker; the null guard covers a startBroker() that failed before creating it. */
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test(timeout=120000)
+    public void testBrowsingSmallBatch() throws JMSException {
+        doTestBrowsing(100);
+    }
+
+    @Test(timeout=160000)
+    public void testBrowsingMediumBatch() throws JMSException {
+        doTestBrowsing(1000);
+    }
+
+    @Test(timeout=300000)
+    public void testBrowsingLargeBatch() throws JMSException {
+        doTestBrowsing(10000);
+    }
+
+    /** Sends {@code messageToSend} persistent messages to a queue, browses it, and asserts all were seen. */
+    private void doTestBrowsing(int messageToSend) throws JMSException {
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+
+        // Send the messages to the Queue.
+        ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
+        try {
+            producerConnection.setUseAsyncSend(true);
+            producerConnection.start();
+            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = producerSession.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            for (int i = 1; i <= messageToSend; i++) {
+                String msgStr = provideMessageText(i, 8192);
+                producer.send(producerSession.createTextMessage(msgStr));
+                if ((i % 1000) == 0) {
+                    LOG.info("P&C: {}", msgStr.substring(0, 100));
+                }
+            }
+        } finally {
+            producerConnection.close();
+        }
+
+        // Browse the queue.
+        int browsed = 0;
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+            QueueBrowser browser = session.createBrowser(queue);
+            Enumeration<?> enumeration = browser.getEnumeration();
+            while (enumeration.hasMoreElements()) {
+                TextMessage m = (TextMessage) enumeration.nextElement();
+                browsed++;
+                if ((browsed % 1000) == 0) {
+                    LOG.info("B[{}]: {}", browsed, m.getText().substring(0, 100));
+                }
+            }
+            browser.close();
+            session.close();
+        } finally {
+            // Runs even when browsing throws, so the vm connection is never left open.
+            connection.close();
+        }
+
+        // The number of messages browsed should be equal to the number of messages sent.
+        assertEquals(messageToSend, browsed);
+    }
+
+    /** Returns a body of exactly {@code messageSize} chars: a numbered, dated header cut or space-padded. */
+    public String provideMessageText(int messageNumber, int messageSize) {
+        StringBuilder buf = new StringBuilder();
+        buf.append("Message: ");
+        if (messageNumber > 0) {
+            buf.append(messageNumber);
+        }
+        buf.append(" sent at: ").append(new Date());
+
+        if (buf.length() > messageSize) {
+            return buf.substring(0, messageSize);
+        }
+        for (int i = buf.length(); i < messageSize; i++) {
+            buf.append(' ');
+        }
+        return buf.toString();
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
------------------------------------------------------------------------------
    svn:eol-style = native