Posted to issues@camel.apache.org by "Sergey Zhemzhitsky (Created) (JIRA)" <ji...@apache.org> on 2012/01/20 11:03:40 UTC

[jira] [Created] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
-----------------------------------------------------------------------------------------------------------------------------------------------------

                 Key: CAMEL-4925
                 URL: https://issues.apache.org/jira/browse/CAMEL-4925
             Project: Camel
          Issue Type: Bug
          Components: camel-core
    Affects Versions: 2.8.0
            Reporter: Sergey Zhemzhitsky
         Attachments: CamelRoutingTest.java

ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Here is the code from ThreadsProcessor. With DiscardPolicy or DiscardOldestPolicy, the executorService will not throw a RejectedExecutionException, so the exchange remains unprocessed and the count of inflight exchanges is never decremented for such discarded exchanges.

{code:java|title=ThreadsProcessor#process(Exchange, AsyncCallback)}
public boolean process(Exchange exchange, AsyncCallback callback) {
    if (shutdown.get()) {
        throw new IllegalStateException("ThreadsProcessor is not running.");
    }

    ProcessCall call = new ProcessCall(exchange, callback);
    try {
        executorService.submit(call);
        // tell Camel routing engine we continue routing asynchronous
        return false;
    } catch (RejectedExecutionException e) {
        if (isCallerRunsWhenRejected()) {
            if (shutdown.get()) {
                exchange.setException(new RejectedExecutionException());
            } else {
                callback.done(true);
            }
        } else {
            exchange.setException(e);
        }
        return true;
    }
}
{code}
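
The behavior described above can be reproduced with the plain JDK, without Camel. The following is a minimal sketch, assuming a pool with one thread and a one-slot queue; the class and method names are hypothetical and are not taken from the attached unit test.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DiscardPolicyDemo {

    /** Submits {@code total} tasks to a one-thread, one-slot pool and returns how many ran. */
    static int countExecuted(int total) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardPolicy());
        AtomicInteger executed = new AtomicInteger();
        CountDownLatch gate = new CountDownLatch(1);

        for (int i = 0; i < total; i++) {
            // submit() returns normally even when the policy drops the task,
            // so a catch (RejectedExecutionException e) block never runs.
            pool.submit(() -> {
                awaitQuietly(gate); // keep the worker busy so the queue fills up
                executed.incrementAndGet();
            });
        }

        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("executed " + countExecuted(5) + " of 5 submitted tasks");
    }
}
```

With five submissions, only two tasks run (one on the worker thread, one from the queue). The other three are dropped without any exception reaching the caller, which is why the catch block in ThreadsProcessor never sees them.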

Unit test is attached.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Sergey Zhemzhitsky (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13190696#comment-13190696 ] 

Sergey Zhemzhitsky commented on CAMEL-4925:
-------------------------------------------

Claus, thanks a lot.

If you don't mind, I will create a new JIRA improvement to add a blocked policy. It's no more dangerous than the CallerRuns strategy, which may also block the main thread forever (for example, when all the threads in the pool are busy, the work queue is full, and the next submitted task never ends). Also, while the main thread is executing the task, no one is able to submit new tasks with the CallerRuns strategy.
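
For illustration only, a blocked policy along these lines could be sketched with the plain JDK as follows. This is an assumption about what such a policy might look like, not the code from the patch; BlockWhenFull and the other names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingPolicySketch {

    /** Hypothetical handler: blocks the submitting thread until the queue has room. */
    static final class BlockWhenFull implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
            if (pool.isShutdown()) {
                throw new RejectedExecutionException("pool is shut down");
            }
            try {
                // put() waits for free space instead of dropping the task.
                pool.getQueue().put(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for queue space", e);
            }
        }
    }

    /** Runs {@code total} short tasks through a saturated pool and returns how many ran. */
    static int countExecuted(int total) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new BlockWhenFull());
        AtomicInteger executed = new AtomicInteger();
        for (int i = 0; i < total; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(10); // simulate work so the queue stays full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                executed.incrementAndGet();
            });
        }
        pool.shutdown(); // already queued tasks still run after shutdown()
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("executed " + countExecuted(5) + " of 5 submitted tasks");
    }
}
```

Every task eventually runs, at the cost of stalling the submitting thread while the queue is full. If a queued task never finishes, the submitter waits forever, which is the risk discussed in this thread.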
                

[jira] [Updated] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Sergey Zhemzhitsky (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated CAMEL-4925:
--------------------------------------

    Attachment: CAMEL-4925.patch
    

[jira] [Assigned] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Claus Ibsen (Assigned) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen reassigned CAMEL-4925:
----------------------------------

    Assignee: Claus Ibsen
    

[jira] [Commented] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Claus Ibsen (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13190435#comment-13190435 ] 

Claus Ibsen commented on CAMEL-4925:
------------------------------------

After looking into this for a bit: the JDK does *not* offer good APIs for doing custom logic on rejection with access to the inner details of the runnable task you submitted to the thread pool. That means for DiscardOldest, you do *not* know which Exchange is to be discarded, as you cannot get access to the Exchange. The JDK ThreadPoolExecutor will wrap the runnable/FutureTask using an adapter, which does not expose an API for getting to the Exchange.

Even if you create a custom FutureTask and submit that, it's the adapter you get when the task is rejected, and the adapter does not let you get to your custom FutureTask.

So the best we can do is to deny supporting DiscardOldest, and then we can handle Abort and Discard in the ThreadsProcessor, where we can mark the exchange to stop routing, so the exchange will be done, which means that it will be unregistered from the inflight registry and whatnot.
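
The wrapping described above can be observed with a small JDK-only experiment. This is a sketch under stated assumptions: MyTask is a hypothetical stand-in for ProcessCall, and the pool is shut down first simply to force every later task to be rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RejectedWrapperDemo {

    /** Hypothetical stand-in for the task that carries the Exchange. */
    static final class MyTask implements Runnable {
        @Override
        public void run() {
            // never executed in this demo
        }
    }

    /** Returns true if the rejection handler received the very object that was handed to the pool. */
    static boolean handlerSeesOriginalTask(boolean useSubmit) {
        AtomicReference<Runnable> seen = new AtomicReference<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                (rejected, executor) -> seen.set(rejected));
        pool.shutdown(); // force rejection of everything handed over from now on

        MyTask task = new MyTask();
        if (useSubmit) {
            pool.submit(task);  // wraps the task in a FutureTask before execute()
        } else {
            pool.execute(task); // hands the task over unwrapped
        }
        return seen.get() == task;
    }

    public static void main(String[] args) {
        System.out.println("submit():  handler sees original task = " + handlerSeesOriginalTask(true));
        System.out.println("execute(): handler sees original task = " + handlerSeesOriginalTask(false));
    }
}
```

With submit(), the handler receives the FutureTask wrapper rather than MyTask, so a cast back to the original task is not possible. With execute(), the handler receives the original object.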
                

[jira] [Commented] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Claus Ibsen (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13190647#comment-13190647 ] 

Claus Ibsen commented on CAMEL-4925:
------------------------------------

Sergey,

Thanks for the patch. This seems promising. I am polishing this a bit
- discard and discard oldest = the exchange will not continue routing, done successfully
- abort = the exchange will not continue routing, an exception will be set, done with a failure

Then you can choose between abort and discard, depending on whether the exchange should be done with a failure or not.

I am removing the blocked policy, as it's not tested, and we have no people asking for it. Don't give people rope to hang themselves.

And yeah, if end users provide their own thread pool, then they are responsible for handling that situation.

                

[jira] [Commented] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Sergey Zhemzhitsky (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13190471#comment-13190471 ] 

Sergey Zhemzhitsky commented on CAMEL-4925:
-------------------------------------------

Hi Claus,
I noticed the same things you mentioned and attached a sample patch that can possibly be used to support correct rejection of submitted tasks. The idea is to provide our own _ThreadPoolExecutor_ and _ScheduledThreadPoolExecutor_ from _DefaultThreadPoolFactory_. These pools wrap submitted tasks with custom _FutureTask_ that supports rejection. _RejectedExecutionHandlers_ in the _ThreadPoolRejectedPolicy_ are also changed to check whether the discarded tasks can be rejected (I also added blocking policy into the _ThreadPoolRejectedPolicy_).

So if submitted tasks do not implement _Rejectable_ interface the behavior is exactly the same as with ordinary _ThreadPoolExecutor_ and _ScheduledThreadPoolExecutor_. If these tasks are _Rejectable_, their _reject()_ method will be called. 

The only problem is what to do if the user provides their own ExecutorService to configure the ThreadsProcessor. I suppose in that case the user should be fully responsible for handling rejections correctly (this should be mentioned in the Camel docs or somewhere else).
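
The idea in this comment can be sketched with the plain JDK roughly as follows. This is not the attached patch: every class here is hypothetical, only the Rejectable name and its reject() method come from the comment, and only the Runnable overload of newTaskFor is covered.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectableSketch {

    /** Hypothetical interface for tasks that want to be told when they are discarded. */
    interface Rejectable {
        void reject();
    }

    /** FutureTask that forwards reject() to the wrapped task when that task is Rejectable. */
    static final class RejectableFutureTask<V> extends FutureTask<V> implements Rejectable {
        private final Rejectable delegate; // null if the wrapped task is not Rejectable

        RejectableFutureTask(Runnable task, V result) {
            super(task, result);
            this.delegate = task instanceof Rejectable ? (Rejectable) task : null;
        }

        @Override
        public void reject() {
            if (delegate != null) {
                delegate.reject();
            }
        }
    }

    /** Pool whose submit(Runnable) wraps tasks so the rejection handler can reach them. */
    static final class RejectablePool extends ThreadPoolExecutor {
        RejectablePool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1),
                    (task, pool) -> {
                        // Discard, but notify the task first if it supports it.
                        if (task instanceof Rejectable) {
                            ((Rejectable) task).reject();
                        }
                    });
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new RejectableFutureTask<>(runnable, value);
        }
    }

    /** Returns {executed, rejected} after submitting {@code total} tasks to a saturated pool. */
    static int[] run(int total) {
        AtomicInteger executed = new AtomicInteger();
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch gate = new CountDownLatch(1);

        final class Task implements Runnable, Rejectable {
            @Override
            public void run() {
                try {
                    gate.await(); // hold the worker so later submissions overflow
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                executed.incrementAndGet();
            }

            @Override
            public void reject() {
                rejected.incrementAndGet();
            }
        }

        RejectablePool pool = new RejectablePool();
        for (int i = 0; i < total; i++) {
            pool.submit(new Task());
        }
        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {executed.get(), rejected.get()};
    }

    public static void main(String[] args) {
        int[] counts = run(5);
        System.out.println("executed=" + counts[0] + " rejected=" + counts[1]);
    }
}
```

In a real ThreadsProcessor, reject() would be the place to complete the exchange so that it leaves the inflight registry. Here it only increments a counter.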
                

[jira] [Commented] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Claus Ibsen (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13191133#comment-13191133 ] 

Claus Ibsen commented on CAMEL-4925:
------------------------------------

Thanks for the contribution. I polished it a bit, and fixed CS. You may want to read about building with checkstyle here:
http://camel.apache.org/building.html

This outputs a report if the source code is not aligned with the checkstyle rules we have in place, e.g. checking for missing license headers, code formatting and the like.
                

[jira] [Updated] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Claus Ibsen (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen updated CAMEL-4925:
-------------------------------

         Priority: Minor  (was: Major)
    Fix Version/s: 2.10.0

We would need to wrap those 2 policies, and then remove the exchange from the inflight registry when the rejectedExecution callback is invoked.
                

[jira] [Commented] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Claus Ibsen (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13189758#comment-13189758 ] 

Claus Ibsen commented on CAMEL-4925:
------------------------------------

Yeah we would need to check in ThreadsDefinition if you have configured Discard and DiscardOldest. And then wrap those policies with a custom policy, so we get the callback from the JDK when the task is rejected.

The discard would possibly not be needed, as I would assume the thread pool will reject it asap when you try to submit it. But the discard oldest is an existing task from the task queue, so that is a different story.

Then we need to provide this as a callback to the ThreadsProcessor, so it can do custom logic when the RejectedExecutionHandler#rejectedExecution is invoked. We can then from the Runnable parameter cast that to ProcessCall, and then get access to the Exchange. Then we can set on the Exchange a RejectedExecutionException as exception, and invoke its callback. Then Camel will take it from there to remove the exchange from inflight registry and whatnot.

Something along those lines. It's a shame the API of the ExecutorService does not have a runnable for rejected execution. Then it would have been easier.
                
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-4925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.8.0
>            Reporter: Sergey Zhemzhitsky
>            Priority: Minor
>             Fix For: 2.10.0
>
>         Attachments: CamelRoutingTest.java
>
>
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> Here is the code from ThreadsProcessor. In case of DiscardPolicy or DiscardOldestPolicy executorService will no throw RejectedExecutionException, so exchange remains unprocessed and count of inflight exchanges will not be decremented for such discarded exchanges.
> {code:java|title=ThreadsProcessor#process(Exchange, AsyncCallback)}
> public boolean process(Exchange exchange, AsyncCallback callback) {
>     if (shutdown.get()) {
>         throw new IllegalStateException("ThreadsProcessor is not running.");
>     }
>     ProcessCall call = new ProcessCall(exchange, callback);
>     try {
>         executorService.submit(call);
>         // tell Camel routing engine we continue routing asynchronous
>         return false;
>     } catch (RejectedExecutionException e) {
>         if (isCallerRunsWhenRejected()) {
>             if (shutdown.get()) {
>                 exchange.setException(new RejectedExecutionException());
>             } else {
>                 callback.done(true);
>             }
>         } else {
>             exchange.setException(e);
>         }
>         return true;
>     }
> }
> {code}
> Unit test is attached.
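
For illustration only, here is a minimal plain-JDK sketch of the behaviour the description relies on. It is not the attached CamelRoutingTest.java and it does not use any Camel classes; the class name SilentDiscardDemo and its run method are made up for this example. It saturates a one-thread pool with a one-slot queue and counts how many RejectedExecutionExceptions the submitter sees under DiscardPolicy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SilentDiscardDemo {

    /** Submits the given number of tasks to a saturated pool; returns {executed, exceptionsSeen}. */
    static int[] run(int tasks) {
        // One worker thread, one queue slot, tasks beyond that are discarded.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();
        int exceptionsSeen = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    executed.incrementAndGet();
                    try {
                        gate.await(); // keep the worker busy until all tasks are submitted
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                exceptionsSeen++; // never reached with DiscardPolicy
            }
        }
        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {executed.get(), exceptionsSeen};
    }
}
```

With five submitted tasks, two are executed and the other three disappear without the submitter seeing any exception, which is why the catch block in ThreadsProcessor#process is never entered for them.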

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Resolved] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Claus Ibsen (Resolved) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen resolved CAMEL-4925.
--------------------------------

    Resolution: Fixed
    
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-4925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.8.0
>            Reporter: Sergey Zhemzhitsky
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 2.10.0
>
>         Attachments: CAMEL-4925.patch, CamelRoutingTest.java


        

[jira] [Issue Comment Edited] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Claus Ibsen (Issue Comment Edited) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13189758#comment-13189758 ] 

Claus Ibsen edited comment on CAMEL-4925 at 1/20/12 12:26 PM:
--------------------------------------------------------------

Yeah, we would need to check in ThreadsDefinition whether you have configured Discard or DiscardOldest, and then wrap those policies with a custom policy, so we get the callback from the JDK when the task is rejected.

Wrapping Discard would possibly not be needed, as I would assume the thread pool rejects the task as soon as you try to submit it. But DiscardOldest drops an existing task from the task queue, so that is a different story.

Then we need to provide this as a callback to the ThreadsProcessor, so it can run custom logic when RejectedExecutionHandler#rejectedExecution is invoked. We can cast the Runnable parameter to ProcessCall to get access to the Exchange, set a RejectedExecutionException on the Exchange as its exception, and invoke its callback. Camel will take it from there and remove the exchange from the inflight registry and so on.

Something along those lines. It's a shame the ExecutorService API does not have a 2nd optional runnable parameter for rejected execution. Then it would have been easier, as you would just submit 2 runnables and the JDK would invoke the appropriate one.
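
A rough plain-JDK sketch of the wrapping idea, under stated assumptions: this is not the code from CAMEL-4925.patch, and the names NotifyingPolicies, Call and RejectionListener are hypothetical. Call stands in for ProcessCall and only carries an exchange id. In Camel the listener would presumably set the exception on the exchange and invoke its callback; here it just records the id. The sketch uses execute() rather than submit(), because submit() wraps the task in a FutureTask and the cast back to the task type would then fail.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class NotifyingPolicies {

    /** Hypothetical stand-in for ProcessCall: a task that knows which exchange it belongs to. */
    static final class Call implements Runnable {
        final String exchangeId;
        private final CountDownLatch gate;

        Call(String exchangeId, CountDownLatch gate) {
            this.exchangeId = exchangeId;
            this.gate = gate;
        }

        @Override
        public void run() {
            try {
                gate.await(); // simulate work that lasts until the demo releases the gate
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Hypothetical callback the processor would register to finish a dropped exchange. */
    interface RejectionListener {
        void onRejected(Call call);
    }

    /** Behaves like DiscardPolicy, but reports the task that was dropped. */
    static RejectedExecutionHandler discard(RejectionListener listener) {
        return (r, executor) -> listener.onRejected((Call) r);
    }

    /** Behaves like DiscardOldestPolicy, but reports the task evicted from the queue. */
    static RejectedExecutionHandler discardOldest(RejectionListener listener) {
        return (r, executor) -> {
            if (executor.isShutdown()) {
                listener.onRejected((Call) r); // the new task itself is dropped
                return;
            }
            Runnable oldest = executor.getQueue().poll();
            if (oldest != null) {
                listener.onRejected((Call) oldest);
            }
            executor.execute(r); // retry the new task now that a slot is free
        };
    }

    /** Submits calls A, B, C to a one-thread, one-slot pool and returns the rejected ids. */
    static List<String> rejectedIds(boolean oldest) {
        List<String> rejected = new CopyOnWriteArrayList<>();
        RejectionListener listener = call -> rejected.add(call.exchangeId);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                oldest ? discardOldest(listener) : discard(listener));
        CountDownLatch gate = new CountDownLatch(1);
        for (String id : new String[] {"A", "B", "C"}) {
            pool.execute(new Call(id, gate));
        }
        gate.countDown();
        pool.shutdown();
        return rejected;
    }
}
```

In this setup A occupies the worker and B sits in the queue when C arrives. The discard wrapper reports C, while the discard-oldest wrapper reports B and queues C in its place, so in both cases the handler knows exactly which task (and therefore which exchange) was dropped.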
                
      was (Author: davsclaus):
    Yeah, we would need to check in ThreadsDefinition whether you have configured Discard or DiscardOldest, and then wrap those policies with a custom policy, so we get the callback from the JDK when the task is rejected.

Wrapping Discard would possibly not be needed, as I would assume the thread pool rejects the task as soon as you try to submit it. But DiscardOldest drops an existing task from the task queue, so that is a different story.

Then we need to provide this as a callback to the ThreadsProcessor, so it can run custom logic when RejectedExecutionHandler#rejectedExecution is invoked. We can cast the Runnable parameter to ProcessCall to get access to the Exchange, set a RejectedExecutionException on the Exchange as its exception, and invoke its callback. Camel will take it from there and remove the exchange from the inflight registry and so on.

Something along those lines. It's a shame the ExecutorService API does not have a runnable for rejected execution. Then it would have been easier.
                  
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-4925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.8.0
>            Reporter: Sergey Zhemzhitsky
>            Priority: Minor
>             Fix For: 2.10.0
>
>         Attachments: CamelRoutingTest.java


        

[jira] [Updated] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Sergey Zhemzhitsky (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated CAMEL-4925:
--------------------------------------

    Attachment: CamelRoutingTest.java
    
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-4925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.8.0
>            Reporter: Sergey Zhemzhitsky
>         Attachments: CamelRoutingTest.java


        

[jira] [Issue Comment Edited] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Sergey Zhemzhitsky (Issue Comment Edited) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13190696#comment-13190696 ] 

Sergey Zhemzhitsky edited comment on CAMEL-4925 at 1/22/12 6:54 PM:
--------------------------------------------------------------------

Claus, thanks a lot.

I've done some experiments with the blocking policy, and it seems rather dangerous: threads in the pool may time out before the rejected task is put into the queue. So you're right that this policy has to be removed.
                
      was (Author: szhemzhitsky):
    Claus, thanks a lot.

    If you don't mind, I will create a new JIRA improvement to add a blocking policy. It is no more dangerous than the CallerRuns strategy, which may also block the main thread forever (for example, when all the threads in the pool are busy, the work queue is full, and the next submitted task never ends). Also, once the main thread is executing the task, no one is able to submit new tasks with the CallerRuns strategy.
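
To illustrate the CallerRuns point with plain JDK classes only (the class name CallerRunsDemo is made up for this sketch, and no Camel code is involved): when the pool is saturated, CallerRunsPolicy executes the rejected task on the thread that called execute(), so that thread cannot submit anything else until the task returns.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    /** Returns the name of the thread that ran the task rejected by a saturated pool. */
    static String threadOfRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        pool.execute(blocker); // occupies the only worker thread
        pool.execute(blocker); // fills the only queue slot
        // Rejected: CallerRunsPolicy runs this synchronously on the submitting thread.
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        gate.countDown();
        pool.shutdown();
        return ranOn.get();
    }
}
```

The returned name is that of the submitting thread, not of a pool thread. If the rejected task never finished, the submitter would be stuck in execute() for good, which is the scenario described above.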
                  
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-4925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.8.0
>            Reporter: Sergey Zhemzhitsky
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 2.10.0
>
>         Attachments: CAMEL-4925.patch, CamelRoutingTest.java


        

[jira] [Commented] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Sergey Zhemzhitsky (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13191252#comment-13191252 ] 

Sergey Zhemzhitsky commented on CAMEL-4925:
-------------------------------------------

{quote}
I polished it a bit, and fixed CS. You may want to read about building with checkstyle here:
http://camel.apache.org/building.html
{quote}
Hi Claus, thanks for providing this useful link.
                
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-4925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.8.0
>            Reporter: Sergey Zhemzhitsky
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 2.10.0
>
>         Attachments: CAMEL-4925.patch, CamelRoutingTest.java


        

[jira] [Commented] (CAMEL-4925) ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.

Posted by "Sergey Zhemzhitsky (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13189748#comment-13189748 ] 

Sergey Zhemzhitsky commented on CAMEL-4925:
-------------------------------------------

So if we wrap these two policies, how do we know which exchange must be removed from the inflight repository?
                
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-4925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.8.0
>            Reporter: Sergey Zhemzhitsky
>            Priority: Minor
>             Fix For: 2.10.0
>
>         Attachments: CamelRoutingTest.java
