You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/03/05 15:45:52 UTC

[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5640 - fix by ensuring parent stat is updated on dest init, thanks for the nice test Torsten

https://issues.apache.org/jira/browse/AMQ-5640 -  fix by ensuring parent stat is updated on dest init, thanks for the nice test Torsten


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ab28b771
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ab28b771
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ab28b771

Branch: refs/heads/master
Commit: ab28b771e367d8fa4f9591fb88cb9c29ec41c25a
Parents: 42b606d
Author: gtully <ga...@gmail.com>
Authored: Thu Mar 5 14:46:59 2015 +0000
Committer: gtully <ga...@gmail.com>
Committed: Thu Mar 5 14:46:59 2015 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |   2 +-
 .../activemq/jmx/TotalMessageCountTest.java     | 186 +++++++++++++++++++
 2 files changed, 187 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ab28b771/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 67b9119..c3de541 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -396,7 +396,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                    listener.processExpired();
                } while (!listener.done());
             } else {
-                destinationStatistics.getMessages().setCount(messageCount);
+                destinationStatistics.getMessages().add(messageCount);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ab28b771/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/TotalMessageCountTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/TotalMessageCountTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/TotalMessageCountTest.java
new file mode 100644
index 0000000..796237b
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/TotalMessageCountTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jmx;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author <a href="http://tmielke.blogspot.com">Torsten Mielke</a>
+ */
+public class TotalMessageCountTest {
+    private static final Logger LOG = LoggerFactory.getLogger(TotalMessageCountTest.class);
+
+    private BrokerService brokerService;
+    private final String TESTQUEUE = "testQueue";
+    private ActiveMQConnectionFactory connectionFactory;
+    private final String BROKER_ADDRESS = "tcp://localhost:0";
+    private final ActiveMQQueue queue = new ActiveMQQueue(TESTQUEUE);
+
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception {
+        startBroker(true); // true: broker is told to delete all messages on startup
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop(); // stops whichever broker startBroker() created last
+        brokerService.waitUntilStopped();
+    }
+
+
+    @Test
+    public void testNegativeTotalMessageCount() throws Exception {
+
+        LOG.info("Running test testNegativeTotalMessageCount()");
+        // send one msg first
+        sendMessage();
+
+        // restart the broker without deleting messages on startup
+        restartBroker();
+
+        // receive one msg (the returned message is not checked here)
+        receiveMessage();
+
+        // assert TotalMessageCount JMX property >= 0
+        long totalMessageCount = getTotalMessageCount();
+        if (totalMessageCount < 0 ) {
+            LOG.error("Unexpected negative TotalMessageCount: " + totalMessageCount);
+        } else {
+            LOG.info("TotalMessageCount: " +  totalMessageCount);
+        }
+
+        assertTrue("Non negative TotalMessageCount " + totalMessageCount, totalMessageCount > -1);
+        LOG.info("Test testNegativeTotalMessageCount() completed.");
+    }
+
+
+    /**
+     * Sends one persistent TextMessage to the TESTQUEUE.
+     * Initializes a new JMS connection, session and producer and closes the
+     * connection after use.
+     * @throws JMSException on failure
+     */
+    private void sendMessage() throws JMSException {
+        Connection conn = connectionFactory.createConnection();
+        try {
+            conn.start();
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination queue = session.createQueue(TESTQUEUE);
+            TextMessage msg = session.createTextMessage("This is a message.");
+            MessageProducer producer = session.createProducer(queue);
+            producer.send(queue, msg);
+            LOG.info("Message sent to " + TESTQUEUE);
+        } finally {
+            conn.close();
+        }
+    }
+
+
+    /**
+     * Receives a single JMS message from the broker.
+     * Initializes a new JMS connection, session and consumer and closes them
+     * after use.
+     * @return the received message, or null if none was received within 10 seconds
+     * @throws JMSException on failure
+     */
+    private Message receiveMessage() throws JMSException {
+        Connection conn = connectionFactory.createConnection();
+        Message msg = null;
+        try {
+            conn.start();
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination queue = session.createQueue(TESTQUEUE);
+            MessageConsumer consumer = session.createConsumer(queue);
+            msg = consumer.receive(TimeUnit.SECONDS.toMillis(10));
+            if (msg != null) {
+                LOG.info("Message received from " + TESTQUEUE);
+            }
+            consumer.close();
+            session.close();
+        } finally {
+            conn.close();
+        }
+        return msg;
+    }
+
+    /**
+     * Restarts the broker without deleting messages on startup.
+     *
+     * @return always true; a failed restart surfaces as an exception instead
+     * @throws Exception if restart failed.
+     */
+    private boolean restartBroker() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+        return startBroker(false);
+    }
+
+
+    /**
+     * Starts a new persistent, JMX-enabled broker and a connection factory for it.
+     * @param deleteMessagesOnStartup passed to setDeleteAllMessagesOnStartup()
+     * @return always true; a failed start surfaces as an exception instead
+     * @throws Exception if start failed.
+     */
+    private boolean startBroker(boolean deleteMessagesOnStartup) throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(true);
+        brokerService.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup);
+        brokerService.setUseJmx(true);
+        connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
+        brokerService.start();
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+        LOG.info("Broker started.");
+        return true;
+    }
+    /**
+     * Reads the broker's TotalMessageCount property on the JMX Broker MBean.
+     * @return the total message count for the broker
+     * @throws Exception if the JMX operation fails
+     */
+    private long getTotalMessageCount() throws Exception {
+
+        ObjectName brokerViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
+        BrokerViewMBean brokerMBean = (BrokerViewMBean)
+                brokerService.getManagementContext().newProxyInstance(brokerViewMBeanName, BrokerViewMBean.class, true);
+        LOG.debug("Broker TotalMessageCount: " + brokerMBean.getTotalMessageCount()); // first read, for the log only
+        return brokerMBean.getTotalMessageCount(); // second, separate read is the value returned
+    }
+}
\ No newline at end of file