Posted to users@camel.apache.org by vassilis <2b...@gmail.com> on 2015/04/09 16:10:37 UTC

Aggregator + (LevelDB or HawtDB for persistency) incorrect behavior

Hi all!

The problem is reproducible with Camel 2.12.5, and also with the latest 2.11,
2.13, 2.14 and 2.15 releases.
It is NOT reproducible with Camel 2.10.7 (i.e. the latest 2.10).
It appears only when a LevelDB or HawtDB repository is used. The default
in-memory aggregation repository works fine.
It is reproduced by a unit test.
The route and unit tests are at the end.

Repo definition:

HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat");
repo.setDeadLetterUri(dlq.getEndpointUri());
repo.setMaximumRedeliveries(3);

Problems:
1. Wrong number of aggregated messages (the aggregator is configured to
complete on completionSize=2 or completionTimeout=10000 millis).
2. Error handling of the HawtDB/LevelDB repository is not working (i.e. no
retries, and no message goes to the DLQ).

Any ideas??

Many thanks!!!

---------------------------

Unit test output regarding (1):

The unit test sends 4 messages and expects 2 aggregated messages (i.e. 2
incoming messages per group). However, the aggregator outputs 4 messages. The
last two are NOT correct and have an EMPTY header
"in.header.CamelAggregatedCompletedBy".

INFO  09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#info:96] [Timer with FILE REF=1 cancelled]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72] [Aggregation Completion reason=size]
INFO  09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#info:96] [Timer with FILE REF=2 cancelled]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72] [Aggregation Completion reason=size]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72] [Ignore - Aggregation Completion Reason Not Expected=]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72] [Ignore - Aggregation Completion Reason Not Expected=]
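
For comparison, the result the test expects (4 messages over 2 correlation keys, completionSize=2, "last message wins") can be modelled in a few lines of plain Java. This is only a sketch of the expected behaviour, not Camel's aggregator: the class SizeCompletionModel and its members are hypothetical names, and the timeout path is deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of size-based completion. Illustrative only: this is NOT
// Camel's aggregator, and all names here are hypothetical.
class SizeCompletionModel {

    /** One emitted group: its correlation key, last body, and completion reason. */
    static final class Completed {
        final String key;
        final String body;
        final String completedBy;

        Completed(String key, String body, String completedBy) {
            this.key = key;
            this.body = body;
            this.completedBy = completedBy;
        }
    }

    /**
     * Counts messages per correlation key and emits a group as soon as
     * completionSize messages with the same key have arrived.
     * Each message is a {key, body} pair.
     */
    static List<Completed> aggregate(List<String[]> messages, int completionSize) {
        Map<String, Integer> sizes = new HashMap<>();
        List<Completed> out = new ArrayList<>();
        for (String[] message : messages) {
            String key = message[0];
            String body = message[1];
            int size = sizes.merge(key, 1, Integer::sum);
            if (size == completionSize) {
                // "Last message wins": the completing message's body is the result.
                out.add(new Completed(key, body, "size"));
                sizes.remove(key); // the next message with this key starts a new group
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> sent = new ArrayList<>();
        sent.add(new String[] {"1", "dummy body1.."});
        sent.add(new String[] {"1", "dummy body1.."});
        sent.add(new String[] {"2", "dummy body2.."});
        sent.add(new String[] {"2", "dummy body2.."});

        for (Completed c : aggregate(sent, 2)) {
            System.out.println("key=" + c.key + " completedBy=" + c.completedBy);
        }
    }
}
```

In this model exactly two groups come out, both with completedBy=size, which matches the first four log lines above. The two extra outputs with an empty completion reason have no counterpart in it.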

----------------------------

Route:

        LastMessageAggregationStrategy aggregationStrategy = new LastMessageAggregationStrategy();

        from(timerRouteFrom).routeId("timerRoute")
            .aggregate(header(TimerRoute.FILE_REF_HEADER), aggregationStrategy)
                .completionSize(2)
                .completionTimeout(timeout)
                .aggregationRepository(repo)
            .choice()
                // aggregation completed because the timeout elapsed
                .when(simple("${in.header.CamelAggregatedCompletedBy} contains 'timeout'"))
                    .log(LoggingLevel.ERROR, timeoutMessage)
                    .log(LoggingLevel.ERROR, "Timeout threshold (millis): " + timeout)
                    .log(LoggingLevel.ERROR, "File Ref= ${in.header." + FILE_REF_HEADER + "}")
                    .log(LoggingLevel.DEBUG, "Sending to: " + timeoutUri)
                    .setHeader("contentType", constant("text/html"))
                    .setHeader(TIMEOUT_THRESHOLD_HEADER, constant(timeout))
                    .to(freeMarkerTemplate).id("freemarkerId")
                    .log(LoggingLevel.DEBUG, "Emailing Exchange body: ${body}")
                    .to(timeoutUri)
                // aggregation completed because completionSize was reached
                .when(simple("${in.header.CamelAggregatedCompletedBy} contains 'size'"))
                    .log(LoggingLevel.INFO, "Timer with FILE REF=${in.header." + FILE_REF_HEADER + "} cancelled")
                    .log(LoggingLevel.DEBUG, "Aggregation Completion reason=${in.header.CamelAggregatedCompletedBy}")
                    .to(successUri)
                .otherwise()
                    .log(LoggingLevel.DEBUG, "Ignore - Aggregation Completion Reason Not Expected=${in.header.CamelAggregatedCompletedBy}")
                .end()
            .end();


	class LastMessageAggregationStrategy implements AggregationStrategy {
	 
	    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {      
	    	return newExchange;
	    }
	}

----------------------

Unit Test (aggregation size=2, so expecting 2 aggregated messages):

    public void happyPath() throws InterruptedException {
    	
        successUri.expectedMessageCount(2);
        timeoutUri.expectedMessageCount(0);
        
        Map<String, Object> headers1 = new HashMap<String, Object>();
        headers1.put(TimerRoute.FILE_REF_HEADER, "1");
        
        Map<String, Object> headers2 = new HashMap<String, Object>();
        headers2.put(TimerRoute.FILE_REF_HEADER, "2");
        
        for(int i=0;i<2;i++) 
        	fromTemplate.sendBodyAndHeaders("dummy body1..", headers1);
        
        for(int i=0;i<2;i++) 
        	fromTemplate.sendBodyAndHeaders("dummy body2..", headers2);

    	Thread.sleep(5000);
    	
        assertMockEndpointsSatisfied();
    	
    }

    public void testAggregatorRetries() throws Exception {
		
        context.getRouteDefinition("timerRoute").adviceWith(context, new AdviceWithRouteBuilder() {
            @Override
            public void configure() throws Exception {

                interceptSendToEndpoint(timeoutUri.getEndpointUri())
                    .skipSendToOriginalEndpoint()
                    .throwException(new IOException("This a simulated exception Timeout!"));

                interceptSendToEndpoint(successUri.getEndpointUri())
                    .skipSendToOriginalEndpoint()
                    .throwException(new IOException("This a simulated exception Success!"));
            }
        });
    		
        successUri.expectedMessageCount(0);
        timeoutUri.expectedMessageCount(0);
        dlq.expectedMessageCount(1);
        
        Map<String, Object> headers1 = new HashMap<String, Object>();
        headers1.put(TimerRoute.FILE_REF_HEADER, "1");
        
        for(int i=0;i<2;i++) 
        	fromTemplate.sendBodyAndHeaders("dummy body1..", headers1);
        
    	Thread.sleep(5000);
    	
        assertMockEndpointsSatisfied();	
    }

	protected RouteBuilder createRouteBuilder() throws Exception {
		
		// create the repo
		HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat");
		repo.setDeadLetterUri(dlq.getEndpointUri());
		repo.setMaximumRedeliveries(3);
		// create the route that will be tested
		TimerRoute routePutToTest = new TimerRoute("direct:from", "Expired!!!!", repo, 10 * 1000,
				successUri.getEndpointUri(), timeoutUri.getEndpointUri());
		
		return routePutToTest;	
	}
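
For reference, the outcome that testAggregatorRetries asserts (nothing on the success or timeout endpoints, one message on the DLQ once redeliveries are exhausted) can be sketched in plain Java. This is a model of the expectation only, not Camel's recovery code. RedeliveryModel and its members are hypothetical names, and counting "one initial attempt plus up to maxRedeliveries retries" is an assumption made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of "redeliver, then dead-letter". Illustrative only: this is
// NOT Camel's recovery implementation, and all names here are hypothetical.
class RedeliveryModel {

    final List<String> deadLetters = new ArrayList<>();
    int attempts = 0;

    /**
     * Makes one initial delivery attempt followed by up to maxRedeliveries
     * retries (assumed counting). If every attempt throws, the message is
     * added to deadLetters.
     */
    void deliver(String message, Consumer<String> target, int maxRedeliveries) {
        for (int i = 0; i <= maxRedeliveries; i++) {
            attempts++;
            try {
                target.accept(message);
                return; // delivered, nothing goes to the dead-letter list
            } catch (RuntimeException e) {
                // delivery failed, fall through to the next attempt
            }
        }
        deadLetters.add(message);
    }

    public static void main(String[] args) {
        RedeliveryModel model = new RedeliveryModel();
        // A target that always fails, like the intercepted endpoints in the test.
        model.deliver("aggregated message",
                m -> { throw new IllegalStateException("simulated failure"); }, 3);
        System.out.println("attempts=" + model.attempts
                + " deadLetters=" + model.deadLetters.size());
    }
}
```

With a target that always fails and maxRedeliveries=3, the model ends with exactly one dead-lettered message, which is what dlq.expectedMessageCount(1) checks for.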



	



--
View this message in context: http://camel.465427.n5.nabble.com/Aggregator-LevelDB-or-HawtDB-for-persistency-incorrect-behavior-tp5765524.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Aggregator + (LevelDB or HawtDB for persistency) incorrect behavior

Posted by onders <on...@gmail.com>.
BTW, I tried both LevelDB and HawtDB as the aggregation repository.

  
  <bean id="myRepo" class="org.apache.camel.component.hawtdb.HawtDBAggregationRepository">
    <property name="persistentFileName" value="data/hawtdb.dat"/>
    <property name="repositoryName" value="myCoolRepo"/>
    <property name="bufferSize" value="602400"/>
  </bean>


or



	
  <bean id="myRepo" class="org.apache.camel.component.leveldb.LevelDBAggregationRepository">
    <property name="persistentFileName" value="target/data/leveldb.dat"/>
    <property name="repositoryName" value="repo2"/>
  </bean>
	



--
View this message in context: http://camel.465427.n5.nabble.com/Aggregator-LevelDB-or-HawtDB-for-persistency-incorrect-behavior-tp5765524p5789837.html

Re: Aggregator + (LevelDB or HawtDB for persistency) incorrect behavior

Posted by onders <on...@gmail.com>.
Hi,

I am using Camel core and all components at version 2.17.3.
This issue still seems to be present.

The example under examples/camel-example-aggregator works when I test it.

With the route below, however, aggregation never completes and no output file
is generated. (BTW, if I aggregate without persistence, it works.)

from("amq:queue:jms.myqueue")
    .startupOrder(initialStartupOrder)
    .shutdownRoute(ShutdownRoute.Defer)
    .aggregate(simple(myId.toString()), new AggregationStrategy() {

        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                newExchange.getIn().setBody(
                    new StringBuilder(newExchange.getIn().getBody(MyObject.class).asLine("|")));
                return newExchange;
            }
            oldExchange.getIn().getBody(StringBuilder.class)
                .append(newExchange.getIn().getBody(MyObject.class).asLine("|"));
            return oldExchange;
        }
    })
        .eagerCheckCompletion()
        .forceCompletionOnStop()
        .completeAllOnStop()
        .completionSize(500)
        .completionInterval(60000)
        .parallelProcessing(true)
        .aggregationRepository(myRepo)
    .to("file:myfile");



--
View this message in context: http://camel.465427.n5.nabble.com/Aggregator-LevelDB-or-HawtDB-for-persistency-incorrect-behavior-tp5765524p5789810.html