Posted to users@camel.apache.org by Craig Brumfield <cr...@apihotels.com> on 2011/06/07 22:59:51 UTC

Problem with Aggregator HawtDB persistence

I am using Camel 2.6.0 with ActiveMQ 5.4.2, and I am having a problem with the Aggregator when it uses HawtDB persistence (HawtDB 1.5 and HawtBuf 1.3).

What I am trying to do is consume files from an FTP source and then aggregate them into a single exchange. The FTP site contains three files with the same base file name, e.g., FILENAME.abc, FILENAME.xyz, and FILENAME.done. I am using a custom file strategy object to verify that FILENAME.done is present before consuming either of the other two files (the doneFile support did not exist when I first wrote this code), and this all works fine. The base file name is used as the aggregator correlation key, e.g., A123456 in the output below.
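The done-file rule and the correlation key described above can be modeled in a few lines of plain Java. This is a hypothetical sketch, not my actual file strategy class: the names DoneFileCheck, baseName, isReady, and consumable are illustrative, and it assumes the "basename" header holds the file name with its last extension removed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of the done-file rule; it does not use any Camel API.
final class DoneFileCheck {

    private DoneFileCheck() {
    }

    // Strip the last extension: "A123456.abc" -> "A123456".
    // Assumed to be the value placed in the "basename" header (the correlation key).
    static String baseName(String fileName) {
        int dot = fileName.lastIndexOf('.');
        return dot > 0 ? fileName.substring(0, dot) : fileName;
    }

    // True when the marker file "<basename>.done" is present in the listing.
    static boolean isReady(String fileName, Set<String> listing) {
        return listing.contains(baseName(fileName) + ".done");
    }

    // Data files (anything that is not itself a .done marker) that may be consumed now.
    static List<String> consumable(Set<String> listing) {
        List<String> ready = new ArrayList<>();
        for (String name : listing) {
            if (!name.endsWith(".done") && isReady(name, listing)) {
                ready.add(name);
            }
        }
        return ready;
    }
}
```

With a listing of A123456.abc, A123456.xyz, A123456.done, and B999.abc, only the two A123456 data files are consumable; B999.abc waits until B999.done appears.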

If I run my test without the HawtDB aggregation repository, everything works fine. FILENAME.abc is consumed, then FILENAME.xyz is consumed, and the grouped exchange is passed to the process() method of my aggregator class. In that method, I write the files to local temp files (using an input stream and File objects) and then read them with a third-party program that parses the files and presents the results to me in record format.
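The temp-file step can be sketched as follows. This is an illustrative version only: it uses java.nio.file (Java 7 or later) instead of the File objects mentioned above, and TempFileWriter and writeToTemp are hypothetical names. Each body of the grouped exchange is assumed to be available as an InputStream.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: copy one aggregated file body into a local temp file.
final class TempFileWriter {

    private TempFileWriter() {
    }

    // Returns the path of the new temp file, e.g. "A123456-<random>.abc".
    // The caller is expected to delete the file once the parser has read it.
    static Path writeToTemp(InputStream body, String baseName, String extension) {
        try {
            Path target = Files.createTempFile(baseName + "-", "." + extension);
            // createTempFile has already created the file, so the copy must replace it.
            try (InputStream in = body) {
                Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
            }
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```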

However, if the only change I make to my activemq.xml file is adding the reference to the aggregation repository (the aggregationRepositoryRef attribute below), both files are still consumed, but they are never aggregated and never reach my process() method.

  <aggregate groupExchanges="true" completionSize="2" aggregationRepositoryRef="aggregatorRepository">
    <correlationExpression>
      <header>basename</header>
    </correlationExpression>
    <process ref="myFileParser"/>
  </aggregate>

  <bean id="aggregatorRepository" class="org.apache.camel.component.hawtdb.HawtDBAggregationRepository">
    <property name="repositoryName" value="myAggregator"/>
    <property name="persistentFileName" value="data/myAggregator.dat"/>
  </bean>
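For comparison with the trace output below, here is a simplified in-memory model of the flow this configuration is expected to produce: one repository lookup per file, a store after the first file, and completion plus removal on the second. It is hypothetical and is not Camel's AggregateProcessor or AggregationRepository API. A plain map stands in for HawtDB, bodies are plain strings, and no serialization takes place, so it does not explain the hang. It only shows the expected "Getting key" / "Adding key" sequence.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of <aggregate groupExchanges="true" completionSize="2">.
final class GroupingAggregator {
    private final int completionSize;
    // Stands in for the persistent repository (HawtDB in the real configuration).
    private final Map<String, List<String>> repository = new HashMap<>();
    // Plays the role of <process ref="myFileParser"/>.
    private final Consumer<List<String>> onComplete;

    GroupingAggregator(int completionSize, Consumer<List<String>> onComplete) {
        this.completionSize = completionSize;
        this.onComplete = onComplete;
    }

    // Called once per consumed file with its correlation key (the "basename" header).
    synchronized void onMessage(String correlationKey, String body) {
        // "Getting key [...]": null for the first file of a group.
        List<String> group = repository.get(correlationKey);
        List<String> updated = group == null ? new ArrayList<>() : new ArrayList<>(group);
        updated.add(body);
        if (updated.size() >= completionSize) {
            // Group complete: remove it from the repository and pass on the grouped result.
            repository.remove(correlationKey);
            onComplete.accept(Collections.unmodifiableList(updated));
        } else {
            // "Adding key [...]": keep the partial group until the next file arrives.
            repository.put(correlationKey, updated);
        }
    }

    synchronized int pendingGroups() {
        return repository.size();
    }
}
```

In the trace below, the first file follows this model through "Adding key", but after the second "Getting key" there is no further repository activity, which is where the real route appears to stop.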

My reason for wanting to use this persistence framework is to get better error handling in case my server crashes before the aggregated result has been completely processed, since the aggregator does not seem to support transactions. The commit() method on my custom file strategy class is called as each file is consumed, not after the aggregated result is processed.

Is there any reason this should not work, or am I misunderstanding what this aggregator persistence is meant to provide?

When it consumes the first file, the log output from HawtDB is as follows:

2011-06-07 14:38:25,834 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ done  +++ Scan
2011-06-07 14:38:25,834 | TRACE | org.apache.camel.component.hawtdb.HawtDBAggregationRepository | Scanned and found no exchange to recover.
2011-06-07 14:38:25,845 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ start +++ Getting key [A123456]
2011-06-07 14:38:25,845 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Repository index with name myAggregator -> null
2011-06-07 14:38:25,846 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | TX is read only: true for executed work: Getting key [A123456]
2011-06-07 14:38:25,848 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ done  +++ Getting key [A123456]
2011-06-07 14:38:25,848 | DEBUG | org.apache.camel.component.hawtdb.HawtDBAggregationRepository | Getting key  [A123456] -> null
2011-06-07 14:38:25,849 | DEBUG | org.apache.camel.component.hawtdb.HawtDBAggregationRepository | Adding key   [A123456] -> Exchange[null]
2011-06-07 14:38:25,852 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ start +++ Adding key [A123456]
2011-06-07 14:38:25,853 | DEBUG | org.apache.camel.component.hawtdb.HawtDBFile | Created new repository index with name myAggregator at location 1
2011-06-07 14:38:25,853 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Repository index with name myAggregator -> { page: 1, deferredEncoding: true }
2011-06-07 14:38:25,854 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | TX is read only: false for executed work: Adding key [A123456]
2011-06-07 14:38:25,858 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ done  +++ Adding key [A123456]

When it consumes the second file, the output is as follows:

2011-06-07 14:38:25,886 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ start +++ Getting key [A123456]
2011-06-07 14:38:25,887 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Repository index with name myAggregator at location 1
2011-06-07 14:38:25,887 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Repository index with name myAggregator -> { page: 1, deferredEncoding: true }
2011-06-07 14:38:25,888 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | TX is read only: true for executed work: Getting key [A123456]
2011-06-07 14:38:25,890 | TRACE | org.apache.camel.component.hawtdb.HawtDBFile | Executing work +++ done  +++ Getting key [A123456]
2011-06-07 14:38:25,905 | DEBUG | org.apache.camel.component.hawtdb.HawtDBAggregationRepository | Getting key  [A123456] -> Exchange[Message: [Body is null]]

At this point, it seems to be stuck.  The commit() method of my file strategy never gets called for the second file, nor is the grouped Exchange sent to my process() method.

Any suggestions?

Thanks,
Craig