You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/02/24 22:00:26 UTC

svn commit: r1074289 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/command/Message.java test/java/org/apache/activemq/bugs/DoubleExpireTest.java

Author: tabish
Date: Thu Feb 24 21:00:26 2011
New Revision: 1074289

URL: http://svn.apache.org/viewvc?rev=1074289&view=rev
Log:
Add fix for https://issues.apache.org/jira/browse/AMQ-3153 and add user-supplied unit test.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=1074289&r1=1074288&r2=1074289&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Thu Feb 24 21:00:26 2011
@@ -27,6 +27,7 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
@@ -36,9 +37,9 @@ import org.apache.activemq.wireformat.Wi
 
 /**
  * Represents an ActiveMQ message
- * 
+ *
  * @openwire:marshaller
- * 
+ *
  */
 public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
 
@@ -122,6 +123,9 @@ public abstract class Message extends Ba
 
         if (properties != null) {
             copy.properties = new HashMap<String, Object>(properties);
+
+            // The new message hasn't expired, so remove this field.
+            copy.properties.remove(RegionBroker.ORIGINAL_EXPIRATION);
         } else {
             copy.properties = properties;
         }
@@ -177,7 +181,7 @@ public abstract class Message extends Ba
         lazyCreateProperties();
         properties.put(name, value);
     }
-    
+
     public void removeProperty(String name) throws IOException {
         lazyCreateProperties();
         properties.remove(name);
@@ -438,7 +442,7 @@ public abstract class Message extends Ba
      * consumer id is an active consumer on the broker, the message is dropped.
      * Used by the AdvisoryBroker to replay advisory messages to a specific
      * consumer.
-     * 
+     *
      * @openwire:property version=1 cache=true
      */
     public ConsumerId getTargetConsumerId() {
@@ -502,7 +506,7 @@ public abstract class Message extends Ba
 
     /**
      * The route of brokers the command has moved through.
-     * 
+     *
      * @openwire:property version=1 cache=true
      */
     public BrokerId[] getBrokerPath() {
@@ -541,7 +545,7 @@ public abstract class Message extends Ba
      * Used to schedule the arrival time of a message to a broker. The broker
      * will not dispatch a message to a consumer until it's arrival time has
      * elapsed.
-     * 
+     *
      * @openwire:property version=1
      */
     public long getArrival() {
@@ -556,7 +560,7 @@ public abstract class Message extends Ba
      * Only set by the broker and defines the userID of the producer connection
      * who sent this message. This is an optional field, it needs to be enabled
      * on the broker to have this field populated.
-     * 
+     *
      * @openwire:property version=1
      */
     public String getUserID() {
@@ -589,11 +593,11 @@ public abstract class Message extends Ba
             this.memoryUsage=regionDestination.getMemoryUsage();
         }
     }
-    
+
     public MemoryUsage getMemoryUsage() {
         return this.memoryUsage;
     }
-    
+
     public void setMemoryUsage(MemoryUsage usage) {
         this.memoryUsage=usage;
     }
@@ -614,7 +618,7 @@ public abstract class Message extends Ba
         if (rc == 1 && getMemoryUsage() != null) {
             getMemoryUsage().increaseUsage(size);
             //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
-           
+
         }
 
         //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
@@ -634,7 +638,7 @@ public abstract class Message extends Ba
             //Thread.dumpStack();
             //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
         }
-       
+
         //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
 
         return rc;
@@ -653,7 +657,7 @@ public abstract class Message extends Ba
         }
         return size;
     }
-    
+
     protected int getMinimumMessageSize() {
         int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
         //let destination override
@@ -697,7 +701,7 @@ public abstract class Message extends Ba
     /**
      * If a message is stored in multiple nodes on a cluster, all the cluster
      * members will be listed here. Otherwise, it will be null.
-     * 
+     *
      * @openwire:property version=3 cache=true
      */
     public BrokerId[] getCluster() {
@@ -734,16 +738,16 @@ public abstract class Message extends Ba
     public void setBrokerOutTime(long brokerOutTime) {
         this.brokerOutTime = brokerOutTime;
     }
-    
+
     public boolean isDropped() {
         return false;
     }
-    
+
     @Override
     public String toString() {
         return toString(null);
     }
-    
+
     @Override
     public String toString(Map<String, Object>overrideFields) {
         try {
@@ -751,5 +755,5 @@ public abstract class Message extends Ba
         } catch (IOException e) {
         }
         return super.toString(overrideFields);
-    }    
+    }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java?rev=1074289&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java Thu Feb 24 21:00:26 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.TimeoutException;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.Assert;
+
+public class DoubleExpireTest extends EmbeddedBrokerTestSupport {
+
+	private static final long MESSAGE_TTL_MILLIS = 1000;
+	private static final long MAX_TEST_TIME_MILLIS = 60000;
+
+	public void setUp() throws Exception { // configures the test, then delegates to the parent
+		setAutoFail(true); // presumably makes the test fail if it overruns - confirm in parent class
+		setMaxTestTime(MAX_TEST_TIME_MILLIS); // 60000 ms, the same bound passed to waitForSize
+		super.setUp(); // called last, after both settings above have been applied
+	}
+
+	/**
+	 * This test verifies that a message that expires can be resent to a queue
+	 * with a new expiration and that it will be processed as a new message and
+	 * allowed to re-expire.
+	 * <p>
+	 * <b>NOTE:</b> This test fails on AMQ 5.4.2 because the originalExpiration
+	 * timestamp is not cleared when the message is resent.
+	 */
+	public void testDoubleExpireWithoutMove() throws Exception {
+		// Create the default dead letter queue.
+		final ActiveMQDestination DLQ = createDestination("ActiveMQ.DLQ");
+
+		Connection conn = createConnection();
+		try {
+			conn.start();
+			Session session = conn.createSession(false,
+					Session.AUTO_ACKNOWLEDGE);
+
+			// Verify that the test queue and DLQ are empty.
+			Assert.assertEquals(0, getSize(destination));
+			Assert.assertEquals(0, getSize(DLQ));
+
+			// Enqueue a message to the test queue that will expire after 1s.
+			MessageProducer producer = session.createProducer(destination);
+			Message testMessage = session.createTextMessage("test message");
+			producer.send(testMessage, Message.DEFAULT_DELIVERY_MODE,
+					Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS);
+			Assert.assertEquals(1, getSize(destination));
+
+			// Wait for the message to expire.
+			waitForSize(destination, 0, MAX_TEST_TIME_MILLIS);
+			Assert.assertEquals(1, getSize(DLQ));
+
+			// Consume the message from the DLQ and re-enqueue it to the test
+			// queue so that it expires after 1s.
+			MessageConsumer consumer = session.createConsumer(DLQ);
+			Message expiredMessage = consumer.receive();
+			Assert.assertEquals(testMessage.getJMSMessageID(), expiredMessage
+					.getJMSMessageID());
+
+			producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE,
+					Message.DEFAULT_PRIORITY, MESSAGE_TTL_MILLIS);
+			Assert.assertEquals(1, getSize(destination));
+			Assert.assertEquals(0, getSize(DLQ));
+
+			// Verify that the resent message is "different": its ID must differ
+			// by value (assertNotSame would only compare String references).
+			Assert.assertFalse(testMessage.getJMSMessageID().equals(
+					expiredMessage.getJMSMessageID()));
+
+			// Wait for the message to re-expire.
+			waitForSize(destination, 0, MAX_TEST_TIME_MILLIS);
+			Assert.assertEquals(1, getSize(DLQ));
+
+			// Re-consume the message from the DLQ.
+			Message reexpiredMessage = consumer.receive();
+			Assert.assertEquals(expiredMessage.getJMSMessageID(), reexpiredMessage
+					.getJMSMessageID());
+		} finally {
+			conn.close();
+		}
+	}
+
+	/**
+	 * Looks up the given destination in the embedded broker's destination map
+	 * and returns the entry cast to a broker-side {@code Queue}.
+	 */
+	private Queue getPhysicalDestination(ActiveMQDestination destination)
+			throws Exception {
+		Object entry = broker.getAdminView().getBroker().getDestinationMap().get(destination);
+		return (Queue) entry;
+	}
+
+	/**
+	 * A helper method that returns the size of the specified queue/topic.
+	 */
+	private long getSize(ActiveMQDestination destination) throws Exception {
+		return getPhysicalDestination(destination) != null ? getPhysicalDestination(
+				destination).getDestinationStatistics().getMessages()
+				.getCount()
+				: 0;
+	}
+
+	/**
+	 * A helper method that waits for a destination to reach a certain size.
+	 */
+	private void waitForSize(ActiveMQDestination destination, int size,
+			long timeoutMillis) throws Exception, TimeoutException {
+		long startTimeMillis = System.currentTimeMillis();
+
+		while (getSize(destination) != size
+				&& System.currentTimeMillis() < (startTimeMillis + timeoutMillis)) {
+			Thread.sleep(250);
+		}
+
+		if (getSize(destination) != size) {
+			throw new TimeoutException("Destination "
+					+ destination.getPhysicalName() + " did not reach size "
+					+ size + " within " + timeoutMillis + "ms.");
+		}
+	}
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
------------------------------------------------------------------------------
    svn:eol-style = native