You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/10/07 13:32:06 UTC

activemq git commit: AMQ-6459 - store based % usage needs to poll the store for the current value on each access

Repository: activemq
Updated Branches:
  refs/heads/master e91f5c806 -> 8493f6b4d


AMQ-6459 - store based % usage needs to poll the store for the current value on each access


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8493f6b4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8493f6b4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8493f6b4

Branch: refs/heads/master
Commit: 8493f6b4d79146509e554e2b640f486d79f40bbb
Parents: e91f5c8
Author: gtully <ga...@gmail.com>
Authored: Fri Oct 7 14:31:52 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Oct 7 14:31:52 2016 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/usage/TempUsage.java    |  13 ++
 .../org/apache/activemq/bugs/AMQ6459Test.java   | 213 +++++++++++++++++++
 2 files changed, 226 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8493f6b4/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java b/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java
index f068dbe..885afbe 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java
@@ -51,6 +51,19 @@ public class TempUsage extends PercentLimitUsage<TempUsage> {
     }
 
     @Override
+    public int getPercentUsage() {
+        if (store != null) {
+            usageLock.writeLock().lock();
+            try {
+                percentUsage = caclPercentUsage();
+            } finally {
+                usageLock.writeLock().unlock();
+            }
+        }
+        return super.getPercentUsage();
+    }
+
+    @Override
     protected long retrieveUsage() {
         if (store == null) {
             return 0;

http://git-wip-us.apache.org/repos/asf/activemq/blob/8493f6b4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6459Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6459Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6459Test.java
new file mode 100644
index 0000000..1734d6a
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6459Test.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Ensure the tempPercentUsage JMX attribute decreases after temp store usage is decreased
+ *
+ */
+public class AMQ6459Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ6459Test.class);
+
+    private static final String DESTINATION = "testQ1";
+    private static final int MESSAGES_TO_SEND = 4000;
+
+    private String TRANSPORT_URL = "tcp://0.0.0.0:0";
+
+    BrokerService broker;
+
+    @Before
+    public void createBroker() throws Exception {
+
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setUseJmx(true);
+
+        PolicyMap policyMap = new PolicyMap();
+        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
+        PolicyEntry pe = new PolicyEntry();
+        pe.setTopicPrefetch(50);
+        pe.setTopic(">");
+        entries.add(pe);
+        policyMap.setPolicyEntries(entries);
+        broker.setDestinationPolicy(policyMap);
+
+        SystemUsage systemUsage = broker.getSystemUsage();
+        systemUsage.getTempUsage().setLimit(50 * 1024 * 1024);
+
+
+        systemUsage.getMemoryUsage().setLimit(800 * 1024);
+
+        PListStoreImpl pListStore = (PListStoreImpl) broker.getTempDataStore();
+        pListStore.setJournalMaxFileLength(24 * 1024);
+        pListStore.setCleanupInterval(2000);
+
+        broker.addConnector(TRANSPORT_URL);
+
+        broker.start();
+        broker.waitUntilStarted();
+        TRANSPORT_URL = broker.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+    }
+
+    @Test
+    public void testTempPercentUsageDecreases() throws Exception {
+
+        //create a topic subscriber, but do not consume messages
+        MessageConsumer messageConsumer = createConsumer();
+
+
+        //send X identical text messages to the topic the (non-consuming) subscriber is attached to.
+        sendMessages(MESSAGES_TO_SEND);
+
+
+        final BrokerViewMBean brokerView = getBrokerView(broker);
+
+        LOG.info("tempPercentageUsage is " + brokerView.getTempPercentUsage());
+
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("tempPercentageUsage now " + brokerView.getTempPercentUsage());
+                return brokerView.getTempPercentUsage() > 50;
+            }
+        });
+
+        final int tempPercentUsageWithConsumer = brokerView.getTempPercentUsage();
+
+        //ensure the tempPercentageUsage is at a high number
+        assertTrue(" tempPercentageUsage ", (50 < tempPercentUsageWithConsumer));
+
+        //close the consumer, releasing the temp storage
+        messageConsumer.close();
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("tempPercentageUsage now (after consumer closed) " + brokerView.getTempPercentUsage());
+                return tempPercentUsageWithConsumer > brokerView.getTempPercentUsage();
+            }
+        });
+
+        assertTrue("tempPercentageUsage should be less after consumer has closed",
+                tempPercentUsageWithConsumer > brokerView.getTempPercentUsage());
+
+
+    }
+
+    private MessageConsumer createConsumer() throws Exception {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", TRANSPORT_URL);
+        Connection connection = connectionFactory.createConnection();
+
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createTopic(DESTINATION);
+
+        MessageConsumer messageConsumer = session.createConsumer(destination);
+        return messageConsumer;
+    }
+
+
+    /*
+     Send X copies of one text message, committing after each send; returns the number sent (a failure is only logged).
+     */
+    private Long sendMessages(int messageCount) throws Exception {
+
+        long numberOfMessageSent = 0;
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", TRANSPORT_URL);
+
+
+        Connection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+
+        final String blob = new String(new byte[4 * 1024]);
+        try {
+
+            Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageProducer jmsProducer = producerSession.createProducer(producerSession.createTopic(DESTINATION));
+
+            Message sendMessage = producerSession.createTextMessage(blob);
+
+            for (int i = 0; i < messageCount; i++) {
+
+                jmsProducer.send(sendMessage);
+                producerSession.commit();
+                numberOfMessageSent++;
+
+            }
+
+            LOG.info(" Finished after producing : " + numberOfMessageSent);
+            return numberOfMessageSent;
+
+        } catch (Exception ex) {
+            LOG.info("Exception received producing ", ex);
+            LOG.info("finishing after exception :" + numberOfMessageSent);
+            return numberOfMessageSent;
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+
+    }
+
+
+    private BrokerViewMBean getBrokerView(BrokerService broker) throws Exception {
+        BrokerViewMBean brokerViewMBean = broker.getAdminView();
+        return brokerViewMBean;
+
+    }
+
+}
+