You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/03/19 15:08:07 UTC

svn commit: r925221 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java

Author: rajdavies
Date: Fri Mar 19 14:08:07 2010
New Revision: 925221

URL: http://svn.apache.org/viewvc?rev=925221&view=rev
Log:
make sure scheduling works with transactions

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=925221&r1=925220&r2=925221&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Fri Mar 19 14:08:07 2010
@@ -111,7 +111,10 @@ public class SchedulerBroker extends Bro
         Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
 
         if (cronValue != null || periodValue != null || delayValue != null) {
-            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(messageSend);
+            //clear transaction context
+            Message msg = messageSend.copy();
+            msg.setTransactionId(null);
+            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
                 if (cronValue != null) {
                     cronEntry = cronValue.toString();
                 }
@@ -121,12 +124,11 @@ public class SchedulerBroker extends Bro
                 if (delayValue != null) {
                     delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
                 }
-                Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+                Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
                 if (repeatValue != null) {
                     repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
                 }
-                
-                getInternalScheduler().schedule(messageSend.getMessageId().toString(),
+                getInternalScheduler().schedule(msg.getMessageId().toString(),
                         new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat);
             
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java?rev=925221&r1=925220&r2=925221&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java Fri Mar 19 14:08:07 2010
@@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Connection;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -103,6 +104,44 @@ public class JmsSchedulerTest extends Em
         latch.await(5, TimeUnit.SECONDS);
         assertEquals(latch.getCount(), 0);
     }
+    
+    public void testTransactedSchedule() throws Exception {
+        final int COUNT = 1;
+        Connection connection = createConnection();
+
+        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                latch.countDown();
+                try {
+                    session.commit();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        connection.start();
+        long time = 5000;
+        MessageProducer producer = session.createProducer(destination);
+        TextMessage message = session.createTextMessage("test msg");
+
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+
+        producer.send(message);
+        session.commit();
+        producer.close();
+        // make sure the message isn't delivered early
+        Thread.sleep(2000);
+        assertEquals(latch.getCount(), COUNT);
+        latch.await(5, TimeUnit.SECONDS);
+        assertEquals(latch.getCount(), 0);
+    }
+
 
     public void testScheduleRepeated() throws Exception {
         final int NUMBER = 10;