Posted to users@camel.apache.org by mzehr <mz...@ingdirect.com> on 2012/06/21 19:04:35 UTC

AMQ_SCHEDULED_DELAY problem with camel

Let me start by saying I know schedulerSupport=true needs to be set in
activemq.xml.
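
For reference, a minimal sketch of the setting mentioned above, assuming a stock activemq.xml layout. The brokerName value is an assumption, and the rest of the broker configuration is omitted:

```xml
<!-- activemq.xml (fragment): enables the broker-side scheduler that handles AMQ_SCHEDULED_DELAY -->
<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="localhost"
        schedulerSupport="true">
  <!-- existing broker configuration stays unchanged -->
</broker>
```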

I am trying to get the delay router sample posted by G Tully working with an
external instance of activemq (as opposed to an embedded broker).

I have a test case that demonstrates
AMQ_SCHEDULED_DELAY/schedulerSupport=true working properly with spring
alone.  This works as expected with either an embedded broker or an external
instance of activemq.

I have a test case with spring/camel that works with the embedded broker, but
not with the external instance of activemq (this is the same instance that
successfully demonstrates the delay when using spring alone).

I have tried this with 5.4.2 and 5.6.0 with the same results.

This is the spring only test case that works in both cases

	@ContextConfiguration(locations={"classpath:test.xml"})
	public class TestWithSpring extends AbstractJUnit4SpringContextTests {

		@Test
		public void testSchedulerSupport() throws InterruptedException, JMSException {
			ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) applicationContext.getBean("connectionFactory");
			ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
			connection.start();
			Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
			ActiveMQQueue destination = new ActiveMQQueue("theQueue");
			MessageProducer producer = session.createProducer(destination);
			MessageConsumer consumer = session.createConsumer(destination);

			TextMessage msg = session.createTextMessage("1st");
			msg.setLongProperty("AMQ_SCHEDULED_DELAY", 60 * 1000);

			producer.send(msg);
			session.commit();
			TextMessage m;
			m = (TextMessage) consumer.receive(1000);
			assertNull(m);
			m = (TextMessage) consumer.receive(120 * 1000);

			assertNotNull("got redelivery on second attempt", m);
			assertEquals("text matches original", "1st", m.getText());

			session.commit();
		}
	}

test.xml 
...
  <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://127.0.0.1:61616" />
  </bean>


This is the test with camel.  It works if I use the embedded broker but not
the external instance of activemq

	@ContextConfiguration(locations={"classpath:testWithCamel.xml"})
	public class TestWithSpringAndCamel extends AbstractJUnit4SpringContextTests {

		@Test
		public void testSchedulerSupport() throws InterruptedException, JMSException {
			ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) applicationContext.getBean("connectionFactory");
			ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();

			RedeliveryPolicy policy = connection.getRedeliveryPolicy();
			policy.setMaximumRedeliveries(0);
			connection.start();

			Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
			ActiveMQQueue destination = new ActiveMQQueue("theQueue");
			MessageProducer producer = session.createProducer(destination);
			MessageConsumer consumer = session.createConsumer(destination);

			TextMessage msg = session.createTextMessage("1st");
			producer.send(msg);

			session.commit();

			TextMessage m;
			m = (TextMessage) consumer.receive(1000);
			assertNotNull(m);

			String msgTxt = m.getText();
			assertEquals("1st", m.getText());

			session.rollback();

			m = (TextMessage) consumer.receive(7000);
			assertNull("no immediate redelivery", m);

			m = (TextMessage) consumer.receive(120000);

			assertNotNull("got redelivery on second attempt", m);
			assertEquals("text matches original", "1st", m.getText());

			// came from camel

			assertTrue("redelivery marker header set, so came from camel", m.getBooleanProperty("CamelRedeliveryMarker"));
			session.commit();
		}
	}


testWithCamel.xml
  <bean id="messageConverter" class="org.apache.activemq.camel.converter.IdentityMessageReuseConverter" />

  <camel:camelContext id="mzCamel" trace="true">

    <camel:route>
      <camel:from uri="activemq:ActiveMQ.DLQ?mapJmsMessage=false"/>
      <camel:setHeader headerName="CamelRedeliveryMarker"><camel:constant>true</camel:constant></camel:setHeader>
      <camel:setHeader headerName="AMQ_SCHEDULED_DELAY"><camel:constant>60000</camel:constant></camel:setHeader>
      <camel:to pattern="InOnly" uri="activemq:theQueue?explicitQosEnabled=true&amp;messageConverter=#messageConverter" />
    </camel:route>
  </camel:camelContext>

  <bean id="connectionFactory"
class="org.apache.activemq.spring.ActiveMQConnectionFactory">
     <property name="brokerURL" value="tcp://127.0.0.1:61616" />
  </bean>
 
--
View this message in context: http://camel.465427.n5.nabble.com/AMQ-SCHEDULED-DELAY-problem-with-camel-tp5714870.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: AMQ_SCHEDULED_DELAY problem with camel

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

What Camel version are you using?

You are using a custom message converter in the <to>. Can you double
check that the AMQ_SCHEDULED_xx header is included in the message being sent?
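
One way to check this on the Camel side is sketched below, assuming the route from testWithCamel.xml. The line would go just before the existing <camel:to>; the logger name delayCheck is a hypothetical placeholder, and this only shows the exchange headers at that point in the route, not what the custom converter finally puts on the wire:

```xml
<!-- log component: showHeaders=true prints the exchange headers, including AMQ_SCHEDULED_DELAY if set -->
<camel:to uri="log:delayCheck?showHeaders=true"/>
```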

You can also enable DEBUG/TRACE logging for org.apache.camel.component.jms
(note that this can produce a lot of logging).

That may help you see what the difference is between the working
and non-working cases.
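
As a sketch, assuming the test project logs through log4j 1.x with a log4j.properties file on the classpath (an assumption, the thread does not say which logging backend is in use), the logger could be enabled like this:

```properties
# log4j.properties (fragment): verbose logging for the Camel JMS component only
log4j.logger.org.apache.camel.component.jms=DEBUG
```

Switching DEBUG to TRACE gives even more detail, at the cost of more output.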






-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen