You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/07/18 01:18:30 UTC

svn commit: r795270 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/thread/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/usec...

Author: gtully
Date: Fri Jul 17 23:18:29 2009
New Revision: 795270

URL: http://svn.apache.org/viewvc?rev=795270&view=rev
Log:
use non compencating schedualler and ensure DLQ copies message early - ensures accurate processing of expired messages - https://issues.apache.org/activemq/browse/AMQ-1112

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jul 17 23:18:29 2009
@@ -187,7 +187,7 @@
         }
         
         if (getExpireMessagesPeriod() > 0) {
-            scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
+            scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
         }
         
         super.initialize();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Fri Jul 17 23:18:29 2009
@@ -45,15 +45,18 @@
      * @throws IOException
      */
     protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
-        if (n.isExpired()) {
-            if (!broker.isExpired(n)) {
-                LOG.info("ignoring ack " + ack + ", for already expired message: " + n);
-                return;
-            }
-        }
         final Destination q = n.getRegionDestination();
         final QueueMessageReference node = (QueueMessageReference)n;
         final Queue queue = (Queue)q;
+        
+        if (n.isExpired()) {
+            if (broker.isExpired(n)) {
+                queue.messageExpired(context, this, node);
+            } else {
+                LOG.debug("ignoring ack " + ack + ", for already expired message: " + n);
+            }
+            return;
+        }
         queue.removeMessage(context, this, node, ack);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Jul 17 23:18:29 2009
@@ -25,6 +25,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import org.apache.activemq.broker.Broker;
@@ -682,14 +683,14 @@
     private boolean stampAsExpired(Message message) throws IOException {
         boolean stamped=false;
         if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
-            long expiration=message.getExpiration();
-            message.setExpiration(0);
+            long expiration=message.getExpiration();     
             message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
             stamped = true;
         }
         return stamped;
     }
 
+    
     public void messageExpired(ConnectionContext context, MessageReference node) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Message expired " + node);
@@ -708,11 +709,10 @@
 					        .getRegionDestination().getDeadLetterStrategy();
 					if(deadLetterStrategy!=null){
 						if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
-						    if (node.getRegionDestination().getActiveMQDestination().isTopic()) {
-						        // message may be inflight to other subscriptions so do not modify
-						        message = message.copy();
-						    }
-							if(!message.isPersistent()){
+						    // message may be inflight to other subscriptions so do not modify
+						    message = message.copy();
+						    message.setExpiration(0);
+						    if(!message.isPersistent()){
 							    message.setPersistent(true);
 							    message.setProperty("originalDeliveryMode",
 								        "NON_PERSISTENT");
@@ -727,7 +727,7 @@
 							if (context.getBroker()==null) {
 								context.setBroker(getRoot());
 							}
-							BrokerSupport.resend(context,message,
+							BrokerSupport.resendNoCopy(context,message,
 							        deadLetterDestination);
 						}
 					} else {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Jul 17 23:18:29 2009
@@ -307,8 +307,8 @@
                                     // While waiting for space to free up... the
                                     // message may have expired.
                                     if (message.isExpired()) {
-                                        getDestinationStatistics().getExpired().increment();
                                         broker.messageExpired(context, message);
+                                        getDestinationStatistics().getExpired().increment();
                                     } else {
                                         doMessageSend(producerExchange, message);
                                     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Fri Jul 17 23:18:29 2009
@@ -88,7 +88,6 @@
     private transient ActiveMQConnection connection;
     private transient org.apache.activemq.broker.region.Destination regionDestination;
     private transient MemoryUsage memoryUsage;
-    private transient boolean expired;
 
     private BrokerId[] brokerPath;
     private BrokerId[] cluster;
@@ -339,9 +338,6 @@
 
     public void setExpiration(long expiration) {
         this.expiration = expiration;
-        if (this.expiration > 0) {
-            expired = false;
-        }
     }
 
     /**
@@ -439,13 +435,8 @@
     }
 
     public boolean isExpired() {
-        if (!expired) {
-            long expireTime = getExpiration();
-            if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
-                expired = true;
-            }
-        }
-        return expired;
+        long expireTime = getExpiration();
+        return expireTime > 0 && System.currentTimeMillis() > expireTime;
     }
 
     public boolean isAdvisory() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Fri Jul 17 23:18:29 2009
@@ -47,6 +47,16 @@
         TIMER_TASKS.put(task, timerTask);
     }
 
+    /*
+     * execute on rough schedual based on termination of last execution. There is no
+     * compensation (two runs in quick succession) for delays
+     */
+    public synchronized void schedualPeriodically(final Runnable task, long period) {
+        TimerTask timerTask = new SchedulerTimerTask(task);
+        CLOCK_DAEMON.schedule(timerTask, period, period);
+        TIMER_TASKS.put(task, timerTask);
+    }
+    
     public synchronized void cancel(Runnable task) {
     	TimerTask ticket = TIMER_TASKS.remove(task);
         if (ticket != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java Fri Jul 17 23:18:29 2009
@@ -32,6 +32,10 @@
     private BrokerSupport() {        
     }
     
+    public static void resendNoCopy(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
+        doResend(context, originalMessage, deadLetterDestination, false);
+    }
+    
     /**
      * @param context
      * @param originalMessage 
@@ -39,7 +43,11 @@
      * @throws Exception
      */
     public static void resend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
-        Message message = originalMessage.copy();
+        doResend(context, originalMessage, deadLetterDestination, true);
+    }
+    
+    public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception {       
+        Message message = copy ? originalMessage.copy() : originalMessage;
         message.setOriginalDestination(message.getDestination());
         message.setOriginalTransactionId(message.getTransactionId());
         message.setDestination(deadLetterDestination);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Fri Jul 17 23:18:29 2009
@@ -49,12 +49,13 @@
 
     private static final Log LOG = LogFactory.getLog(ExpiredMessagesTest.class);
     
-	BrokerService broker;
-	Connection connection;
-	Session session;
-	MessageProducer producer;
-	MessageConsumer consumer;
-	public ActiveMQDestination destination = new ActiveMQQueue("test");
+    BrokerService broker;
+    Connection connection;
+    Session session;
+    MessageProducer producer;
+    MessageConsumer consumer;
+    public ActiveMQDestination destination = new ActiveMQQueue("test");
+    public ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
     public boolean useTextMessage = true;
     public boolean useVMCursor = true;
     
@@ -103,12 +104,12 @@
 		
         consumerThread.start();
 		
-		
+		final int numMessagesToSend = 10000;
 		Thread producingThread = new Thread("Producing Thread") {
             public void run() {
                 try {
                 	int i = 0;
-                	while (i++ < 10000) {
+                	while (i++ < numMessagesToSend) {
                 		producer.send(session.createTextMessage("test"));
                 	}
                 	producer.close();
@@ -159,10 +160,36 @@
                 return view.getQueueSize() == 0;
             }
         }));
+        
+        final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueueCount();
+        final long totalExpiredCount = view.getExpiredCount() + expiredBeforeEnqueue;
+        
+        final DestinationViewMBean dlqView = createView(dlqDestination);
+        LOG.info("DLQ stats: size= " + dlqView.getQueueSize() + ", enqueues: " + dlqView.getDequeueCount() + ", dequeues: " + dlqView.getDequeueCount()
+                + ", dispatched: " + dlqView.getDispatchCount() + ", inflight: " + dlqView.getInFlightCount() + ", expiries: " + dlqView.getExpiredCount());
+        
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return totalExpiredCount == dlqView.getQueueSize();
+            }
+        });
+        assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getQueueSize());
+        
+        // verify DQL
+        MessageConsumer dlqConsumer = createDlqConsumer(connection);
+        int count = 0;
+        while (dlqConsumer.receive(4000) != null) {
+            count++;
+        }
+        assertEquals("dlq returned all expired", count, totalExpiredCount);
 	}
 
 	
-	public void initCombosForTestRecoverExpiredMessages() {
+	private MessageConsumer createDlqConsumer(Connection connection) throws Exception {
+	    return connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination);
+    }
+
+    public void initCombosForTestRecoverExpiredMessages() {
 	    addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE});
 	}
 	
@@ -266,9 +293,9 @@
 		 String domain = "org.apache.activemq";
 		 ObjectName name;
 		if (destination.isQueue()) {
-			name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
+			name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" + destination.getPhysicalName());
 		} else {
-			name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
+			name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=" + destination.getPhysicalName());
 		}
 		return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
 	}