You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/03/19 15:08:07 UTC
svn commit: r925221 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
Author: rajdavies
Date: Fri Mar 19 14:08:07 2010
New Revision: 925221
URL: http://svn.apache.org/viewvc?rev=925221&view=rev
Log:
make sure scheduling works with transactions
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=925221&r1=925220&r2=925221&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Fri Mar 19 14:08:07 2010
@@ -111,7 +111,10 @@ public class SchedulerBroker extends Bro
Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
if (cronValue != null || periodValue != null || delayValue != null) {
- org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(messageSend);
+ //clear transaction context
+ Message msg = messageSend.copy();
+ msg.setTransactionId(null);
+ org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
if (cronValue != null) {
cronEntry = cronValue.toString();
}
@@ -121,12 +124,11 @@ public class SchedulerBroker extends Bro
if (delayValue != null) {
delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
}
- Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+ Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
if (repeatValue != null) {
repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
}
-
- getInternalScheduler().schedule(messageSend.getMessageId().toString(),
+ getInternalScheduler().schedule(msg.getMessageId().toString(),
new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java?rev=925221&r1=925220&r2=925221&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java Fri Mar 19 14:08:07 2010
@@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -103,6 +104,44 @@ public class JmsSchedulerTest extends Em
latch.await(5, TimeUnit.SECONDS);
assertEquals(latch.getCount(), 0);
}
+ /** Checks that a message sent with AMQ_SCHEDULED_DELAY from a transacted session is not delivered within 2 s but has arrived by the end of the test. */
+ public void testTransactedSchedule() throws Exception {
+ final int COUNT = 1;
+ Connection connection = createConnection();
+ // Create a transacted session (first argument is true); the producer and consumer below share it.
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // Consumer on the test's destination field, created from that same session.
+ MessageConsumer consumer = session.createConsumer(destination);
+ // The latch counts deliveries; the listener counts down first and then commits the session.
+ final CountDownLatch latch = new CountDownLatch(COUNT);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ latch.countDown();
+ try {
+ session.commit();
+ } catch (JMSException e) {
+ e.printStackTrace(); // NOTE(review): a failed commit is only printed here; it does not fail the test
+ }
+ }
+ });
+ // Start the connection so the listener can receive, then build the delayed message.
+ connection.start();
+ long time = 5000; // delay value; presumably milliseconds -- confirm against ScheduledMessage
+ MessageProducer producer = session.createProducer(destination);
+ TextMessage message = session.createTextMessage("test msg");
+ // Mark the message for delayed delivery by setting the AMQ_SCHEDULED_DELAY property.
+ message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+ // Send inside the transaction and commit it; the producer is closed straight afterwards.
+ producer.send(message);
+ session.commit();
+ producer.close();
+ // make sure the message isn't delivered early
+ Thread.sleep(2000);
+ assertEquals(latch.getCount(), COUNT); // NOTE(review): arguments look swapped relative to JUnit's assertEquals(expected, actual) -- confirm
+ latch.await(5, TimeUnit.SECONDS); // await's boolean result is not used; the assertion below is what detects a timeout
+ assertEquals(latch.getCount(), 0);
+ }
+
public void testScheduleRepeated() throws Exception {
final int NUMBER = 10;