You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/19 00:56:53 UTC
svn commit: r1094807 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
Author: tabish
Date: Mon Apr 18 22:56:53 2011
New Revision: 1094807
URL: http://svn.apache.org/viewvc?rev=1094807&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3278
fox for: https://issues.apache.org/jira/browse/AMQ-3271
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1094807&r1=1094806&r2=1094807&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Mon Apr 18 22:56:53 2011
@@ -117,52 +117,52 @@ public class SchedulerBroker extends Bro
String physicalName = messageSend.getDestination().getPhysicalName();
boolean schedularManage = physicalName.regionMatches(true, 0,
- ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
- ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
+ ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
+ ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
if (schedularManage == true) {
- JobScheduler scheduler = getInternalScheduler();
- ActiveMQDestination replyTo = messageSend.getReplyTo();
+ JobScheduler scheduler = getInternalScheduler();
+ ActiveMQDestination replyTo = messageSend.getReplyTo();
- String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
+ String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
- if (action != null ) {
+ if (action != null ) {
- Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
- Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
-
- if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
-
- if( startTime != null && endTime != null ) {
-
- long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
- long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
-
- for (Job job : scheduler.getAllJobs(start, finish)) {
- sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
- }
- } else {
- for (Job job : scheduler.getAllJobs()) {
- sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
- }
- }
- }
- if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
- scheduler.remove(jobId);
- } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
-
- if( startTime != null && endTime != null ) {
-
- long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
- long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
-
- scheduler.removeAllJobs(start, finish);
- } else {
- scheduler.removeAllJobs();
- }
- }
- }
+ Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
+ Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
+
+ if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
+
+ if( startTime != null && endTime != null ) {
+
+ long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
+ long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
+
+ for (Job job : scheduler.getAllJobs(start, finish)) {
+ sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
+ }
+ } else {
+ for (Job job : scheduler.getAllJobs()) {
+ sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
+ }
+ }
+ }
+ if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
+ scheduler.remove(jobId);
+ } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
+
+ if( startTime != null && endTime != null ) {
+
+ long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
+ long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
+
+ scheduler.removeAllJobs(start, finish);
+ } else {
+ scheduler.removeAllJobs();
+ }
+ }
+ }
} else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
//clear transaction context
@@ -197,7 +197,7 @@ public class SchedulerBroker extends Bro
Message messageSend = (Message) this.wireFormat.unmarshal(packet);
messageSend.setOriginalTransactionId(null);
Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
- Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+ Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
String cronStr = cronValue != null ? cronValue.toString() : null;
int repeat = 0;
if (repeatValue != null) {
@@ -208,7 +208,7 @@ public class SchedulerBroker extends Bro
// create a unique id - the original message could be sent
// lots of times
messageSend.setMessageId(
- new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
+ new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
}
// Add the jobId as a property
@@ -220,6 +220,30 @@ public class SchedulerBroker extends Bro
messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
+ if (messageSend.getTimestamp() > 0 && messageSend.getExpiration() > 0) {
+
+ long oldExpiration = messageSend.getExpiration();
+ long newTimeStamp = System.currentTimeMillis();
+ long timeToLive = 0;
+ long oldTimestamp = messageSend.getTimestamp();
+
+ if (oldExpiration > 0) {
+ timeToLive = oldExpiration - oldTimestamp;
+ }
+
+ long expiration = timeToLive + newTimeStamp;
+
+ if(expiration > oldExpiration) {
+ if (timeToLive > 0 && expiration > 0) {
+ messageSend.setExpiration(expiration);
+ }
+ messageSend.setTimestamp(newTimeStamp);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Set message " + messageSend.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
+ }
+ }
+ }
+
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setMutable(true);
@@ -253,37 +277,37 @@ public class SchedulerBroker extends Bro
return null;
}
- protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo)
- throws Exception {
+ protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo)
+ throws Exception {
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
try {
Message msg = (Message) this.wireFormat.unmarshal(packet);
msg.setOriginalTransactionId(null);
- msg.setPersistent(false);
- msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
- msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
- msg.setDestination(replyTo);
- msg.setResponseRequired(false);
- msg.setProducerId(this.producerId);
+ msg.setPersistent(false);
+ msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+ msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
+ msg.setDestination(replyTo);
+ msg.setResponseRequired(false);
+ msg.setProducerId(this.producerId);
// Add the jobId as a property
- msg.setProperty("scheduledJobId", job.getJobId());
+ msg.setProperty("scheduledJobId", job.getJobId());
- final boolean originalFlowControl = context.isProducerFlowControl();
- final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
- producerExchange.setConnectionContext(context);
- producerExchange.setMutable(true);
- producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
- try {
- context.setProducerFlowControl(false);
- this.next.send(producerExchange, msg);
- } finally {
- context.setProducerFlowControl(originalFlowControl);
- }
+ final boolean originalFlowControl = context.isProducerFlowControl();
+ final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+ producerExchange.setConnectionContext(context);
+ producerExchange.setMutable(true);
+ producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
+ try {
+ context.setProducerFlowControl(false);
+ this.next.send(producerExchange, msg);
+ } finally {
+ context.setProducerFlowControl(originalFlowControl);
+ }
} catch (Exception e) {
LOG.error("Failed to send scheduled message " + job.getJobId(), e);
}
- }
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java?rev=1094807&r1=1094806&r2=1094807&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java Mon Apr 18 22:56:53 2011
@@ -56,18 +56,18 @@ public class JmsCronSchedulerTest extend
public void onMessage(Message message) {
latch.countDown();
count.incrementAndGet();
- LOG.debug("Received one Message, count is at: " + count.get());
+ LOG.debug("Received one Message, count is at: " + count.get());
}
});
connection.start();
for (int i = 0; i < COUNT; i++) {
- MessageProducer producer = session.createProducer(destination);
- TextMessage message = session.createTextMessage("test msg "+i);
- message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
- producer.send(message);
- producer.close();
- //wait a couple sec so cron start time is different for next message
+ MessageProducer producer = session.createProducer(destination);
+ TextMessage message = session.createTextMessage("test msg "+i);
+ message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
+ producer.send(message);
+ producer.close();
+ //wait a couple sec so cron start time is different for next message
Thread.sleep(2000);
}
SchedulerBroker sb = (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class);
@@ -79,6 +79,27 @@ public class JmsCronSchedulerTest extend
assertEquals(COUNT, count.get());
}
+ public void testCronScheduleWithTtlSet() throws Exception {
+
+ Connection connection = createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(destination);
+ connection.start();
+
+ MessageProducer producer = session.createProducer(destination);
+ producer.setTimeToLive(TimeUnit.MINUTES.toMillis(1));
+ TextMessage message = session.createTextMessage("test msg ");
+ message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
+
+ producer.send(message);
+ producer.close();
+
+ Thread.sleep(TimeUnit.MINUTES.toMillis(2));
+
+ assertNotNull(consumer.receiveNoWait());
+ assertNull(consumer.receiveNoWait());
+ }
+
@Override
protected void setUp() throws Exception {
bindAddress = "vm://localhost";