You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/09/17 16:58:40 UTC

git commit: add info log message for a queue purge event

Repository: activemq
Updated Branches:
  refs/heads/trunk a0c42a61d -> f19add11d


add info log message for a queue purge event


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f19add11
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f19add11
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f19add11

Branch: refs/heads/trunk
Commit: f19add11deb07ce31c5f9bfa5aeccf6960b22f12
Parents: a0c42a6
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 17 15:57:55 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Sep 17 15:57:55 2014 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  5 ++-
 .../activemq/broker/region/QueuePurgeTest.java  | 36 +++++++++++++++++++-
 2 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f19add11/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index c7f768e..c4d49bd 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1213,6 +1213,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
     public void purge() throws Exception {
         ConnectionContext c = createConnectionContext();
         List<MessageReference> list = null;
+        long originalMessageCount = this.destinationStatistics.getMessages().getCount();
         do {
             doPageIn(true, false);  // signal no expiry processing needed.
             pagedInMessagesLock.readLock().lock();
@@ -1234,7 +1235,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
 
         if (this.destinationStatistics.getMessages().getCount() > 0) {
-            LOG.warn("{} after purge complete, message count stats report: {}", getActiveMQDestination().getQualifiedName(), this.destinationStatistics.getMessages().getCount());
+            LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount());
+        } else {
+            LOG.info("{} purged of {} messages", getActiveMQDestination().getQualifiedName(), originalMessageCount);
         }
         gc();
         this.destinationStatistics.getMessages().setCount(0);

http://git-wip-us.apache.org/repos/asf/activemq/blob/f19add11/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
index a03e7bc..c017e87 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -38,6 +39,10 @@ import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,10 +94,39 @@ public class QueuePurgeTest extends CombinationTestSupport {
         createProducerAndSendMessages(NUM_TO_SEND);
         QueueViewMBean proxy = getProxyToQueueViewMBean();
         LOG.info("purging..");
-        proxy.purge();
+
+        org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(org.apache.activemq.broker.region.Queue.class);
+        final AtomicBoolean gotPurgeLogMessage = new AtomicBoolean(false);
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getMessage() instanceof String) {
+                    String message = (String) event.getMessage();
+                    if (message.contains("purged of " + NUM_TO_SEND +" messages")) {
+                        LOG.info("Received a log message: {} ", event.getMessage());
+                        gotPurgeLogMessage.set(true);
+                    }
+                }
+            }
+        };
+
+        Level level = log4jLogger.getLevel();
+        log4jLogger.setLevel(Level.INFO);
+        log4jLogger.addAppender(appender);
+        try {
+
+            proxy.purge();
+
+        } finally {
+            log4jLogger.setLevel(level);
+            log4jLogger.removeAppender(appender);
+        }
+
         assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
                 proxy.getQueueSize());
         assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled());
+        assertTrue("got expected info purge log message", gotPurgeLogMessage.get());
     }
 
     public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {