Posted to users@camel.apache.org by Reto Peter <re...@advanceit.ch> on 2022/04/05 08:12:46 UTC

JdbcAggregationRepository is not permanently?

Hi

I am using the JdbcAggregationRepository and trying to save the relation between my incoming messages and the aggregate as a 1-n relation. Concretely, the EDI messages (single messages) go into an EDI interchange (a collection of messages).
When I run the route, I see the entries in the MySQL aggregation table.
Once the aggregate is finished, all the entries are gone: both tables, aggregation and aggregation_completed, are empty.
Is that normal? Or do I have an error somewhere in my setup?

I tried to find another way to save the relation. In my AggregationStrategy class, I save a list of messages in the aggregate method and set that list as a header in the onCompletion method. This works only if I use the correlationExpression 'true'. In reality I aggregate messages by doctype, such as "DESADV" and "INVOIC", and then the value of the header is always just the one from the last processed aggregate.

My route looks like this (simplified)

from("file:{{dir.from}}")
.choice()
.when()....setHeader("doctype").constant("DESADV")
.when()....setHeader("doctype").constant("INVOICE")
.aggregate(header("doctype"), new EDIMessageToInterchangeAggregator()).aggregationRepository("repo3")
.log("Header: ${header.messageIDList}") // Shows always the latest Aggregate Header only

If I run messages for one doctype only, it all works, because there is only one aggregate.

My bean config is like this:
       <bean id="repo3"
         class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
         <property name="repositoryName" value="test_aggregation"/>
         <property name="transactionManager" ref="txManager"/>
         <property name="dataSource" ref="datasource"/>
         <!-- configure to store the message body and following headers as text in the repo -->

         <property name="storeBodyAsText" value="true"/>
         <property name="headersToStoreAsText">
           <list>
             <value>doctype</value>
             <value>messageID</value>
             <value>interchangeID</value>
           </list>
         </property>
       </bean>

       <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
         <property name="dataSource" ref="datasource"/>
       </bean>

My environment:
Camel 3.16.0
Java 11
SpringBoot 2.6.4
MySQL 8.x



Re: JdbcAggregationRepository is not permanently?

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

If you mean that the tables are "cleaned up", then yes, that is by
design. The tables store in-progress aggregations; once an aggregation
is completed and has been processed, its rows are deleted.
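
To make the lifecycle described above concrete, here is a toy in-memory model. It is not the real JdbcAggregationRepository: the class name, the method names, and the two maps standing in for the aggregation and aggregation_completed tables are all hypothetical. It assumes that a completed aggregate is held in the completed table only until downstream processing has finished.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the two-table lifecycle; NOT the real JdbcAggregationRepository.
class ToyAggregationStore {
    // Stands in for the "aggregation" table: correlation key -> in-progress aggregate.
    final Map<String, String> inProgress = new LinkedHashMap<>();
    // Stands in for the "aggregation_completed" table: exchange id -> completed aggregate.
    final Map<String, String> completed = new LinkedHashMap<>();

    // Store or update the in-progress aggregate for a correlation key.
    void add(String correlationKey, String aggregate) {
        inProgress.put(correlationKey, aggregate);
    }

    // Completion: move the aggregate from the in-progress map to the completed map.
    void complete(String correlationKey, String exchangeId) {
        String aggregate = inProgress.remove(correlationKey);
        if (aggregate != null) {
            completed.put(exchangeId, aggregate);
        }
    }

    // The completed aggregate was processed successfully: drop its row.
    void confirm(String exchangeId) {
        completed.remove(exchangeId);
    }

    public static void main(String[] args) {
        ToyAggregationStore store = new ToyAggregationStore();
        store.add("DESADV", "message 1");
        store.add("DESADV", "message 1 + message 2");
        store.complete("DESADV", "exchange-1");
        store.confirm("exchange-1");
        // Both "tables" are empty once the aggregate has been processed.
        System.out.println(store.inProgress.isEmpty() && store.completed.isEmpty()); // prints true
    }
}
```

In this model nothing survives a successful run, which matches the empty tables observed in the question. A permanent record of the relation has to be written somewhere else.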



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

RE: JdbcAggregationRepository is not permanently?

Posted by Reto Peter <re...@advanceit.ch>.
Hi Karen

What I actually need is INSERT INTO xxxx (interchangeID, messageID)....

The question was whether the aggregator can save the relation between the incoming MESSAGES (files) and the produced AGGREGATE (INTERCHANGE) in the database without any hassle. With the aggregation repo that relation would be there automatically, but it is only temporary...

So I need a db table with InterchangeID as the PRIMARY KEY and MessageID as foreign key: many messages (files) for 1 interchange (aggregate). In my case I have different types of incoming messages (DESADV and INVOIC).
I have already solved this inside the AggregationStrategy. There I save the MessageIDs in a separate list for each doctype (desadvMessageIDs and invoicMessageIds). In "onCompletion" I then set the outgoing header to the messageIDs of that doctype. In the route I then update the related messages with the created InterchangeID.

It took me a lot of time to find this solution... but now it all works.
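
The per-doctype bookkeeping described above could be sketched roughly as follows. This is plain Java with no Camel dependency; the class MessageIdTracker and its methods record and drain are hypothetical names, and a single map keyed by doctype stands in for the two separate lists (desadvMessageIDs and invoicMessageIds). The point it illustrates is that the IDs collected for one doctype are never mixed with those of another.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: keeps the message IDs of each doctype separate.
class MessageIdTracker {
    private final Map<String, List<String>> idsByDoctype = new HashMap<>();

    // Called once per incoming message, for example from the aggregate step.
    synchronized void record(String doctype, String messageId) {
        idsByDoctype.computeIfAbsent(doctype, key -> new ArrayList<>()).add(messageId);
    }

    // Called on completion: returns the IDs of this doctype only and forgets them,
    // so the next interchange of the same doctype starts with an empty list.
    synchronized List<String> drain(String doctype) {
        List<String> ids = idsByDoctype.remove(doctype);
        return ids == null ? List.of() : List.copyOf(ids);
    }

    public static void main(String[] args) {
        MessageIdTracker tracker = new MessageIdTracker();
        tracker.record("DESADV", "D-1");
        tracker.record("INVOIC", "I-1");
        tracker.record("DESADV", "D-2");
        System.out.println(tracker.drain("DESADV")); // prints [D-1, D-2]
        System.out.println(tracker.drain("INVOIC")); // prints [I-1]
    }
}
```

A single list shared by all doctypes would explain the behaviour in the original question, where the header always showed the values of the last processed aggregate. Keying the state by doctype avoids that.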

-----Original Message-----
From: Karen Lease <ka...@gmail.com> 
Sent: Friday, April 8, 2022 9:34 PM
To: users@camel.apache.org
Subject: Re: JdbcAggregationRepository is not permanently?

Hi,

To store the aggregated results, you need to create a separate table in your database.
Then you can add something like the following statement to your route after the aggregation:

.to("sql:insert into messages (doctype, messageID) values (:#doctype, :#messageID)");
The values of the named parameters will be taken from the corresponding headers in the aggregated Exchange.



Re: JdbcAggregationRepository is not permanently?

Posted by Karen Lease <ka...@gmail.com>.
Hi,

To store the aggregated results, you need to create a separate table in 
your database.
Then you can add something like the following statement to your route 
after the aggregation:

.to("sql:insert into messages (doctype, messageID) values (:#doctype, :#messageID)");
The values of the named parameters will be taken from the corresponding 
headers in the aggregated Exchange.
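
For a 1-n relation, one row per message is needed rather than one row per aggregate. As a minimal sketch, assuming the aggregated Exchange carries an interchange ID and a list of message IDs, the parameter values for such rows could be prepared like this. The class RelationRows, the method rowsFor, and the table name interchange_message in the comment are hypothetical; how the resulting maps are handed to the SQL endpoint is left out on purpose.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: prepares one set of named parameter values per message ID.
class RelationRows {
    // Each map matches the named parameters of a statement such as
    //   insert into interchange_message (interchangeID, messageID)
    //   values (:#interchangeID, :#messageID)
    // where interchange_message is an assumed table name.
    static List<Map<String, Object>> rowsFor(String interchangeId, List<String> messageIds) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (String messageId : messageIds) {
            Map<String, Object> row = new LinkedHashMap<>();
            row.put("interchangeID", interchangeId);
            row.put("messageID", messageId);
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Two messages in one interchange give two rows with the same interchangeID.
        for (Map<String, Object> row : rowsFor("IC-1", List.of("M-1", "M-2"))) {
            System.out.println(row);
        }
    }
}
```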

