You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/10/23 11:37:47 UTC

svn commit: r828975 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/bugs/ test/resources/

Author: gtully
Date: Fri Oct 23 09:37:47 2009
New Revision: 828975

URL: http://svn.apache.org/viewvc?rev=828975&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2314 - spooled non persistent messages for a topic sub remain after disconnect, pendin messages are now removed so that the temp store can be reclaimed

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
    activemq/trunk/activemq-core/src/test/resources/log4j.properties

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=828975&r1=828974&r2=828975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Oct 23 09:37:47 2009
@@ -82,12 +82,9 @@
 
     public void add(MessageReference node) throws Exception {
         enqueueCounter.incrementAndGet();
-        node.incrementReferenceCount();
         if (!isFull() && matched.isEmpty()  && !isSlave()) {
-            // if maximumPendingMessages is set we will only discard messages
-            // which
-            // have not been dispatched (i.e. we allow the prefetch buffer to be
-            // filled)
+            // if maximumPendingMessages is set we will only discard messages which
+            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
             dispatch(node);
             slowConsumer=false;
         } else {
@@ -402,6 +399,7 @@
 
     private void dispatch(final MessageReference node) throws IOException {
         Message message = (Message)node;
+        node.incrementReferenceCount();
         // Make sure we can dispatch a message.
         MessageDispatch md = new MessageDispatch();
         md.setMessage(message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=828975&r1=828974&r2=828975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Fri Oct 23 09:37:47 2009
@@ -138,8 +138,17 @@
             node.decrementReferenceCount();
         }
         memoryList.clear();
+        destroyDiskList();
+    }
+
+    private void destroyDiskList() {
         if (!isDiskListEmpty()) {
-            getDiskList().clear();
+            Iterator<MessageReference> iterator = diskList.iterator();
+            while (iterator.hasNext()) {
+                iterator.next();
+                iterator.remove();
+            }
+            diskList.clear();
         }
     }
 
@@ -384,7 +393,7 @@
                 diskList = store.getListContainer(name, "TopicSubscription", true);
                 diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
             } catch (IOException e) {
-                LOG.error("Caught an IO Exception getting the DiskList ",e);
+                LOG.error("Caught an IO Exception getting the DiskList " + name, e);
                 throw new RuntimeException(e);
             }
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=828975&r1=828974&r2=828975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Fri Oct 23 09:37:47 2009
@@ -499,6 +499,9 @@
             }
         }
         assertTrue("No timeout waiting for senders/receivers to complete", System.currentTimeMillis() < expiry);
+        if (!exceptions.isEmpty()) {
+            exceptions.get(0).printStackTrace();
+        }
         assertTrue("No exceptions", exceptions.isEmpty());
     }
 

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java?rev=828975&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java Fri Oct 23 09:37:47 2009
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class AMQ2314Test extends CombinationTestSupport {
+
+    public boolean consumeAll = false;
+    public int deliveryMode = DeliveryMode.NON_PERSISTENT;
+    
+    private static final Log LOG = LogFactory.getLog(AMQ2314Test.class);
+    private static final int MESSAGES_COUNT = 30000;
+    private static byte[]  buf = new byte[1024];
+    private BrokerService broker;
+    
+    protected long messageReceiveTimeout = 500L;
+
+    Destination destination = new ActiveMQTopic("FooTwo");
+    
+    public void testRemoveSlowSubscriberWhacksTempStore() throws Exception {
+        runProducerWithHungConsumer();
+    }
+    
+    public void testMemoryUsageReleasedOnAllConsumed() throws Exception {
+        consumeAll = true;
+        runProducerWithHungConsumer();
+        // do it again to ensure memory limits are decreased
+        runProducerWithHungConsumer();
+    }
+    
+    
+    public void runProducerWithHungConsumer() throws Exception {
+    
+        final CountDownLatch consumerContinue = new CountDownLatch(1);
+        final CountDownLatch consumerReady = new CountDownLatch(1);
+        
+        final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage();
+        
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+        factory.setAlwaysSyncSend(true);
+        
+        // ensure messages are spooled to disk for this consumer
+        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+        prefetch.setTopicPrefetch(500);
+        factory.setPrefetchPolicy(prefetch);
+        final Connection connection = factory.createConnection();
+        connection.start();
+
+        Thread producingThread = new Thread("Producing thread") {
+            public void run() {
+                try {
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = session.createProducer(destination);
+                    producer.setDeliveryMode(deliveryMode);
+                    for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
+                        Message message = session.createTextMessage(new String(buf) + idx);
+                        producer.send(message);
+                    }
+                    producer.close();
+                    session.close();
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+        
+        Thread consumingThread = new Thread("Consuming thread") {
+            public void run() {
+                try {
+                    int count = 0;
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageConsumer consumer = session.createConsumer(destination);
+                    
+                    while (consumer.receive(messageReceiveTimeout) == null) {
+                        consumerReady.countDown();
+                    }
+                    count++;
+                    LOG.info("Received one... waiting");  
+                    consumerContinue.await();
+                    if (consumeAll) {
+                        LOG.info("Consuming the rest of the messages...");
+                        while (consumer.receive(messageReceiveTimeout) != null) {
+                            count++;
+                        }
+                    }
+                    LOG.info("consumer session closing: consumed count: " + count);
+                    session.close();
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+        consumingThread.start();
+        consumerReady.await();
+        
+        producingThread.start();
+        producingThread.join();
+        
+        final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage();
+        LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription);
+        assertTrue("some temp store has been used", tempUsageBySubscription != origTempUsage);
+        consumerContinue.countDown();
+        consumingThread.join();
+        connection.close();
+       
+        LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
+                + broker.getSystemUsage().getTempUsage().getUsage());
+        
+        assertTrue("temp usage decreased with removed sub", Wait.waitFor(new Wait.Condition(){
+            public boolean isSatisified() throws Exception {
+                return broker.getSystemUsage().getTempUsage().getUsage()  < tempUsageBySubscription;
+            }
+        }));
+    }
+    
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        broker.addConnector("tcp://localhost:61616").setName("Default");
+        broker.start();
+    }
+    
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+    
+    
+    public static Test suite() {
+        return suite(AMQ2314Test.class);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=828975&r1=828974&r2=828975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-core/src/test/resources/log4j.properties Fri Oct 23 09:37:47 2009
@@ -18,9 +18,9 @@
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=DEBUG, out, stdout
+log4j.rootLogger=INFO, out, stdout
 
-log4j.logger.org.apache.activemq=DEBUG
+#log4j.logger.org.apache.activemq=DEBUG
 
 # CONSOLE appender not used by default
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender