You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/07/09 18:26:42 UTC
svn commit: r792598 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Thu Jul 9 16:26:42 2009
New Revision: 792598
URL: http://svn.apache.org/viewvc?rev=792598&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2322 - test and correction
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=792598&r1=792597&r2=792598&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Jul 9 16:26:42 2009
@@ -205,7 +205,9 @@
// Message could have expired while it was being
// loaded..
if (broker.isExpired(message)) {
- messageExpired(createConnectionContext(), message);
+ messageExpired(createConnectionContext(), createMessageReference(message));
+ // drop message will decrement so counter balance here
+ destinationStatistics.getMessages().increment();
return true;
}
if (hasSpace()) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=792598&r1=792597&r2=792598&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Thu Jul 9 16:26:42 2009
@@ -16,9 +16,12 @@
*/
package org.apache.activemq.usecases;
+import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -34,8 +37,10 @@
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,7 +54,9 @@
MessageProducer producer;
MessageConsumer consumer;
public ActiveMQDestination destination = new ActiveMQQueue("test");
-
+ public boolean useTextMessage = true;
+ public boolean useVMCursor = true;
+
public static Test suite() {
return suite(ExpiredMessagesTest.class);
}
@@ -59,21 +66,8 @@
}
protected void setUp() throws Exception {
- broker = new BrokerService();
- broker.setBrokerName("localhost");
- broker.setDataDirectory("data/");
- broker.setUseJmx(true);
- broker.deleteAllMessages();
-
- PolicyEntry defaultPolicy = new PolicyEntry();
- defaultPolicy.setExpireMessagesPeriod(100);
- PolicyMap policyMap = new PolicyMap();
- policyMap.setDefaultEntry(defaultPolicy);
- broker.setDestinationPolicy(policyMap);
-
- broker.addConnector("tcp://localhost:61616");
- broker.start();
- broker.waitUntilStarted();
+ final boolean deleteAllMessages = true;
+ broker = createBroker(deleteAllMessages, 100);
}
public void testExpiredMessages() throws Exception {
@@ -129,8 +123,8 @@
producingThread.join();
session.close();
- Thread.sleep(5000);
-
+ Thread.sleep(2000);
+
DestinationViewMBean view = createView(destination);
LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
@@ -145,8 +139,107 @@
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount());
}
+
+
+ public void initCombosForTestRecoverExpiredMessages() {
+ addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE});
+ }
- protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+ public void testRecoverExpiredMessages() throws Exception {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+ "failover://tcp://localhost:61616");
+ connection = factory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(destination);
+ producer.setTimeToLive(2000);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ Thread producingThread = new Thread("Producing Thread") {
+ public void run() {
+ try {
+ int i = 0;
+ while (i++ < 1000) {
+ Message message = useTextMessage ? session
+ .createTextMessage("test") : session
+ .createObjectMessage("test");
+ producer.send(message);
+ }
+ producer.close();
+ } catch (Throwable ex) {
+ ex.printStackTrace();
+ }
+ }
+ };
+
+ producingThread.start();
+ producingThread.join();
+
+ DestinationViewMBean view = createView(destination);
+ LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: "
+ + view.getDequeueCount() + ", dequeues: "
+ + view.getDequeueCount() + ", dispatched: "
+ + view.getDispatchCount() + ", inflight: "
+ + view.getInFlightCount() + ", expiries: "
+ + view.getExpiredCount());
+
+ LOG.info("stopping broker");
+ broker.stop();
+ broker.waitUntilStopped();
+
+ Thread.sleep(5000);
+
+ LOG.info("recovering broker");
+ final boolean deleteAllMessages = false;
+ broker = createBroker(deleteAllMessages, 5000);
+
+ view = createView(destination);
+ LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: "
+ + view.getDequeueCount() + ", dequeues: "
+ + view.getDequeueCount() + ", dispatched: "
+ + view.getDispatchCount() + ", inflight: "
+ + view.getInFlightCount() + ", expiries: "
+ + view.getExpiredCount());
+
+ long expiry = System.currentTimeMillis() + 30000;
+ while (view.getQueueSize() > 0 && System.currentTimeMillis() < expiry) {
+ Thread.sleep(500);
+ }
+ LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: "
+ + view.getDequeueCount() + ", dequeues: "
+ + view.getDequeueCount() + ", dispatched: "
+ + view.getDispatchCount() + ", inflight: "
+ + view.getInFlightCount() + ", expiries: "
+ + view.getExpiredCount());
+ assertEquals("Wrong QueueSize: ", 0, view.getQueueSize());
+ assertEquals("all dequeues were expired", view.getDequeueCount(), view.getExpiredCount());
+ }
+
+ private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setBrokerName("localhost");
+ AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
+ adaptor.setDirectory(new File("data/"));
+ adaptor.setForceRecoverReferenceStore(true);
+ broker.setPersistenceAdapter(adaptor);
+
+ PolicyEntry defaultPolicy = new PolicyEntry();
+ if (useVMCursor) {
+ defaultPolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+ }
+ defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod);
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(defaultPolicy);
+ broker.setDestinationPolicy(policyMap);
+ broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ broker.addConnector("tcp://localhost:61616");
+ broker.start();
+ broker.waitUntilStarted();
+ return broker;
+ }
+
+ protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
String domain = "org.apache.activemq";
ObjectName name;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=792598&r1=792597&r2=792598&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Thu Jul 9 16:26:42 2009
@@ -137,7 +137,7 @@
assertTrue("producer completed within time ", !producingThread.isAlive());
- Thread.sleep(2*expiryPeriod);
+ Thread.sleep(3*expiryPeriod);
DestinationViewMBean view = createView(destination);
assertEquals("All sent have expired ", sendCount, view.getExpiredCount());
}