You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/09/17 16:58:40 UTC
git commit: add info log message for a queue purge event
Repository: activemq
Updated Branches:
refs/heads/trunk a0c42a61d -> f19add11d
add info log message for a queue purge event
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f19add11
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f19add11
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f19add11
Branch: refs/heads/trunk
Commit: f19add11deb07ce31c5f9bfa5aeccf6960b22f12
Parents: a0c42a6
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 17 15:57:55 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Sep 17 15:57:55 2014 +0100
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 5 ++-
.../activemq/broker/region/QueuePurgeTest.java | 36 +++++++++++++++++++-
2 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/f19add11/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index c7f768e..c4d49bd 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1213,6 +1213,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
public void purge() throws Exception {
ConnectionContext c = createConnectionContext();
List<MessageReference> list = null;
+ long originalMessageCount = this.destinationStatistics.getMessages().getCount();
do {
doPageIn(true, false); // signal no expiry processing needed.
pagedInMessagesLock.readLock().lock();
@@ -1234,7 +1235,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
if (this.destinationStatistics.getMessages().getCount() > 0) {
- LOG.warn("{} after purge complete, message count stats report: {}", getActiveMQDestination().getQualifiedName(), this.destinationStatistics.getMessages().getCount());
+ LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount());
+ } else {
+ LOG.info("{} purged of {} messages", getActiveMQDestination().getQualifiedName(), originalMessageCount);
}
gc();
this.destinationStatistics.getMessages().setCount(0);
http://git-wip-us.apache.org/repos/asf/activemq/blob/f19add11/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
index a03e7bc..c017e87 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -38,6 +39,10 @@ import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,10 +94,39 @@ public class QueuePurgeTest extends CombinationTestSupport {
createProducerAndSendMessages(NUM_TO_SEND);
QueueViewMBean proxy = getProxyToQueueViewMBean();
LOG.info("purging..");
- proxy.purge();
+
+ org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(org.apache.activemq.broker.region.Queue.class);
+ final AtomicBoolean gotPurgeLogMessage = new AtomicBoolean(false);
+
+ Appender appender = new DefaultTestAppender() {
+ @Override
+ public void doAppend(LoggingEvent event) {
+ if (event.getMessage() instanceof String) {
+ String message = (String) event.getMessage();
+ if (message.contains("purged of " + NUM_TO_SEND +" messages")) {
+ LOG.info("Received a log message: {} ", event.getMessage());
+ gotPurgeLogMessage.set(true);
+ }
+ }
+ }
+ };
+
+ Level level = log4jLogger.getLevel();
+ log4jLogger.setLevel(Level.INFO);
+ log4jLogger.addAppender(appender);
+ try {
+
+ proxy.purge();
+
+ } finally {
+ log4jLogger.setLevel(level);
+ log4jLogger.removeAppender(appender);
+ }
+
assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
proxy.getQueueSize());
assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled());
+ assertTrue("got expected info purge log message", gotPurgeLogMessage.get());
}
public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {