You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/01/17 17:43:27 UTC

svn commit: r1059981 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/usage/StoreUsage.java test/java/org/apache/activemq/usage/StoreUsageTest.java test/java/org/apache/activemq/util/ProducerThread.java

Author: dejanb
Date: Mon Jan 17 16:43:26 2011
New Revision: 1059981

URL: http://svn.apache.org/viewvc?rev=1059981&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3143 - changing store usage via JMX

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/StoreUsageTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java?rev=1059981&r1=1059980&r2=1059981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java Mon Jan 17 16:43:26 2011
@@ -66,4 +66,15 @@ public class StoreUsage extends Usage<St
             return super.getPercentUsage();
         }
     }
+
+    @Override
+    public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException {
+        if (parent != null) {
+            if (parent.waitForSpace(timeout, highWaterMark)) {
+                return true;
+            }
+        }
+
+        return super.waitForSpace(timeout, highWaterMark);
+    }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/StoreUsageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/StoreUsageTest.java?rev=1059981&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/StoreUsageTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/StoreUsageTest.java Mon Jan 17 16:43:26 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.ProducerThread;
+import org.apache.activemq.util.Wait;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Session;
+
+public class StoreUsageTest extends EmbeddedBrokerTestSupport {
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        broker.getSystemUsage().getStoreUsage().setLimit(10 * 1024);
+        broker.deleteAllMessages();
+        return broker;
+    }
+
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    public void testJmx() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        Connection conn = factory.createConnection();
+        conn.start();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination dest = sess.createQueue(this.getClass().getName());
+        final ProducerThread producer = new ProducerThread(sess, dest);
+        producer.start();
+
+        // wait for the producer to block
+        Thread.sleep(5000);
+
+        broker.getAdminView().setStoreLimit(1024 * 1024);
+
+        Thread.sleep(5000);
+
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return producer.getSentCount() == producer.getMessageCount();
+            }
+        }, 5000);
+
+        assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
+
+    }
+}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java?rev=1059981&r1=1059980&r2=1059981&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/ProducerThread.java Mon Jan 17 16:43:26 2011
@@ -32,6 +32,7 @@ public class ProducerThread extends Thre
     Destination dest;
     Session sess;
     int sleep = 0;
+    int sentCount = 0;
 
     public ProducerThread(Session sess, Destination dest) {
         this.dest = dest;
@@ -42,9 +43,9 @@ public class ProducerThread extends Thre
         MessageProducer producer = null;
         try {
             producer = sess.createProducer(dest);
-            for (int i = 0; i < messageCount; i++) {
-                producer.send(sess.createTextMessage("test message: " + i));
-                LOG.info("Sent 'test message: " + i + "'");
+            for (sentCount = 0; sentCount < messageCount; sentCount++) {
+                producer.send(sess.createTextMessage("test message: " + sentCount));
+                LOG.info("Sent 'test message: " + sentCount + "'");
                 if (sleep > 0) {
                     Thread.sleep(sleep);
                 }
@@ -70,4 +71,12 @@ public class ProducerThread extends Thre
     public void setSleep(int sleep) {
         this.sleep = sleep;
     }
+
+    public int getMessageCount() {
+        return messageCount;
+    }
+
+    public int getSentCount() {
+        return sentCount;
+    }
 }