Posted to users@camel.apache.org by selva <sg...@gmail.com> on 2014/12/02 11:54:45 UTC

Re: Quartz clustering in camel spring DSL - JIRA CAMEL-8076

Hi Willem,

I downloaded the Camel Quartz component source code (2.14.x from GitHub),
debugged it, and found the cause of the issue I am facing.

As I posted earlier, the issue occurs in a clustered environment: the Camel
application is deployed as a standalone program running as two instances. If
one instance goes down while executing a route, the other instance should
trigger the same route immediately, because we have configured
*recoverableJob=true* on the quartz2 endpoint.

What happens instead is that the second instance triggers the quartz2
endpoint but does not execute my processor (QueryBuilderProcessor).

Example:

<route id="quartz" trace="true">
    <from uri="quartz2://cluster/quartz?cron=0+0/4+*+*+*+?&amp;durableJob=true&amp;stateful=true&amp;recoverableJob=true" />
    <to uri="bean:QueryBuilderProcessor" />
</route>


While debugging I found that, in CamelJob.java, the trigger key seen during
recovery differs from the trigger key seen during a normal run, so the
condition below fails and execution falls into the else block (see the
highlighted else block).


    public void execute(JobExecutionContext context) throws JobExecutionException {
        Exchange exchange = null;
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Running CamelJob jobExecutionContext={}", context);
            }

            CamelContext camelContext = getCamelContext(context);
            QuartzEndpoint endpoint = lookupQuartzEndpoint(camelContext, context); // Step 1
            exchange = endpoint.createExchange();
            exchange.setIn(new QuartzMessage(exchange, context));
            endpoint.getConsumerLoadBalancer().process(exchange); // Step 2



*Step 1:* Quartz endpoint creation


        if (triggerKey.equals(checkTriggerKey)) {
            return quartzEndpoint;
        }


        if (camelContext.hasEndpoint(endpointUri) != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Getting Endpoint from camelContext.");
            }
            result = camelContext.getEndpoint(endpointUri, QuartzEndpoint.class);
        } else {
            // highlighted: this is the branch taken during recovery
            LOG.warn("Cannot find existing QuartzEndpoint with uri: {}. Creating new endpoint instance.", endpointUri);
            result = camelContext.getEndpoint(endpointUri, QuartzEndpoint.class);
        }



Because a new Quartz endpoint is created, its consumerLoadBalancer object is
null.

So the next step runs the code below and creates a *new*
consumerLoadBalancer object.

*Step 2:*


    public LoadBalancer getConsumerLoadBalancer() {
        if (consumerLoadBalancer == null) {
            consumerLoadBalancer = new RoundRobinLoadBalancer();
        }
        return consumerLoadBalancer;
    }

Because it is a newly created object, its processors property is empty, so
my QueryBuilderProcessor is never called.

In the normal (non-recovery) flow I can see the list of processors in the
processors property of consumerLoadBalancer.
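
To make the effect concrete, here is a minimal, self-contained sketch of the
same lazy-getter pattern. It does not use the real Camel classes:
LazyLoadBalancerSketch, SketchLoadBalancer and SketchEndpoint are
hypothetical stand-ins invented for this illustration, and the real
RoundRobinLoadBalancer may behave differently in detail. It only shows why
an endpoint that was never wired to a route ends up with a load balancer
that has nothing to dispatch to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-ins for illustration only; these are NOT the real Camel classes.
class LazyLoadBalancerSketch {

    /** Hypothetical load balancer: hands each message to one registered processor in turn. */
    static class SketchLoadBalancer {
        private final List<Consumer<String>> processors = new ArrayList<>();
        private int next;

        void addProcessor(Consumer<String> processor) {
            processors.add(processor);
        }

        /** Returns true if a processor received the message, false if none is registered. */
        boolean process(String message) {
            if (processors.isEmpty()) {
                return false; // nothing registered, so the message goes nowhere
            }
            processors.get(next).accept(message);
            next = (next + 1) % processors.size();
            return true;
        }
    }

    /** Hypothetical endpoint using the same lazy getter pattern as in Step 2. */
    static class SketchEndpoint {
        private SketchLoadBalancer consumerLoadBalancer;

        SketchLoadBalancer getConsumerLoadBalancer() {
            if (consumerLoadBalancer == null) {
                consumerLoadBalancer = new SketchLoadBalancer();
            }
            return consumerLoadBalancer;
        }
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();

        // Normal flow: the route's endpoint had a processor registered at startup.
        SketchEndpoint routeEndpoint = new SketchEndpoint();
        routeEndpoint.getConsumerLoadBalancer().addProcessor(handled::add);
        boolean normal = routeEndpoint.getConsumerLoadBalancer().process("normal fire");

        // Recovery flow: a brand-new endpoint gets a brand-new, empty load balancer.
        SketchEndpoint freshEndpoint = new SketchEndpoint();
        boolean recovered = freshEndpoint.getConsumerLoadBalancer().process("recovery fire");

        System.out.println("normal dispatched: " + normal);
        System.out.println("recovery dispatched: " + recovered);
        System.out.println("handled: " + handled);
    }
}
```

In this sketch the recovery call returns without reaching any processor,
which matches what I see with QueryBuilderProcessor.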

*Temporary fix for testing:* The code below works in the recovery flow, with
immediate retry.

As a temporary fix we modified the lookupQuartzEndpoint method in
CamelJob.java as shown below (see the highlighted text).

    protected QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, JobExecutionContext quartzContext) throws JobExecutionException {
        TriggerKey triggerKey = quartzContext.getTrigger().getKey();
        // highlighted: added to obtain the job key for the recovery check
        JobDetail jobDetail = quartzContext.getJobDetail();
        JobKey jobKey = jobDetail.getKey();

        if (LOG.isDebugEnabled()) {
            LOG.debug("Looking up existing QuartzEndpoint with triggerKey={}", triggerKey);
        }

        // check all active routes for the quartz endpoint this task matches
        // as we prefer to use the existing endpoint from the routes
        for (Route route : camelContext.getRoutes()) {
            if (route.getEndpoint() instanceof QuartzEndpoint) {
                QuartzEndpoint quartzEndpoint = (QuartzEndpoint) route.getEndpoint();
                TriggerKey checkTriggerKey = quartzEndpoint.getTriggerKey();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Checking route endpoint={} with checkTriggerKey={}", quartzEndpoint, checkTriggerKey);
                }
                // highlighted: the recovery clause after || is the added part
                if (triggerKey.equals(checkTriggerKey)
                        || (jobDetail.requestsRecovery()
                            && jobKey.getGroup().equals(checkTriggerKey.getGroup())
                            && jobKey.getName().equals(checkTriggerKey.getName()))) {
                    return quartzEndpoint;
                }
            }
        }
        // ... the rest of the method is not shown
    }


Apart from the trigger key, all the remaining job details match the existing
quartzEndpoint, so we added this condition to return the existing Quartz
endpoint instead of creating a new one in the recovery flow.
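
The matching rule can be checked in isolation with a small sketch. It does
not use the real Quartz TriggerKey and JobKey classes: Key and matches are
hypothetical stand-ins, and the recovery trigger name and group used in main
are assumed example values, not values taken from my logs. It only shows
that the plain trigger key comparison fails for a recovery trigger, while
the added job key comparison still finds the route's endpoint.

```java
import java.util.Objects;

// Simplified stand-ins for illustration only; NOT the real Quartz key classes.
class RecoveryKeyMatchSketch {

    /** Hypothetical key: a (name, group) pair compared by value. */
    static final class Key {
        final String name;
        final String group;

        Key(String name, String group) {
            this.name = name;
            this.group = group;
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof Key)) {
                return false;
            }
            Key that = (Key) other;
            return name.equals(that.name) && group.equals(that.group);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, group);
        }
    }

    /**
     * Same shape as the patched condition: match on the trigger key, or, for a
     * job that requests recovery, on the job key's group and name.
     */
    static boolean matches(Key firedTriggerKey, Key jobKey, boolean requestsRecovery, Key endpointTriggerKey) {
        if (firedTriggerKey.equals(endpointTriggerKey)) {
            return true;
        }
        return requestsRecovery
                && jobKey.group.equals(endpointTriggerKey.group)
                && jobKey.name.equals(endpointTriggerKey.name);
    }

    public static void main(String[] args) {
        Key endpointKey = new Key("quartz", "cluster");
        Key jobKey = new Key("quartz", "cluster");
        // Assumed example of a scheduler-generated recovery trigger key.
        Key recoveryKey = new Key("recover_instance1_1", "RECOVERING_JOBS");

        System.out.println("trigger key only: " + recoveryKey.equals(endpointKey));
        System.out.println("with recovery clause: " + matches(recoveryKey, jobKey, true, endpointKey));
    }
}
```

Keeping the extra clause behind requestsRecovery means the normal flow is
still decided by the trigger key alone.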

*Summary:*
The problem we found is that, when a new Quartz endpoint is created, its
consumerLoadBalancer is null.

Please let us know the right way to get the consumerLoadBalancer values
when a new Quartz endpoint is created in the recovery flow.

Thanks,
selva





--
View this message in context: http://camel.465427.n5.nabble.com/Quartz-clustering-in-camel-spring-DSL-JIRA-CAMEL-8076-tp5759589p5759928.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Quartz clustering in camel spring DSL - JIRA CAMEL-8076

Posted by Willem Jiang <wi...@gmail.com>.
Hi Selva,

Thanks for sharing your patch with us. Your patch looks good to me, and I managed to reproduce the error by making some changes to SpringQuartzConsumerTwoAppsClusteredRecoveryTest.
I will commit the patch shortly.

Regards,

--  
Willem Jiang

Red Hat, Inc.
Web: http://www.redhat.com
Blog: http://willemjiang.blogspot.com (English)
http://jnn.iteye.com (Chinese)
Twitter: willemjiang  
Weibo: 姜宁willem


