Posted to users@camel.apache.org by Scisci <fe...@gmail.com> on 2015/11/16 09:14:11 UTC

AggregationStrategy on split never stops

Hi,
I have a problem with a split and a custom AggregationStrategy.
I have to count the items that were split.
This is my AggregationStrategy:

public class Item_AggregationStrategy implements AggregationStrategy {

    private static final Logger LOGGER = Logger.getLogger(Item_AggregationStrategy.class);

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        LOGGER.info("In Item_AggregationStrategy");

        // First split item: there is nothing to aggregate with yet.
        if (oldExchange == null) {
            return newExchange;
        }

        // Carry the running count forward from the previous exchange.
        Integer numero_items = oldExchange.getIn().getHeader("Numero_Items", Integer.class);
        LOGGER.info("Numero Items: " + numero_items);
        numero_items++;
        LOGGER.info("Numero Items 2: " + numero_items);

        newExchange.getIn().setHeader("Numero_Items", numero_items);

        return newExchange;
    }
}

and this is part of my route:

<process ref="Cancellazione_Tabelle"/>
<split streaming="true" strategyRef="Item_Aggregator">
    <tokenize token="Item" xml="true" inheritNamespaceTagName="ag_01:Accpos"/>
    <to uri="direct:Convert_Accpos"/>
</split>
<log message="Numero_Items after the split: ${header.Numero_Items}"/>

It looks like the aggregator never ends.
How can I set a timeout, like completionTimeout on the aggregator?

Thanks
Mirko




--
View this message in context: http://camel.465427.n5.nabble.com/AggregatinStrategy-on-split-never-stop-tp5773905.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: AggregationStrategy on split never stops

Posted by Jakub Korab <ja...@ameliant.com>.
To handle exceptions within a split, your AggregationStrategy needs to 
be written in such a way that it deals with Exceptions. See the 
following for an example:

https://github.com/CamelCookbook/camel-cookbook-examples/blob/master/camel-cookbook-routing/src/main/java/org/camelcookbook/routing/multicast/ExceptionHandlingAggregationStrategy.java

If you don't, the splitting process will complete anyway, but the 
exception will be thrown afterwards - never reaching the next processor 
(in your case, "Check_New_Record"). See the documentation about 
"stopOnException" here:

     https://camel.apache.org/splitter.html
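
As a rough illustration of the idea (not the code behind the link above), the following dependency-free Java sketch models an aggregation step that records a failed sub-message instead of passing the failure on. SketchExchange, ExceptionAwareCounter and the Failed_Items header are hypothetical names invented for this sketch; a real strategy would implement Camel's AggregationStrategy and inspect newExchange.getException().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Camel Exchange: counters plus an optional exception.
class SketchExchange {
    final Map<String, Integer> headers = new HashMap<>();
    Exception exception; // null when the sub-message was processed successfully

    static SketchExchange ok() {
        return new SketchExchange();
    }

    static SketchExchange failed(String reason) {
        SketchExchange e = new SketchExchange();
        e.exception = new IllegalStateException(reason);
        return e;
    }
}

class ExceptionAwareCounter {
    static final String OK = "Numero_Items";     // header name taken from the thread
    static final String FAILED = "Failed_Items"; // hypothetical header for this sketch

    // Same shape as AggregationStrategy.aggregate(oldExchange, newExchange).
    static SketchExchange aggregate(SketchExchange oldExchange, SketchExchange newExchange) {
        // The first call receives null, so start a fresh result there.
        SketchExchange result = (oldExchange != null) ? oldExchange : new SketchExchange();
        if (newExchange.exception != null) {
            // Record the failure instead of returning a failed exchange.
            result.headers.merge(FAILED, 1, Integer::sum);
        } else {
            result.headers.merge(OK, 1, Integer::sum);
        }
        return result; // the aggregated result never carries an exception
    }

    public static void main(String[] args) {
        SketchExchange agg = null;
        agg = aggregate(agg, SketchExchange.ok());
        agg = aggregate(agg, SketchExchange.failed("primary key violation"));
        agg = aggregate(agg, SketchExchange.ok());
        System.out.println("ok=" + agg.headers.get(OK) + " failed=" + agg.headers.get(FAILED));
    }
}
```

Because the result returned to the caller is never in a failed state, the step after the split still has something to work with, and it can decide what to do based on the failure counter.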

Jakub

On 11/12/15 08:27, Scisci wrote:
> If the last split item throws an error, for example a primary key violation,
> the exchange never returns after the dead letter error handler, and the
> Check_New_Record process is not executed.


Re: AggregationStrategy on split never stops

Posted by Scisci <fe...@gmail.com>.
Hi,
sorry for the delayed response.
I'm using Camel version 2.6.0 and Spring version 4.1.6.RELEASE.
This is one of the routes:

<camelContext id="XXX"
              xmlns="http://camel.apache.org/schema/spring"
              xmlns:ag_01="http://www.my.it/Accpos">

    <propertyPlaceholder id="placeholder" location="classpath:file.properties"/>

    <route id="Accpos">
        <from uri="file:{{Path_In}}?fileName={{File_Name_Accpos}}&amp;readLock=changed&amp;readLockTimeout=10000&amp;readLockCheckInterval=5000&amp;move={{Backup}}"/>
        <doTry>
            <to uri="validator:classpath:Accpos.xsd"/>
            <log message="Receiving ${file:name}"/>
            <setHeader headerName="Tipo_Fondo"><xpath resultType="java.lang.String">/ag_01:Accpos/ag_01:Tipo_Fondo/text()</xpath></setHeader>
            <log message="Tipo_Fondo : ${header.Tipo_Fondo}"/>
            <setHeader headerName="Data_Giro"><xpath resultType="java.lang.String">/ag_01:Accpos/ag_01:Data_Giro/text()</xpath></setHeader>
            <log message="Data_Giro : ${header.Data_Giro}"/>
            <setHeader headerName="Numero_Record_Inviati"><xpath resultType="java.lang.String">/ag_01:Accpos/ag_01:Items/@count</xpath></setHeader>
            <log message="Numero_Record_Inviati : ${header.Numero_Record_Inviati}"/>
            <setHeader headerName="Tabella"><simple>${file:name}</simple></setHeader>
            <log message="Tabella : ${header.Tabella}"/>
            <setHeader headerName="Numero_Items"><simple>0</simple></setHeader>
            <log message="Numero_Items : ${header.Numero_Items}"/>
            <process ref="Clean_Tables"/>
            <split strategyRef="Item_Aggregator" streaming="true">
                <xtokenize>/ag_01:Accpos/ag_01:Items/ag_01:Item</xtokenize>
                <to uri="direct:Convert_Accpos"/>
            </split>
            <process ref="Check_New_Record"/>
            <to uri="direct:Mail_Info_Loading"/>
            <doCatch>
                <exception>org.apache.camel.ValidationException</exception>
                <setHeader headerName="subject">
                    <simple>ERROR - validation - ACCPOS</simple>
                </setHeader>
                <setBody>
                    <simple>Validation error \r \n ${exception.message}</simple>
                </setBody>
                <to uri="smtp://{{Mail_Server}}?from={{Sender}}&amp;to={{Receivers_Error}}"/>
            </doCatch>
        </doTry>
    </route>
    <route id="Convert_Accpos" errorHandlerRef="myDeadLetterErrorHandler_Accpos">
        <from uri="direct:Convert_Accpos"/>
        <unmarshal>
            <jaxb contextPath="it.my.Risk_Files.Accpos" partClass="it.my.Risk_Files.Accpos.ItemType" ignoreJAXBElement="true"/>
        </unmarshal>
        <convertBodyTo type="it.my.Risk_Files.Accpos.Accpos"/>
        <to uri="jpa:it.my.Risk_Files.Accpos.Accpos?usePersist=true"/>
    </route>
....

<bean id="myDeadLetterErrorHandler_Accpos" class="org.apache.camel.builder.DeadLetterChannelBuilder">
    <property name="deadLetterUri" value="jpa:it.my.Risk_Files.Accpos.Accpos_Error"/>
</bean>

If the last split item throws an error, for example a primary key violation,
the exchange never returns after the dead letter error handler, and the
Check_New_Record process is not executed.

Thanks
Mirko




Re: AggregationStrategy on split never stops

Posted by Claus Ibsen <cl...@gmail.com>.
What version of Camel do you use? And can you show all your Camel
routes and configuration?

On Thu, Nov 26, 2015 at 9:14 AM, Scisci <fe...@gmail.com> wrote:
> Hi,
> after a bit of investigation I've found out when the aggregation does not end.
> If the last item in the split throws an error, it is redirected to a
> dead letter channel and does not return to the Split component.
> I think this is because the default failure processor on
> org.apache.camel.builder.DeadLetterChannelBuilder is set with ExchangePattern.InOnly.
>
> How can I create a new processor that sends some info back to the Split
> component so it can terminate?



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

Re: AggregationStrategy on split never stops

Posted by Scisci <fe...@gmail.com>.
Hi,
after a bit of investigation I've found out when the aggregation does not end.
If the last item in the split throws an error, it is redirected to a
dead letter channel and does not return to the Split component.
I think this is because the default failure processor on
org.apache.camel.builder.DeadLetterChannelBuilder is set with ExchangePattern.InOnly.

How can I create a new processor that sends some info back to the Split
component so it can terminate?

Thanks
Mirko




Re: AggregationStrategy on split never stops

Posted by Claus Ibsen <cl...@gmail.com>.
It ends when there is no more data to split.
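
To picture that statement: each part is handed to the aggregation step in turn, and the split is finished once its source has no more parts, so completion is driven by running out of data rather than by a timer. A minimal model in plain Java follows; SplitFoldSketch and its methods are hypothetical names for this sketch and are not part of Camel.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.BiFunction;

// Hypothetical model of a split followed by in-line aggregation; not Camel code.
class SplitFoldSketch {

    // Feeds every part to the aggregate function and returns once the iterator is exhausted.
    static <T, R> R splitAndAggregate(Iterator<T> parts, BiFunction<R, T, R> aggregate) {
        R result = null; // the first call sees null, like oldExchange in a strategy
        while (parts.hasNext()) {
            result = aggregate.apply(result, parts.next());
        }
        return result;
    }

    // Counts the parts; the null case has to count the first part as well.
    static int countParts(Iterator<String> parts) {
        Integer count = SplitFoldSketch.<String, Integer>splitAndAggregate(
                parts, (old, part) -> old == null ? 1 : old + 1);
        return count == null ? 0 : count;
    }

    public static void main(String[] args) {
        // prints 3
        System.out.println(countParts(Arrays.asList("a", "b", "c").iterator()));
    }
}
```

In this model the only way for the loop not to finish is for a part never to come back from processing, which matches the symptom described in this thread.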


RE: AggregationStrategy on split never stops

Posted by Scisci <fe...@gmail.com>.
Hi,
to respond to Kai: I've tried the properties CamelSplitIndex,
CamelSplitSize and CamelSplitComplete, but these properties are empty outside
the split, and if I set a header with a simple expression inside the split,
the header is not set after the split ends either.

To respond to Claus: the problem is that the split never ends.
The last log line I see is the one printed by the aggregation strategy, not
the log after the split.
I've also tried removing streaming="true", but the aggregation still never ends.

Any suggestions?

Thanks
Mirko




RE: AggregationStrategy on split never stops

Posted by Kai Broszat <Ka...@kewill.com>.
You shouldn't have to do the counting yourself; Camel does that for you.

Exchange properties: the following properties are set on each Exchange that is split:

CamelSplitIndex (int)
A split counter that increases for each Exchange being split. The counter starts from 0.

CamelSplitSize (int)
The total number of Exchanges that were split. This property is not set for stream-based splitting. From Camel 2.9 onwards it is also set in stream-based splitting, but only on the completed Exchange.

CamelSplitComplete (Boolean)
Camel 2.4: Whether or not this Exchange is the last.

(from http://camel.apache.org/splitter.html)
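
To make the description above concrete, here is a small plain-Java model of which properties each split part would carry in streaming mode, following the quoted text only. SplitPropertiesSketch and describe are hypothetical names for this sketch; in a real route the values would be read from the Exchange properties inside the split.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the split properties described above; not Camel code.
class SplitPropertiesSketch {

    // Returns, for each part, the properties it would carry in streaming mode.
    static List<Map<String, Object>> describe(Iterator<String> parts) {
        List<Map<String, Object>> result = new ArrayList<>();
        int index = 0; // CamelSplitIndex starts from 0
        while (parts.hasNext()) {
            parts.next();
            boolean last = !parts.hasNext();
            Map<String, Object> props = new LinkedHashMap<>();
            props.put("CamelSplitIndex", index);
            props.put("CamelSplitComplete", last);
            if (last) {
                // In streaming mode the size is only known on the completed part.
                props.put("CamelSplitSize", index + 1);
            }
            result.add(props);
            index++;
        }
        return result;
    }

    public static void main(String[] args) {
        for (Map<String, Object> props : describe(Arrays.asList("a", "b", "c").iterator())) {
            System.out.println(props);
        }
    }
}
```

Under these assumptions the item count is the CamelSplitIndex of the part whose CamelSplitComplete is true, plus one, so no separate counter header is needed.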

Hope that helps,
Kai

