You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/07/18 01:18:30 UTC
svn commit: r795270 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/command/
main/java/org/apache/activemq/thread/ main/java/org/apache/activemq/util/
test/java/org/apache/activemq/usec...
Author: gtully
Date: Fri Jul 17 23:18:29 2009
New Revision: 795270
URL: http://svn.apache.org/viewvc?rev=795270&view=rev
Log:
use non compencating schedualler and ensure DLQ copies message early - ensures accurate processing of expired messages - https://issues.apache.org/activemq/browse/AMQ-1112
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jul 17 23:18:29 2009
@@ -187,7 +187,7 @@
}
if (getExpireMessagesPeriod() > 0) {
- scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
+ scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
super.initialize();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Fri Jul 17 23:18:29 2009
@@ -45,15 +45,18 @@
* @throws IOException
*/
protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
- if (n.isExpired()) {
- if (!broker.isExpired(n)) {
- LOG.info("ignoring ack " + ack + ", for already expired message: " + n);
- return;
- }
- }
final Destination q = n.getRegionDestination();
final QueueMessageReference node = (QueueMessageReference)n;
final Queue queue = (Queue)q;
+
+ if (n.isExpired()) {
+ if (broker.isExpired(n)) {
+ queue.messageExpired(context, this, node);
+ } else {
+ LOG.debug("ignoring ack " + ack + ", for already expired message: " + n);
+ }
+ return;
+ }
queue.removeMessage(context, this, node, ack);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Jul 17 23:18:29 2009
@@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
@@ -682,14 +683,14 @@
private boolean stampAsExpired(Message message) throws IOException {
boolean stamped=false;
if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
- long expiration=message.getExpiration();
- message.setExpiration(0);
+ long expiration=message.getExpiration();
message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
stamped = true;
}
return stamped;
}
+
public void messageExpired(ConnectionContext context, MessageReference node) {
if (LOG.isDebugEnabled()) {
LOG.debug("Message expired " + node);
@@ -708,11 +709,10 @@
.getRegionDestination().getDeadLetterStrategy();
if(deadLetterStrategy!=null){
if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
- if (node.getRegionDestination().getActiveMQDestination().isTopic()) {
- // message may be inflight to other subscriptions so do not modify
- message = message.copy();
- }
- if(!message.isPersistent()){
+ // message may be inflight to other subscriptions so do not modify
+ message = message.copy();
+ message.setExpiration(0);
+ if(!message.isPersistent()){
message.setPersistent(true);
message.setProperty("originalDeliveryMode",
"NON_PERSISTENT");
@@ -727,7 +727,7 @@
if (context.getBroker()==null) {
context.setBroker(getRoot());
}
- BrokerSupport.resend(context,message,
+ BrokerSupport.resendNoCopy(context,message,
deadLetterDestination);
}
} else {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Jul 17 23:18:29 2009
@@ -307,8 +307,8 @@
// While waiting for space to free up... the
// message may have expired.
if (message.isExpired()) {
- getDestinationStatistics().getExpired().increment();
broker.messageExpired(context, message);
+ getDestinationStatistics().getExpired().increment();
} else {
doMessageSend(producerExchange, message);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Fri Jul 17 23:18:29 2009
@@ -88,7 +88,6 @@
private transient ActiveMQConnection connection;
private transient org.apache.activemq.broker.region.Destination regionDestination;
private transient MemoryUsage memoryUsage;
- private transient boolean expired;
private BrokerId[] brokerPath;
private BrokerId[] cluster;
@@ -339,9 +338,6 @@
public void setExpiration(long expiration) {
this.expiration = expiration;
- if (this.expiration > 0) {
- expired = false;
- }
}
/**
@@ -439,13 +435,8 @@
}
public boolean isExpired() {
- if (!expired) {
- long expireTime = getExpiration();
- if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
- expired = true;
- }
- }
- return expired;
+ long expireTime = getExpiration();
+ return expireTime > 0 && System.currentTimeMillis() > expireTime;
}
public boolean isAdvisory() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Fri Jul 17 23:18:29 2009
@@ -47,6 +47,16 @@
TIMER_TASKS.put(task, timerTask);
}
+ /*
+ * execute on rough schedual based on termination of last execution. There is no
+ * compensation (two runs in quick succession) for delays
+ */
+ public synchronized void schedualPeriodically(final Runnable task, long period) {
+ TimerTask timerTask = new SchedulerTimerTask(task);
+ CLOCK_DAEMON.schedule(timerTask, period, period);
+ TIMER_TASKS.put(task, timerTask);
+ }
+
public synchronized void cancel(Runnable task) {
TimerTask ticket = TIMER_TASKS.remove(task);
if (ticket != null) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java Fri Jul 17 23:18:29 2009
@@ -32,6 +32,10 @@
private BrokerSupport() {
}
+ public static void resendNoCopy(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
+ doResend(context, originalMessage, deadLetterDestination, false);
+ }
+
/**
* @param context
* @param originalMessage
@@ -39,7 +43,11 @@
* @throws Exception
*/
public static void resend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
- Message message = originalMessage.copy();
+ doResend(context, originalMessage, deadLetterDestination, true);
+ }
+
+ public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception {
+ Message message = copy ? originalMessage.copy() : originalMessage;
message.setOriginalDestination(message.getDestination());
message.setOriginalTransactionId(message.getTransactionId());
message.setDestination(deadLetterDestination);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Fri Jul 17 23:18:29 2009
@@ -49,12 +49,13 @@
private static final Log LOG = LogFactory.getLog(ExpiredMessagesTest.class);
- BrokerService broker;
- Connection connection;
- Session session;
- MessageProducer producer;
- MessageConsumer consumer;
- public ActiveMQDestination destination = new ActiveMQQueue("test");
+ BrokerService broker;
+ Connection connection;
+ Session session;
+ MessageProducer producer;
+ MessageConsumer consumer;
+ public ActiveMQDestination destination = new ActiveMQQueue("test");
+ public ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
public boolean useTextMessage = true;
public boolean useVMCursor = true;
@@ -103,12 +104,12 @@
consumerThread.start();
-
+ final int numMessagesToSend = 10000;
Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
int i = 0;
- while (i++ < 10000) {
+ while (i++ < numMessagesToSend) {
producer.send(session.createTextMessage("test"));
}
producer.close();
@@ -159,10 +160,36 @@
return view.getQueueSize() == 0;
}
}));
+
+ final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueueCount();
+ final long totalExpiredCount = view.getExpiredCount() + expiredBeforeEnqueue;
+
+ final DestinationViewMBean dlqView = createView(dlqDestination);
+ LOG.info("DLQ stats: size= " + dlqView.getQueueSize() + ", enqueues: " + dlqView.getDequeueCount() + ", dequeues: " + dlqView.getDequeueCount()
+ + ", dispatched: " + dlqView.getDispatchCount() + ", inflight: " + dlqView.getInFlightCount() + ", expiries: " + dlqView.getExpiredCount());
+
+ Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return totalExpiredCount == dlqView.getQueueSize();
+ }
+ });
+ assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getQueueSize());
+
+ // verify DQL
+ MessageConsumer dlqConsumer = createDlqConsumer(connection);
+ int count = 0;
+ while (dlqConsumer.receive(4000) != null) {
+ count++;
+ }
+ assertEquals("dlq returned all expired", count, totalExpiredCount);
}
- public void initCombosForTestRecoverExpiredMessages() {
+ private MessageConsumer createDlqConsumer(Connection connection) throws Exception {
+ return connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination);
+ }
+
+ public void initCombosForTestRecoverExpiredMessages() {
addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE});
}
@@ -266,9 +293,9 @@
String domain = "org.apache.activemq";
ObjectName name;
if (destination.isQueue()) {
- name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
+ name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" + destination.getPhysicalName());
} else {
- name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
+ name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=" + destination.getPhysicalName());
}
return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
}