Posted to dev@camel.apache.org by "Christian Müller (JIRA)" <ji...@apache.org> on 2011/06/19 13:20:47 UTC

[jira] [Created] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Improve the Aggregator to be able to force a flush all aggregated exchanges
---------------------------------------------------------------------------

                 Key: CAMEL-4118
                 URL: https://issues.apache.org/jira/browse/CAMEL-4118
             Project: Camel
          Issue Type: Improvement
    Affects Versions: 2.7.2
            Reporter: Christian Müller
             Fix For: 2.9.0


Imagine you process a big file with multiple financial transactions. After splitting the file into its individual transactions, we send them to an aggregator to group transactions for the same card/account together. At this point, we don't know how many transactions we have to group together. Only at the end of processing the input file do we know that we are done. At that point, we have to instruct the aggregator to "flush all aggregated exchanges".

A workaround for the time being is to also inject the aggregator into a bean which is called after the splitter. This bean can query for all keys with the "getKeys" method and then send a "flush" exchange to the aggregator (completionPredicate(header("flush"))). However, it will still aggregate that "flush exchange". So take care of that in your custom aggregation strategy and enable eagerCheckCompletion, so the predicate is checked before aggregating.
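
As a rough illustration of the workaround, here is a stand-alone model in plain Java. It is not Camel code: the class `FlushWorkaroundSketch` and its methods `process` and `flushAll` are hypothetical names, and `getKeys` only mimics the lookup mentioned above. It assumes the custom aggregation strategy simply skips the flush message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-alone model of the workaround; none of these names are Camel API.
class FlushWorkaroundSketch {

    // Open groups, keyed by card/account.
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // Completed groups, in completion order.
    final List<List<String>> completed = new ArrayList<>();

    void process(String key, String body, boolean flush) {
        // Eager check: the predicate looks at the incoming message,
        // not at the aggregated result.
        boolean complete = flush;
        // Custom strategy (assumed): a flush message carries no data, so skip it.
        if (!flush) {
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(body);
        }
        if (complete) {
            List<String> group = groups.remove(key);
            if (group != null) {
                completed.add(group);
            }
        }
    }

    // Plays the role of the "getKeys" lookup: a snapshot of the open keys.
    List<String> getKeys() {
        return new ArrayList<>(groups.keySet());
    }

    // The bean called after the splitter: one flush message per open key.
    void flushAll() {
        for (String key : getKeys()) {
            process(key, null, true);
        }
    }

    public static void main(String[] args) {
        FlushWorkaroundSketch agg = new FlushWorkaroundSketch();
        agg.process("card-1", "tx-a", false);
        agg.process("card-2", "tx-b", false);
        agg.process("card-1", "tx-c", false);
        agg.flushAll();
        System.out.println(agg.completed); // [[tx-a, tx-c], [tx-b]]
    }
}
```

The bean has to send one flush message per key, which is the part this issue asks the Aggregator to take over.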


We had a similar question on the user list about how to "flush all aggregated exchanges" when Camel shuts down.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

       

[jira] [Assigned] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Ben O'Day (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben O'Day reassigned CAMEL-4118:
--------------------------------

    Assignee: Ben O'Day


[jira] [Resolved] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Ben O'Day (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben O'Day resolved CAMEL-4118.
------------------------------

    Resolution: Fixed

added this support as discussed, updated 2.9 release notes and Aggregator2 docs...


[jira] [Commented] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Claus Ibsen (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13066599#comment-13066599 ] 

Claus Ibsen commented on CAMEL-4118:
------------------------------------

Also it would be good to have unit tests in camel-hawtdb and camel-sql as they have a persistent repository.


[jira] [Commented] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Christian Müller (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13063577#comment-13063577 ] 

Christian Müller commented on CAMEL-4118:
-----------------------------------------

Of course. Imagine you process a big file which is split line by line and grouped in the aggregator by, e.g., the manufacturer. We don't know how many messages the aggregator will receive. After we have split and grouped all individual messages, the splitter sends a "signal" message to the aggregator to flush *ALL* messages, regardless of group.
Hope this makes it a bit clearer.

Best,
Christian


[jira] [Commented] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Christian Müller (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13064832#comment-13064832 ] 

Christian Müller commented on CAMEL-4118:
-----------------------------------------

Hey Ben!

I had a quick look at it. Good work. Here are my comments:

Instead of writing:
{code:java}
String completeAllGroups = (String) exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
if (completeAllGroups != null && completeAllGroups.equals("true")) {
    forceCompletionOfAllGroups();
    return;
}
{code}

you could write:
{code:java}
Boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, Boolean.class);
if (Boolean.TRUE.equals(completeAllGroups)) {
    forceCompletionOfAllGroups();
    return;
}
{code}
and also benefit from the Camel type converter mechanism (the user can set the header as a boolean or String value).
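
To make the point about accepting either a boolean or a String concrete, here is a small stand-alone sketch. It does not use Camel: `asBoolean` is a hypothetical stand-in for the typed header lookup (Camel's type converter is far more general), and the header name "completeAllGroups" is only an assumed example.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone sketch; asBoolean is a hypothetical stand-in, not Camel's converter.
class HeaderFlagSketch {

    // Accepts a Boolean or a String such as "true"; anything else yields null.
    static Boolean asBoolean(Object value) {
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            return Boolean.valueOf(((String) value).trim());
        }
        return null;
    }

    static boolean shouldCompleteAllGroups(Map<String, Object> headers) {
        // Boolean.TRUE.equals(...) is null-safe and compares by value.
        return Boolean.TRUE.equals(asBoolean(headers.get("completeAllGroups")));
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("completeAllGroups", "true");
        System.out.println(shouldCompleteAllGroups(headers)); // true
        headers.put("completeAllGroups", Boolean.FALSE);
        System.out.println(shouldCompleteAllGroups(headers)); // false
    }
}
```

A missing header simply reads as "do not complete", so the caller needs no separate null check.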

AggregateForceCompletionHeaderTest and AggregateForceCompletionOnStopTest didn't check the header "Exchange.AGGREGATED_COMPLETED_BY" which is/should be set.

Do we need the MyCompletionProcessor for the tests or could we also use the Mock component?

Sorry for being petty... ;-)


[jira] [Commented] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Ben O'Day (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13063533#comment-13063533 ] 

Ben O'Day commented on CAMEL-4118:
----------------------------------

hmmm...seems like the completionPredicate() does the job of letting you dynamically determine when to complete the exchange...as long as you can set it on the last message in your group, etc.  Can you give me a more detailed example of what you are looking for...


[jira] [Commented] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Ben O'Day (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13064842#comment-13064842 ] 

Ben O'Day commented on CAMEL-4118:
----------------------------------

thanks Christian....agreed on the header/type conversion comments...I'll post an updated patch soon.  For MyCompletionProcessor, I used it only because I couldn't validate a mock endpoint after calling context.stop().  If you have any other ideas on this...let me know


[jira] [Updated] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Ben O'Day (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben O'Day updated CAMEL-4118:
-----------------------------

    Attachment: CAMEL-4097-4118.patch

here is a first cut at a patch for both this and CAMEL-4097...will hold off until 2.9, but wanted to get some feedback on this prior to committing...


[jira] [Commented] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Christian Müller (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13064098#comment-13064098 ] 

Christian Müller commented on CAMEL-4118:
-----------------------------------------

Sorry, I was not able to respond earlier...

Yes, that's exactly what I mean. Thanks for taking a stab at this.


[jira] [Commented] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Ben O'Day (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13064055#comment-13064055 ] 

Ben O'Day commented on CAMEL-4118:
----------------------------------

OK, I see the case now...the completionPredicate() only applies to the current group...not ALL groups.  I'll add a check in the AggregateProcessor to look for a specific header ("completeAllGroups", etc) to force completion on ALL groups immediately.


[jira] [Issue Comment Edited] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Claus Ibsen (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13065063#comment-13065063 ] 

Claus Ibsen edited comment on CAMEL-4118 at 7/14/11 6:02 AM:
-------------------------------------------------------------

There are 2 patches in one ;)

1)
In AggregationDefinition, you should use isForceCompletionOnStop when setting the value, just like the others.

2)
I wonder when aggregating an Exchange having AGGREGATION_COMPLETE_ALL_GROUPS as true, why should you not also aggregate that message, as it may contain data.

3)
In forceCompletionOfAllGroups you check if CamelContext is running. The log message is wrong. And it should be a WARN instead indicating it cannot force completion of all groups.

4)
There is a problem with the force completion on stop option: when stopping, you do not wait for all the exchanges to complete before you continue and shut down the thread pools etc. So this logic is more complicated.

I suggest implementing this ticket first, and later revisiting the logic needed to safely flush when stopping the aggregator.

      was (Author: davsclaus):
    There are 2 patches in one ;)

1)
In AggregationDefinition, you should use isForceCompletionOnStop when setting the value, just like the others.

2)
I wonder when aggregating an Exchange having AGGREGATION_COMPLETE_ALL_GROUPS as true, why should you not also aggregate that message, as it may contain data.

3)
In forceCompletionOfAllGroups you check if CamelContext is running. The log message is wrong. And it should be a WARN instead indicating it cannot force completion of all groups.

4)
There is a problem with the force completion on stop option. As when stopping, you do not wait for all the exchanges to complete, before you continue and shutdown the thread pools etc. So this logic is more complicated. 

I suggest to at first implement this ticket. And then later revisit the logic needed to safely flash on stopping the aggregator. The logic to do that is more complicated.
  

[jira] [Updated] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Ben O'Day (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben O'Day updated CAMEL-4118:
-----------------------------

    Attachment: CAMEL-4118.patch

thanks Claus, here is an updated patch to simply support forcing completion of all groups based on a "signal" message with a specific header set. Note that the signal message is NOT aggregated itself...per the initial request...
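
The behaviour described here can be pictured with a small stand-alone model in plain Java. This is only a sketch, not the patch itself: the class `CompleteAllGroupsSketch`, the `process` signature and the boolean flag standing in for the header are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-alone model of "one signal message completes every group".
// Not the actual patch; names and signatures are illustrative only.
class CompleteAllGroupsSketch {

    // Open groups, keyed by correlation key.
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // Completed groups, in completion order.
    final List<List<String>> completed = new ArrayList<>();

    void process(String key, String body, boolean completeAllGroups) {
        if (completeAllGroups) {
            // The signal message only triggers completion; it is not aggregated.
            forceCompletionOfAllGroups();
            return;
        }
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(body);
    }

    void forceCompletionOfAllGroups() {
        completed.addAll(groups.values());
        groups.clear();
    }

    int openGroups() {
        return groups.size();
    }

    public static void main(String[] args) {
        CompleteAllGroupsSketch agg = new CompleteAllGroupsSketch();
        agg.process("acme", "line-1", false);
        agg.process("globex", "line-2", false);
        agg.process("acme", "line-3", false);
        agg.process("ignored", "signal", true);
        System.out.println(agg.completed); // [[line-1, line-3], [line-2]]
    }
}
```

Unlike a completion predicate, which only ever completes the group of the current message, the signal's own key plays no role here.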


[jira] [Updated] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Ben O'Day (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben O'Day updated CAMEL-4118:
-----------------------------

    Attachment:     (was: CAMEL-4118.patch)


[jira] [Commented] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Claus Ibsen (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13065063#comment-13065063 ] 

Claus Ibsen commented on CAMEL-4118:
------------------------------------

There are 2 patches in one ;)

1)
In AggregationDefinition, you should use isForceCompletionOnStop when setting the value, just like the others.

2)
When aggregating an Exchange that has AGGREGATION_COMPLETE_ALL_GROUPS set to true, I wonder why you would not also aggregate that message, as it may contain data.

3)
In forceCompletionOfAllGroups you check whether the CamelContext is running. The log message is wrong; it should instead be logged at WARN, indicating that completion of all groups cannot be forced.

4)
There is a problem with the force completion on stop option: when stopping, you do not wait for all the exchanges to complete before you continue and shut down the thread pools etc.

I suggest implementing this ticket first, and then later revisiting the logic needed to safely flush when stopping the aggregator. The logic to do that is more complicated.
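
To make points 2 and 3 concrete, here is a minimal plain-Java sketch. It is not the patch and does not use the Camel API: the class FlushAllSketch and its methods are hypothetical stand-ins. It assumes the signalling message is aggregated into its own group first and all groups are flushed afterwards, and that a flush requested while not running is refused with a warning.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an aggregator; none of these names are Camel API.
final class FlushAllSketch {
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    private final List<List<String>> completed = new ArrayList<>();
    private boolean running = true;

    // Aggregate the body into its group first, then honour the flush signal,
    // so data carried by the signalling message is not lost (point 2).
    void aggregate(String key, String body, boolean completeAllGroups) {
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(body);
        if (completeAllGroups) {
            forceCompletionOfAllGroups();
        }
    }

    // Returns the number of groups flushed. When not running, nothing is
    // flushed and a warning is written instead (point 3).
    int forceCompletionOfAllGroups() {
        if (!running) {
            System.err.println("WARN: cannot force completion of all groups, not running");
            return 0;
        }
        int flushed = groups.size();
        completed.addAll(groups.values());
        groups.clear();
        return flushed;
    }

    void stop() {
        running = false;
    }

    List<List<String>> completed() {
        return completed;
    }

    int pendingGroups() {
        return groups.size();
    }

    public static void main(String[] args) {
        FlushAllSketch agg = new FlushAllSketch();
        agg.aggregate("card-1", "tx-a", false);
        agg.aggregate("card-2", "tx-b", false);
        // The last message carries both data and the flush signal.
        agg.aggregate("card-1", "tx-c", true);
        System.out.println(agg.completed());
    }
}
```

The sketch deliberately leaves out point 4: waiting for in-flight exchanges before thread pools are shut down is the part that needs separate design.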


[jira] [Commented] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Ben O'Day (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13063605#comment-13063605 ] 

Ben O'Day commented on CAMEL-4118:
----------------------------------

Thanks, Christian. For the splitter, you could just use the Exchange.SPLIT_COMPLETE flag that is set on the last message. The Splitter stores it as an exchange property rather than a header, so the predicate reads it with property(...), like this:

from("file:/tmp/myBigFiles")
    .split(body().tokenize("\n"))
    .aggregate(constant(true), new MyAggregationStrategy())
        .eagerCheckCompletion()
        .completionPredicate(property(Exchange.SPLIT_COMPLETE).isEqualTo(true))

If you are doing other custom splitting of messages, then I'd think you could set a header on the last message as easily as you could send a separate "flush" message. I guess it depends on what other EIPs are involved, etc.
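
The idea of completing on the last split message can be modelled without Camel. The following is a rough plain-Java sketch under stated assumptions: SplitCompleteSketch, Piece and the splitComplete flag are hypothetical names, and all pieces are treated as one group, as with constant(true) above. The completion flag is read from the incoming piece before it is merged (the eager check), and that piece is still added to the batch it completes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of split-then-aggregate; it does not use the Camel API.
final class SplitCompleteSketch {
    // One split piece plus a flag that is true only on the last piece.
    static final class Piece {
        final String body;
        final boolean splitComplete;

        Piece(String body, boolean splitComplete) {
            this.body = body;
            this.splitComplete = splitComplete;
        }
    }

    // Split on newlines and mark the final piece as the last one.
    static List<Piece> split(String file) {
        String[] lines = file.split("\n");
        List<Piece> pieces = new ArrayList<>();
        for (int i = 0; i < lines.length; i++) {
            pieces.add(new Piece(lines[i], i == lines.length - 1));
        }
        return pieces;
    }

    // Eager check: decide completion from the incoming piece, then merge it,
    // then release the batch if the incoming piece was the last one.
    static List<List<String>> aggregate(List<Piece> pieces) {
        List<List<String>> completed = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Piece piece : pieces) {
            boolean complete = piece.splitComplete;
            current.add(piece.body);
            if (complete) {
                completed.add(current);
                current = new ArrayList<>();
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(split("tx-1\ntx-2\ntx-3")));
    }
}
```

This only covers the single-group case. With several correlation groups, the last split message completes only its own group, which is the gap this ticket is about.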





[jira] [Work started] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Ben O'Day (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on CAMEL-4118 started by Ben O'Day.


[jira] [Updated] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Ben O'Day (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben O'Day updated CAMEL-4118:
-----------------------------

    Attachment:     (was: CAMEL-4097-4118.patch)


[jira] [Updated] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Ben O'Day (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben O'Day updated CAMEL-4118:
-----------------------------

    Attachment: CAMEL-4118.patch


[jira] [Commented] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Christian Müller (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13051674#comment-13051674 ] 

Christian Müller commented on CAMEL-4118:
-----------------------------------------

Instead of calling "getKeys" on the aggregator, we could also provide an AggregationStrategy on the Splitter directly, which will collect all the aggregation keys. In this case the bean can take the list of keys from the exchange.
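
As a rough illustration of collecting the keys while splitting, here is a plain-Java sketch. It does not use the Camel AggregationStrategy interface; KeyCollectorSketch and the "account;amount" record layout are assumptions made only for this example.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical key collector; the "account;amount" line layout is assumed.
final class KeyCollectorSketch {
    // The correlation key is the text before the first ';' (the whole line if none).
    static String correlationKey(String transaction) {
        return transaction.split(";", 2)[0];
    }

    // Collect each distinct key once, in the order it first appears.
    static Set<String> collectKeys(String file) {
        Set<String> keys = new LinkedHashSet<>();
        for (String line : file.split("\n")) {
            keys.add(correlationKey(line));
        }
        return keys;
    }

    public static void main(String[] args) {
        // The bean after the splitter could send one "flush" per collected key.
        for (String key : collectKeys("4711;10.00\n0815;5.50\n4711;2.25")) {
            System.out.println("flush " + key);
        }
    }
}
```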


[jira] [Commented] (CAMEL-4118) Improve the Aggregator to be able to force a flush all aggregated exchanges

Posted by "Claus Ibsen (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13066598#comment-13066598 ] 

Claus Ibsen commented on CAMEL-4118:
------------------------------------

In the AggregateProcessorTest unit test, I suggest having multiple correlation groups in the aggregator when you complete it, so that the test verifies this case works.

When reading the header, you can also let the type converter convert it to a primitive boolean with a default value:
boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
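
The benefit of the default value is that a missing header never yields null, so no null check is needed before using the flag. A small plain-Java sketch of that behaviour follows; HeaderDefaultSketch, booleanHeader and the header name "completeAllGroups" are hypothetical and only mimic the lookup, they are not Camel's type converter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical header lookup that always returns a primitive boolean.
final class HeaderDefaultSketch {
    // Missing header: the default. Boolean: its value. Anything else: parsed from text.
    static boolean booleanHeader(Map<String, Object> headers, String name, boolean defaultValue) {
        Object value = headers.get(name);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        return Boolean.parseBoolean(value.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(booleanHeader(headers, "completeAllGroups", false));
        headers.put("completeAllGroups", "true");
        System.out.println(booleanHeader(headers, "completeAllGroups", false));
    }
}
```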
