Posted to users@camel.apache.org by Scisci <fe...@gmail.com> on 2015/11/16 09:14:11 UTC
AggregationStrategy on split never stops
Hi,
I have a problem with a split and a custom AggregationStrategy.
I need to count the items that were split.
This is my AggregationStrategy:
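import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy; // Camel 2.x package
import org.apache.log4j.Logger; // assumption: log4j 1.x, the post does not show its imports

public class Item_AggregationStrategy implements AggregationStrategy {
    private static final Logger LOGGER =
        Logger.getLogger(Item_AggregationStrategy.class);

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        LOGGER.info("In Item_AggregationStrategy");
        // First split item: nothing to merge yet, so the count starts at 1.
        int numero_items = 1;
        if (oldExchange != null) {
            // Later items: continue from the count carried by the previous result.
            Integer previous = oldExchange.getIn().getHeader("Numero_Items", Integer.class);
            numero_items = (previous == null ? 0 : previous) + 1;
        }
        LOGGER.info("Numero Items: " + numero_items);
        newExchange.getIn().setHeader("Numero_Items", numero_items);
        return newExchange;
    }
}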
and this is part of my route:
<process ref="Cancellazione_Tabelle"/>
<split streaming="true" strategyRef="Item_Aggregator">
    <tokenize token="Item" xml="true"
              inheritNamespaceTagName="ag_01:Accpos"/>
    <to uri="direct:Convert_Accpos"/>
</split>
<log message="Numero_Items after the split: ${header.Numero_Items}"/>
It looks like the aggregator never ends.
How can I set a timeout on the split, like completionTimeout on the aggregator?
Thanks
Mirko
--
View this message in context: http://camel.465427.n5.nabble.com/AggregatinStrategy-on-split-never-stop-tp5773905.html
Sent from the Camel - Users mailing list archive at Nabble.com.
Re: AggregationStrategy on split never stops
Posted by Jakub Korab <ja...@ameliant.com>.
To handle exceptions within a split, your AggregationStrategy needs to
be written in such a way that it deals with Exceptions. See the
following for an example:
https://github.com/CamelCookbook/camel-cookbook-examples/blob/master/camel-cookbook-routing/src/main/java/org/camelcookbook/routing/multicast/ExceptionHandlingAggregationStrategy.java
If you don't, the splitting process will complete anyway, but the
exception will be thrown afterwards - never reaching the next processor
(in your case, "Check_New_Record"). See the documentation about
"stopOnException" here:
https://camel.apache.org/splitter.html
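The idea of an exception-aware strategy can be sketched in plain Java. This is only a simplified model under stated assumptions: it is not the linked cookbook class and it does not use Camel's API. `SplitFoldSketch`, `Result`, `aggregate` and `split` are hypothetical names. The sketch shows a fold in which a failed item is recorded on the merged result rather than left pending, so the item count still reaches the step after the split.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitFoldSketch {

    /** Hypothetical stand-in for an exchange: a running count plus the failures seen so far. */
    static final class Result {
        final int count;
        final List<Exception> failures;
        final Exception exception; // non-null only when this single item failed

        Result(int count, List<Exception> failures, Exception exception) {
            this.count = count;
            this.failures = failures;
            this.exception = exception;
        }

        static Result ok() {
            return new Result(0, new ArrayList<Exception>(), null);
        }

        static Result failed(Exception e) {
            return new Result(0, new ArrayList<Exception>(), e);
        }
    }

    /** Merges one item into the running result; the first call receives old == null. */
    static Result aggregate(Result old, Result next) {
        List<Exception> failures = new ArrayList<Exception>();
        if (old != null) {
            failures.addAll(old.failures);
        }
        if (next.exception != null) {
            failures.add(next.exception); // record the failure instead of propagating it
        }
        int count = (old == null ? 0 : old.count) + 1;
        return new Result(count, failures, null); // no pending exception on the merged result
    }

    /** Folds all item results in order, one aggregate call per item. */
    static Result split(List<Result> items) {
        Result merged = null;
        for (Result item : items) {
            merged = aggregate(merged, item);
        }
        return merged;
    }

    public static void main(String[] args) {
        Result total = split(Arrays.asList(
            Result.ok(),
            Result.failed(new IllegalStateException("primary key violation")),
            Result.ok()));
        System.out.println(total.count + " items, " + total.failures.size() + " failed");
        // prints "3 items, 1 failed"
    }
}
```

In a real Camel strategy the same decision would be made on the Exchange objects passed to aggregate; how the failure is stored there is left open in this sketch.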
Jakub
On 11/12/15 08:27, Scisci wrote:
> hi,
> sorry for the delayed response.
> I'm using camel version 2.6.0 and spring version 4.1.6.RELEASE.
Re: AggregationStrategy on split never stops
Posted by Scisci <fe...@gmail.com>.
Hi,
sorry for the delayed response.
I'm using Camel version 2.6.0 and Spring version 4.1.6.RELEASE.
This is one of the routes:
<camelContext id="XXX"
    xmlns="http://camel.apache.org/schema/spring"
    xmlns:ag_01="http://www.my.it/Accpos">
  <propertyPlaceholder id="placeholder" location="classpath:file.properties"/>
  <route id="Accpos">
    <from uri="file:{{Path_In}}?fileName={{File_Name_Accpos}}&amp;readLock=changed&amp;readLockTimeout=10000&amp;readLockCheckInterval=5000&amp;move={{Backup}}"/>
    <doTry>
      <to uri="validator:classpath:Accpos.xsd"/>
      <log message="Receiving ${file:name}"/>
      <setHeader headerName="Tipo_Fondo">
        <xpath resultType="java.lang.String">/ag_01:Accpos/ag_01:Tipo_Fondo/text()</xpath>
      </setHeader>
      <log message="Tipo_Fondo : ${header.Tipo_Fondo}"/>
      <setHeader headerName="Data_Giro">
        <xpath resultType="java.lang.String">/ag_01:Accpos/ag_01:Data_Giro/text()</xpath>
      </setHeader>
      <log message="Data_Giro : ${header.Data_Giro}"/>
      <setHeader headerName="Numero_Record_Inviati">
        <xpath resultType="java.lang.String">/ag_01:Accpos/ag_01:Items/@count</xpath>
      </setHeader>
      <log message="Numero_Record_Inviati : ${header.Numero_Record_Inviati}"/>
      <setHeader headerName="Tabella"><simple>${file:name}</simple></setHeader>
      <log message="Tabella : ${header.Tabella}"/>
      <setHeader headerName="Numero_Items"><simple>0</simple></setHeader>
      <log message="Numero_Items : ${header.Numero_Items}"/>
      <process ref="Clean_Tables"/>
      <split strategyRef="Item_Aggregator" streaming="true">
        <xtokenize>/ag_01:Accpos/ag_01:Items/ag_01:Item</xtokenize>
        <to uri="direct:Convert_Accpos"/>
      </split>
      <process ref="Check_New_Record"/>
      <to uri="direct:Mail_Info_Loading"/>
      <doCatch>
        <exception>org.apache.camel.ValidationException</exception>
        <setHeader headerName="subject">
          <simple>ERROR - validation - ACCPOS</simple>
        </setHeader>
        <setBody>
          <simple>Validation error \r \n ${exception.message}</simple>
        </setBody>
        <to uri="smtp://{{Mail_Server}}?from={{Sender}}&amp;to={{Receivers_Error}}"/>
      </doCatch>
    </doTry>
  </route>
  <route id="Convert_Accpos" errorHandlerRef="myDeadLetterErrorHandler_Accpos">
    <from uri="direct:Convert_Accpos"/>
    <unmarshal>
      <jaxb contextPath="it.my.Risk_Files.Accpos"
            partClass="it.my.Risk_Files.Accpos.ItemType" ignoreJAXBElement="true"/>
    </unmarshal>
    <convertBodyTo type="it.my.Risk_Files.Accpos.Accpos"/>
    <to uri="jpa:it.my.Risk_Files.Accpos.Accpos?usePersist=true"/>
  </route>
....
<bean id="myDeadLetterErrorHandler_Accpos"
      class="org.apache.camel.builder.DeadLetterChannelBuilder">
  <property name="deadLetterUri" value="jpa:it.my.Risk_Files.Accpos.Accpos_Error"/>
</bean>
If the last split item throws an error, for example a primary key violation,
processing never returns after the dead letter error handler, and the
Check_New_Record processor is not executed.
Thanks
Mirko
Re: AggregationStrategy on split never stops
Posted by Claus Ibsen <cl...@gmail.com>.
What version of Camel do you use? And can you show all your Camel
routes and configuration?
On Thu, Nov 26, 2015 at 9:14 AM, Scisci <fe...@gmail.com> wrote:
> Hi,
> after a bit of investigation I've found when the aggregation not end.
> If the last item in the split throw an error it will be redirected to a
> deadletter channel and not return to the Split component.
> I think this is because the default failure processor on
> org.apache.camel.DeadLetterChannelBuider is set with ExchangePattern.InOnly
>
> How to create a new process that send back same info to split componet so it
> can terminate ?
--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
Re: AggregationStrategy on split never stops
Posted by Scisci <fe...@gmail.com>.
Hi,
after a bit of investigation I've found when the aggregation does not end.
If the last item in the split throws an error, it is redirected to the
dead letter channel and does not return to the Split component.
I think this is because the default failure processor on
org.apache.camel.builder.DeadLetterChannelBuilder is set with ExchangePattern.InOnly.
How can I create a new processor that sends some info back to the Split
component so it can terminate?
Thanks
Mirko
Re: AggregationStrategy on split never stops
Posted by Claus Ibsen <cl...@gmail.com>.
It ends when there is no more data to split.
RE: AggregationStrategy on split never stops
Posted by Scisci <fe...@gmail.com>.
Hi,
To respond to Kai: I've tried the properties CamelSplitIndex,
CamelSplitSize and CamelSplitComplete, but these properties are empty outside
the split, and if I set the header with a simple expression inside the split,
the header is also not set once the split has ended.
To respond to Claus: the problem is that the split never ends.
The last log I see is the one printed in the AggregationStrategy, not the log
after the split.
I've also tried removing streaming="true", but the aggregation still never ends.
Any suggestions?
Thanks
Mirko
RE: AggregationStrategy on split never stops
Posted by Kai Broszat <Ka...@kewill.com>.
You shouldn't have to do the counting yourself - Camel is doing that for you:
Exchange properties: The following properties are set on each Exchange that is split:
CamelSplitIndex (int)
    A split counter that increases for each Exchange being split. The counter starts from 0.
CamelSplitSize (int)
    The total number of Exchanges that were split. This property is not applied for stream based splitting. From Camel 2.9 onwards it is also set in stream based splitting, but only on the completed Exchange.
CamelSplitComplete (Boolean)
    Camel 2.4: Whether or not this Exchange is the last.
(from http://camel.apache.org/splitter.html)
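To make the streaming behaviour described above concrete, here is a small plain-Java model of how the three values evolve over the items. It is an illustrative sketch, not Camel code: `SplitPropertiesSketch`, `Props` and `streaming` are hypothetical names, and the model only mirrors the quoted documentation (index starts at 0, the size is known only on the last item when streaming).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SplitPropertiesSketch {

    /** Hypothetical record of the three values for one split item. */
    static final class Props {
        final int index;        // models CamelSplitIndex
        final Integer size;     // models CamelSplitSize; null while the total is unknown
        final boolean complete; // models CamelSplitComplete

        Props(int index, Integer size, boolean complete) {
            this.index = index;
            this.size = size;
            this.complete = complete;
        }

        @Override
        public String toString() {
            return "index=" + index + " size=" + size + " complete=" + complete;
        }
    }

    /** Streaming mode: the total is only known once the iterator is exhausted. */
    static List<Props> streaming(Iterator<?> items) {
        List<Props> out = new ArrayList<Props>();
        int index = 0;
        while (items.hasNext()) {
            items.next();
            boolean last = !items.hasNext();
            // The size is filled in only on the final item, as in the quoted text.
            out.add(new Props(index, last ? Integer.valueOf(index + 1) : null, last));
            index++;
        }
        return out;
    }

    public static void main(String[] args) {
        for (Props p : streaming(Arrays.asList("a", "b", "c").iterator())) {
            System.out.println(p);
        }
        // prints:
        // index=0 size=null complete=false
        // index=1 size=null complete=false
        // index=2 size=3 complete=true
    }
}
```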
Hope that helps,
Kai