You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/19 00:56:53 UTC

svn commit: r1094807 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java

Author: tabish
Date: Mon Apr 18 22:56:53 2011
New Revision: 1094807

URL: http://svn.apache.org/viewvc?rev=1094807&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3278
fox for: https://issues.apache.org/jira/browse/AMQ-3271

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1094807&r1=1094806&r2=1094807&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Mon Apr 18 22:56:53 2011
@@ -117,52 +117,52 @@ public class SchedulerBroker extends Bro
 
         String physicalName = messageSend.getDestination().getPhysicalName();
         boolean schedularManage = physicalName.regionMatches(true, 0,
-        		ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
-        		ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
+                ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
+                ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
 
         if (schedularManage == true) {
 
-        	JobScheduler scheduler = getInternalScheduler();
-	        ActiveMQDestination replyTo = messageSend.getReplyTo();
+            JobScheduler scheduler = getInternalScheduler();
+            ActiveMQDestination replyTo = messageSend.getReplyTo();
 
-	        String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
+            String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
 
-	        if (action != null ) {
+            if (action != null ) {
 
-	        	Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
-	        	Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
-
-		        if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
-
-		        	if( startTime != null && endTime != null ) {
-
-		                long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
-		                long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
-
-			        	for (Job job : scheduler.getAllJobs(start, finish)) {
-			        		sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
-			        	}
-		        	} else {
-			        	for (Job job : scheduler.getAllJobs()) {
-			        		sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
-			        	}
-		        	}
-		        }
-		        if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
-		        	scheduler.remove(jobId);
-		        } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
-
-		        	if( startTime != null && endTime != null ) {
-
-		                long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
-		                long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
-
-		                scheduler.removeAllJobs(start, finish);
-		        	} else {
-			        	scheduler.removeAllJobs();
-		        	}
-		        }
-	        }
+                Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
+                Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
+
+                if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
+
+                    if( startTime != null && endTime != null ) {
+
+                        long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
+                        long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
+
+                        for (Job job : scheduler.getAllJobs(start, finish)) {
+                            sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
+                        }
+                    } else {
+                        for (Job job : scheduler.getAllJobs()) {
+                            sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
+                        }
+                    }
+                }
+                if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
+                    scheduler.remove(jobId);
+                } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
+
+                    if( startTime != null && endTime != null ) {
+
+                        long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
+                        long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
+
+                        scheduler.removeAllJobs(start, finish);
+                    } else {
+                        scheduler.removeAllJobs();
+                    }
+                }
+            }
 
         } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
             //clear transaction context
@@ -197,7 +197,7 @@ public class SchedulerBroker extends Bro
             Message messageSend = (Message) this.wireFormat.unmarshal(packet);
             messageSend.setOriginalTransactionId(null);
             Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
-            Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+            Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
             String cronStr = cronValue != null ? cronValue.toString() : null;
             int repeat = 0;
             if (repeatValue != null) {
@@ -208,7 +208,7 @@ public class SchedulerBroker extends Bro
                 // create a unique id - the original message could be sent
                 // lots of times
                 messageSend.setMessageId(
-                		new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
+                        new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
             }
 
             // Add the jobId as a property
@@ -220,6 +220,30 @@ public class SchedulerBroker extends Bro
             messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
             messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
 
+            if (messageSend.getTimestamp() > 0 && messageSend.getExpiration() > 0) {
+
+                long oldExpiration = messageSend.getExpiration();
+                long newTimeStamp = System.currentTimeMillis();
+                long timeToLive = 0;
+                long oldTimestamp = messageSend.getTimestamp();
+
+                if (oldExpiration > 0) {
+                    timeToLive = oldExpiration - oldTimestamp;
+                }
+
+                long expiration = timeToLive + newTimeStamp;
+
+                if(expiration > oldExpiration) {
+                    if (timeToLive > 0 && expiration > 0) {
+                        messageSend.setExpiration(expiration);
+                    }
+                    messageSend.setTimestamp(newTimeStamp);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Set message " + messageSend.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
+                    }
+                }
+            }
+
             final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
             producerExchange.setConnectionContext(context);
             producerExchange.setMutable(true);
@@ -253,37 +277,37 @@ public class SchedulerBroker extends Bro
         return null;
     }
 
-	protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo)
-			throws Exception {
+    protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo)
+            throws Exception {
 
         org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
         try {
             Message msg = (Message) this.wireFormat.unmarshal(packet);
             msg.setOriginalTransactionId(null);
-    		msg.setPersistent(false);
-    		msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
-    		msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
-    		msg.setDestination(replyTo);
-    		msg.setResponseRequired(false);
-    		msg.setProducerId(this.producerId);
+            msg.setPersistent(false);
+            msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+            msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
+            msg.setDestination(replyTo);
+            msg.setResponseRequired(false);
+            msg.setProducerId(this.producerId);
 
             // Add the jobId as a property
-    		msg.setProperty("scheduledJobId", job.getJobId());
+            msg.setProperty("scheduledJobId", job.getJobId());
 
-    		final boolean originalFlowControl = context.isProducerFlowControl();
-    		final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
-    		producerExchange.setConnectionContext(context);
-    		producerExchange.setMutable(true);
-    		producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
-    		try {
-    			context.setProducerFlowControl(false);
-    			this.next.send(producerExchange, msg);
-    		} finally {
-    			context.setProducerFlowControl(originalFlowControl);
-    		}
+            final boolean originalFlowControl = context.isProducerFlowControl();
+            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+            producerExchange.setConnectionContext(context);
+            producerExchange.setMutable(true);
+            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
+            try {
+                context.setProducerFlowControl(false);
+                this.next.send(producerExchange, msg);
+            } finally {
+                context.setProducerFlowControl(originalFlowControl);
+            }
         } catch (Exception e) {
             LOG.error("Failed to send scheduled message " + job.getJobId(), e);
         }
 
-	}
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java?rev=1094807&r1=1094806&r2=1094807&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java Mon Apr 18 22:56:53 2011
@@ -56,18 +56,18 @@ public class JmsCronSchedulerTest extend
             public void onMessage(Message message) {
                 latch.countDown();
                 count.incrementAndGet();
-            	LOG.debug("Received one Message, count is at: " + count.get());
+                LOG.debug("Received one Message, count is at: " + count.get());
             }
         });
 
         connection.start();
         for (int i = 0; i < COUNT; i++) {
-	        MessageProducer producer = session.createProducer(destination);
-	        TextMessage message = session.createTextMessage("test msg "+i);
-	        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
-	        producer.send(message);
-	        producer.close();
-	        //wait a couple sec so cron start time is different for next message
+            MessageProducer producer = session.createProducer(destination);
+            TextMessage message = session.createTextMessage("test msg "+i);
+            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
+            producer.send(message);
+            producer.close();
+            //wait a couple sec so cron start time is different for next message
             Thread.sleep(2000);
         }
         SchedulerBroker sb = (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class);
@@ -79,6 +79,27 @@ public class JmsCronSchedulerTest extend
         assertEquals(COUNT, count.get());
     }
 
+    public void testCronScheduleWithTtlSet() throws Exception {
+
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(destination);
+        connection.start();
+
+        MessageProducer producer = session.createProducer(destination);
+        producer.setTimeToLive(TimeUnit.MINUTES.toMillis(1));
+        TextMessage message = session.createTextMessage("test msg ");
+        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
+
+        producer.send(message);
+        producer.close();
+
+        Thread.sleep(TimeUnit.MINUTES.toMillis(2));
+
+        assertNotNull(consumer.receiveNoWait());
+        assertNull(consumer.receiveNoWait());
+    }
+
     @Override
     protected void setUp() throws Exception {
         bindAddress = "vm://localhost";