You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/03/31 16:32:45 UTC

svn commit: r1307815 [2/3] - in /camel/trunk: apache-camel/ apache-camel/src/main/descriptors/ components/ components/camel-leveldb/ components/camel-leveldb/src/ components/camel-leveldb/src/main/ components/camel-leveldb/src/main/java/ components/cam...

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadAndRecoverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadAndRecoverTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadAndRecoverTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadAndRecoverTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load test for an aggregator backed by a {@link LevelDBAggregationRepository} with recovery enabled.
+ *
+ * Sends {@link #SIZE} messages sharing one correlation id, aggregates them in groups of 10 and lets
+ * every 10th invocation of a processor fail. The test then counts how many exchanges arrived at
+ * <tt>mock:result</tt> carrying the {@link Exchange#REDELIVERED} header.
+ */
+public class LevelDBAggregateLoadAndRecoverTest extends CamelTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LevelDBAggregateLoadAndRecoverTest.class);
+    private static final int SIZE = 200;
+    // number of times the failing processor in the route has been invoked
+    private static final AtomicInteger COUNTER = new AtomicInteger();
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        // start from an empty repository directory and a fresh counter, so the
+        // exchanges that fail do not depend on earlier runs in the same JVM
+        deleteDirectory("target/data");
+        COUNTER.set(0);
+        super.setUp();
+    }
+
+    @Test
+    public void testLoadAndRecoverLevelDBAggregate() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        // completion size is 10, so SIZE messages yield SIZE / 10 aggregated exchanges
+        mock.expectedMessageCount(SIZE / 10);
+        mock.setResultWaitTime(50 * 1000);
+
+        LOG.info("Starting to send {} messages.", SIZE);
+
+        // every message has the same body and correlation id; only the sequence number differs
+        final int value = 1;
+        final char id = 'A';
+        for (int i = 0; i < SIZE; i++) {
+            Map<String, Object> headers = new HashMap<String, Object>();
+            headers.put("id", id);
+            headers.put("seq", i);
+            LOG.debug("Sending {} with id {}", value, id);
+            template.sendBodyAndHeaders("seda:start", value, headers);
+            // simulate a little delay
+            Thread.sleep(5);
+        }
+
+        LOG.info("Sending all {} message done. Now waiting for aggregation to complete.", SIZE);
+
+        assertMockEndpointsSatisfied();
+
+        // count the exchanges that were flagged as redelivered
+        int recovered = 0;
+        for (Exchange exchange : mock.getReceivedExchanges()) {
+            if (exchange.getIn().getHeader(Exchange.REDELIVERED) != null) {
+                recovered++;
+            }
+        }
+        // every 10th of the SIZE / 10 aggregated exchanges is made to fail once
+        int expected = SIZE / 10 / 10;
+        assertEquals("There should be " + expected + " recovered", expected, recovered);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                LevelDBAggregationRepository repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+                repo.setUseRecovery(true);
+                // for faster unit testing
+                repo.setRecoveryInterval(500);
+
+                from("seda:start?size=" + SIZE)
+                    .to("log:input?groupSize=500")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .aggregationRepository(repo)
+                        .completionSize(10)
+                        .to("log:output?showHeaders=true")
+                        // have every 10th exchange fail which should then be recovered
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                int num = COUNTER.incrementAndGet();
+                                if (num % 10 == 0) {
+                                    throw new IllegalStateException("Failed for num " + num);
+                                }
+                            }
+                        })
+                        .to("mock:result")
+                    .end();
+            }
+        };
+    }
+
+    /**
+     * Sums the Integer bodies of the incoming exchanges into the first exchange of the group.
+     */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // first message of a group: nothing to merge with yet
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
+            Integer body1 = oldExchange.getIn().getBody(Integer.class);
+            Integer body2 = newExchange.getIn().getBody(Integer.class);
+            int sum = body1 + body2;
+
+            oldExchange.getIn().setBody(sum);
+            return oldExchange;
+        }
+    }
+
+}

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadAndRecoverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadAndRecoverTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadConcurrentTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadConcurrentTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadConcurrentTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load test that sends {@link #SIZE} messages from a pool of 10 threads, spread evenly over the
+ * correlation ids in {@link #KEYS}, through an aggregator backed by a
+ * {@link LevelDBAggregationRepository}, and expects one aggregated exchange per id.
+ */
+public class LevelDBAggregateLoadConcurrentTest extends CamelTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LevelDBAggregateLoadConcurrentTest.class);
+    private static final char[] KEYS = new char[]{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J'};
+    private static final int SIZE = 500;
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        // start every run from an empty repository directory
+        deleteDirectory("target/data");
+        super.setUp();
+    }
+
+    @Test
+    public void testLoadTestLevelDBAggregate() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        // one completed aggregate per correlation id
+        mock.expectedMessageCount(KEYS.length);
+        mock.setResultWaitTime(50 * 1000);
+
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+        try {
+            LOG.info("Starting to send {} messages.", SIZE);
+
+            for (int i = 0; i < SIZE; i++) {
+                final int value = 1;
+                // round-robin over the correlation ids
+                final int key = i % KEYS.length;
+                executor.submit(new Callable<Object>() {
+                    public Object call() throws Exception {
+                        char id = KEYS[key];
+                        LOG.debug("Sending {} with id {}", value, id);
+                        template.sendBodyAndHeader("direct:start", value, "id", "" + id);
+                        // simulate a little delay
+                        Thread.sleep(3);
+                        return null;
+                    }
+                });
+            }
+
+            // the tasks have only been submitted at this point; the mock wait below covers the actual sending
+            LOG.info("Sending all {} message done. Now waiting for aggregation to complete.", SIZE);
+
+            assertMockEndpointsSatisfied();
+        } finally {
+            // always release the pool threads, also when the assertion above fails
+            executor.shutdownNow();
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                LevelDBAggregationRepository repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+
+                from("direct:start")
+                    .to("log:input?groupSize=500")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .aggregationRepository(repo)
+                        // each id receives an equal share of the SIZE messages
+                        .completionSize(SIZE / KEYS.length)
+                        .to("log:output?showHeaders=true")
+                        .to("mock:result")
+                    .end();
+            }
+        };
+    }
+
+    /**
+     * Sums the Integer bodies of the incoming exchanges into the first exchange of the group.
+     */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // first message of a group: nothing to merge with yet
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
+            Integer body1 = oldExchange.getIn().getBody(Integer.class);
+            Integer body2 = newExchange.getIn().getBody(Integer.class);
+            int sum = body1 + body2;
+
+            oldExchange.getIn().setBody(sum);
+            return oldExchange;
+        }
+    }
+}

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadConcurrentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadConcurrentTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load test that sends {@link #SIZE} messages with the same correlation id through an aggregator
+ * backed by a {@link LevelDBAggregationRepository} and waits for at least one aggregated exchange.
+ */
+public class LevelDBAggregateLoadTest extends CamelTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LevelDBAggregateLoadTest.class);
+    private static final int SIZE = 500;
+    // the producer and the route must use the same endpoint URI, so it is defined once
+    private static final String START_URI = "seda:start?size=" + SIZE;
+    private LevelDBAggregationRepository repo;
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        // start every run from an empty repository directory; the repository must exist
+        // before super.setUp() since the route created there references it
+        deleteDirectory("target/data");
+        repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+        super.setUp();
+    }
+
+    @Test
+    public void testLoadTestLevelDBAggregate() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(1);
+        mock.setResultWaitTime(50 * 1000);
+
+        LOG.info("Starting to send {} messages.", SIZE);
+
+        // every message has the same body and correlation id
+        final int value = 1;
+        final char id = 'A';
+        for (int i = 0; i < SIZE; i++) {
+            LOG.debug("Sending {} with id {}", value, id);
+            template.sendBodyAndHeader(START_URI, value, "id", "" + id);
+        }
+
+        LOG.info("Sending all {} message done. Now waiting for aggregation to complete.", SIZE);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(START_URI)
+                    .to("log:input?groupSize=500")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .aggregationRepository(repo)
+                        // all SIZE messages end up in a single aggregate
+                        .completionSize(SIZE)
+                        .to("log:output?showHeaders=true")
+                        .to("mock:result")
+                    .end();
+            }
+        };
+    }
+
+    /**
+     * Sums the Integer bodies of the incoming exchanges into the first exchange of the group.
+     */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // first message of a group: nothing to merge with yet
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
+            Integer body1 = oldExchange.getIn().getBody(Integer.class);
+            Integer body2 = newExchange.getIn().getBody(Integer.class);
+            int sum = body1 + body2;
+
+            oldExchange.getIn().setBody(sum);
+            return oldExchange;
+        }
+    }
+
+}

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.leveldb.LevelDBAggregationRepository.keyBuilder;
+
+/**
+ * Verifies that an aggregated exchange which was processed successfully is no longer present
+ * under the <tt>repo1-completed</tt> key of the {@link LevelDBAggregationRepository}.
+ */
+public class LevelDBAggregateNotLostRemovedWhenConfirmedTest extends CamelTestSupport {
+
+    private LevelDBAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        // start every run from an empty repository directory; the repository must exist
+        // before super.setUp() since the route created there references it
+        deleteDirectory("target/data");
+        repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+        super.setUp();
+    }
+
+    @Test
+    public void testLevelDBAggregateNotLostRemovedWhenConfirmed() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+
+        // five messages with the same correlation id complete one aggregate (completionSize is 5)
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+
+        // NOTE(review): presumably gives the aggregator time to confirm the exchange
+        // in the repository before we look it up - confirm
+        Thread.sleep(1000);
+
+        String exchangeId = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getExchangeId();
+
+        // the exchange should NOT be in the completed repo as it was confirmed
+        final LevelDBFile levelDBFile = repo.getLevelDBFile();
+        byte[] bf = levelDBFile.getDb().get(keyBuilder("repo1-completed", exchangeId));
+
+        // assert the exchange was deleted
+        assertNull(bf);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5).aggregationRepository(repo)
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:result")
+                    .end();
+            }
+        };
+    }
+
+    /**
+     * Concatenates the String bodies of the incoming exchanges into the first exchange of the group.
+     */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // first message of a group: nothing to merge with yet
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.fusesource.hawtbuf.Buffer;
+import org.junit.Test;
+
+import static org.apache.camel.component.leveldb.LevelDBAggregationRepository.keyBuilder;
+
+/**
+ * Checks that an aggregated exchange whose downstream processing throws can still be found
+ * under the <tt>repo1-completed</tt> key of the {@link LevelDBAggregationRepository}, with
+ * its id, body, header and aggregation properties intact.
+ */
+public class LevelDBAggregateNotLostTest extends CamelTestSupport {
+
+    private LevelDBAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        // wipe data of earlier runs, then create the repository the route will use
+        deleteDirectory("target/data");
+        repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+        super.setUp();
+    }
+
+    @Test
+    public void testLevelDBAggregateNotLost() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("ABCDE");
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        // five parts with the same correlation id form one complete aggregate
+        for (String part : new String[]{"A", "B", "C", "D", "E"}) {
+            template.sendBodyAndHeader("direct:start", part, "id", 123);
+        }
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+
+        Thread.sleep(1000);
+
+        String aggregatedId = getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId();
+
+        // look the exchange up directly in the completed part of the repository
+        byte[] stored = repo.getLevelDBFile().getDb().get(keyBuilder("repo1-completed", aggregatedId));
+
+        // it must still be there ...
+        assertNotNull(stored);
+        Exchange restored = new LevelDBCamelCodec().unmarshallExchange(context, new Buffer(stored));
+        assertNotNull(restored);
+        // ... under the same exchange id
+        assertEquals(aggregatedId, restored.getExchangeId());
+        // ... with the payload and aggregation details unchanged
+        assertEquals("ABCDE", restored.getIn().getBody());
+        assertEquals(123, restored.getIn().getHeader("id"));
+        assertEquals("size", restored.getProperty(Exchange.AGGREGATED_COMPLETED_BY));
+        assertEquals(5, restored.getProperty(Exchange.AGGREGATED_SIZE));
+        // the correlation key is expected back as a String
+        assertEquals("123", restored.getProperty(Exchange.AGGREGATED_CORRELATION_KEY));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5)
+                        .aggregationRepository(repo)
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                        // fail on purpose after aggregation, so the message never reaches mock:result
+                        .throwException(new IllegalArgumentException("Damn"))
+                        .to("mock:result")
+                    .end();
+            }
+        };
+    }
+
+    /**
+     * Appends the String body of each new exchange to the body of the first exchange of the group.
+     */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange != null) {
+                String merged = oldExchange.getIn().getBody(String.class)
+                    + newExchange.getIn().getBody(String.class);
+                oldExchange.getIn().setBody(merged);
+                return oldExchange;
+            }
+            // first exchange of the group becomes the aggregate
+            return newExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelFailedTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelFailedTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelFailedTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelFailedTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Verifies recovery when the dead letter channel itself fails: the aggregated exchange always
+ * fails in the main route, is configured to move to <tt>direct:dead</tt> after 2 redeliveries,
+ * and the dead letter route throws as well. The test expects the exchange to arrive at
+ * <tt>mock:dead</tt> at least twice, each time with the same redelivery details.
+ */
+public class LevelDBAggregateRecoverDeadLetterChannelFailedTest extends CamelTestSupport {
+
+    private LevelDBAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        // start every run from an empty repository directory; the repository must exist
+        // before super.setUp() since the route created there references it
+        deleteDirectory("target/data");
+        repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+        // enable recovery
+        repo.setUseRecovery(true);
+        // exhaust after at most 2 attempts
+        repo.setMaximumRedeliveries(2);
+        // and move to this dead letter channel
+        repo.setDeadLetterUri("direct:dead");
+        // check faster
+        repo.setRecoveryInterval(1000, TimeUnit.MILLISECONDS);
+
+        super.setUp();
+    }
+
+    @Test
+    public void testLevelDBAggregateRecoverDeadLetterChannelFailed() throws Exception {
+        // should fail all times
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+        // it should keep sending to DLC if it failed, so test for min 2 attempts
+        getMockEndpoint("mock:dead").expectedMinimumMessageCount(2);
+        // all the details should be the same about redelivered and redelivered 2 times
+        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+        getMockEndpoint("mock:dead").message(1).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+        getMockEndpoint("mock:dead").message(1).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+
+        // five messages with the same correlation id complete one aggregate (completionSize is 5)
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+
+        // all the details should be the same about redelivered and redelivered 2 times
+        Exchange first = getMockEndpoint("mock:dead").getReceivedExchanges().get(0);
+        assertEquals(true, first.getIn().getHeader(Exchange.REDELIVERED));
+        assertEquals(2, first.getIn().getHeader(Exchange.REDELIVERY_COUNTER));
+
+        // check the second delivery on its own exchange, not on the first one again
+        Exchange second = getMockEndpoint("mock:dead").getReceivedExchanges().get(1);
+        assertEquals(true, second.getIn().getHeader(Exchange.REDELIVERED));
+        assertEquals(2, second.getIn().getHeader(Exchange.REDELIVERY_COUNTER));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5).aggregationRepository(repo)
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                        // always fail after aggregation so the exchange never reaches mock:result
+                        .throwException(new IllegalArgumentException("Damn"))
+                        .to("mock:result")
+                    .end();
+
+                // the dead letter route records the exchange and then fails as well
+                from("direct:dead")
+                    .to("mock:dead")
+                    .throwException(new IllegalArgumentException("We are dead"));
+            }
+        };
+    }
+
+    /**
+     * Concatenates the String bodies of the incoming exchanges into the first exchange of the group.
+     */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // first message of a group: nothing to merge with yet
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelFailedTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelFailedTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/** Verifies that an aggregate which keeps failing ends up on the dead letter endpoint once redeliveries are exhausted. */
+public class LevelDBAggregateRecoverDeadLetterChannelTest extends CamelTestSupport {
+
+    // assigned in setUp ahead of super.setUp(), and used by the route below
+    private LevelDBAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        // remove any store left behind by an earlier run
+        deleteDirectory("target/data");
+
+        repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+        repo.setUseRecovery(true);
+        // look for exchanges to recover every 500 ms to keep the test short
+        repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+        // give up after at most 3 redeliveries and hand the exchange over to mock:dead
+        repo.setMaximumRedeliveries(3);
+        repo.setDeadLetterUri("mock:dead");
+
+        super.setUp();
+    }
+
+    @Test
+    public void testLevelDBAggregateRecoverDeadLetterChannel() throws Exception {
+        // the route always throws, so nothing may get past the failing step
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        // one regular delivery followed by three redeliveries
+        getMockEndpoint("mock:aggregated").expectedMessageCount(4);
+        getMockEndpoint("mock:aggregated").message(0).header(Exchange.REDELIVERED).isNull();
+        for (int attempt = 1; attempt <= 3; attempt++) {
+            getMockEndpoint("mock:aggregated").message(attempt).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+            getMockEndpoint("mock:aggregated").message(attempt).header(Exchange.REDELIVERY_COUNTER).isEqualTo(attempt);
+            getMockEndpoint("mock:aggregated").message(attempt).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(3);
+        }
+
+        // once exhausted the aggregate is expected on the dead letter endpoint
+        getMockEndpoint("mock:dead").expectedBodiesReceived("ABCDE");
+        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isNull();
+
+        for (String body : new String[] {"A", "B", "C", "D", "E"}) {
+            template.sendBodyAndHeader("direct:start", body, "id", 123);
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5)
+                        .aggregationRepository(repo)
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                        // always fail after mock:aggregated so mock:result is never reached
+                        .throwException(new IllegalArgumentException("Damn"))
+                        .to("mock:result")
+                    .end();
+            }
+        };
+    }
+
+    /** Test strategy that appends each new String body to the body aggregated so far. */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange != null) {
+                String joined = oldExchange.getIn().getBody(String.class) + newExchange.getIn().getBody(String.class);
+                oldExchange.getIn().setBody(joined);
+                return oldExchange;
+            }
+            // nothing aggregated yet: the incoming exchange becomes the aggregate as is
+            return newExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/** Verifies that an aggregate failing twice is recovered from the LevelDB repository and finally delivered. */
+public class LevelDBAggregateRecoverTest extends CamelTestSupport {
+
+    private static AtomicInteger counter = new AtomicInteger(0);
+    private LevelDBAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        // the counter is static: reset it, or a repeated run in the same JVM would never simulate a failure
+        counter.set(0);
+        deleteDirectory("target/data");
+        repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+        // enable recovery and check for exchanges to recover every 500 ms
+        repo.setUseRecovery(true);
+        repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+        super.setUp();
+    }
+
+    @Test
+    public void testLevelDBAggregateRecover() throws Exception {
+        // should fail the first 2 times and then recover
+        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+        // should be marked as redelivered
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        // the 2nd redelivery attempt is the one that succeeds
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+
+        for (String body : new String[] {"A", "B", "C", "D", "E"}) {
+            template.sendBodyAndHeader("direct:start", body, "id", 123);
+        }
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5).aggregationRepository(repo)
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                        .delay(1000)
+                        // simulate errors the first two times
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                int count = counter.incrementAndGet();
+                                if (count <= 2) {
+                                    throw new IllegalArgumentException("Damn");
+                                }
+                            }
+                        })
+                        .to("mock:result")
+                    .end();
+            }
+        };
+    }
+
+    /** Test strategy that appends each new String body to the body aggregated so far. */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange != null) {
+                String joined = oldExchange.getIn().getBody(String.class) + newExchange.getIn().getBody(String.class);
+                oldExchange.getIn().setBody(joined);
+                return oldExchange;
+            }
+            // nothing aggregated yet: the incoming exchange becomes the aggregate as is
+            return newExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithRedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithRedeliveryPolicyTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithRedeliveryPolicyTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithRedeliveryPolicyTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/** Verifies recovery of an aggregate that fails three times, with no maximum redeliveries set on the repository. */
+public class LevelDBAggregateRecoverWithRedeliveryPolicyTest extends CamelTestSupport {
+
+    private static AtomicInteger counter = new AtomicInteger(0);
+    private LevelDBAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        // the counter is static: reset it, or a repeated run in the same JVM would never simulate a failure
+        counter.set(0);
+        deleteDirectory("target/data");
+        repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+        // enable recovery and check for exchanges to recover every 500 ms
+        repo.setUseRecovery(true);
+        repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+        super.setUp();
+    }
+
+    @Test
+    public void testLevelDBAggregateRecover() throws Exception {
+        getMockEndpoint("mock:aggregated").setResultWaitTime(20000);
+        getMockEndpoint("mock:result").setResultWaitTime(20000);
+
+        // should fail the first 3 times and then recover
+        getMockEndpoint("mock:aggregated").expectedMessageCount(4);
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+        // should be marked as redelivered
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        // the 3rd redelivery attempt is the one that succeeds
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isNull();
+
+        for (String body : new String[] {"A", "B", "C", "D", "E"}) {
+            template.sendBodyAndHeader("direct:start", body, "id", 123);
+        }
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5).aggregationRepository(repo)
+                        // this is the output from the aggregator
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                        // simulate errors the first three times
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                int count = counter.incrementAndGet();
+                                if (count <= 3) {
+                                    throw new IllegalArgumentException("Damn");
+                                }
+                            }
+                        })
+                        .to("mock:result")
+                    .end();
+            }
+        };
+    }
+
+    /** Test strategy that appends each new String body to the body aggregated so far. */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange != null) {
+                String joined = oldExchange.getIn().getBody(String.class) + newExchange.getIn().getBody(String.class);
+                oldExchange.getIn().setBody(joined);
+                return oldExchange;
+            }
+            // nothing aggregated yet: the incoming exchange becomes the aggregate as is
+            return newExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithRedeliveryPolicyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithRedeliveryPolicyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithSedaTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithSedaTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithSedaTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithSedaTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/** Verifies that recovery still works when the aggregate is handed over to a SEDA queue that fails twice. */
+public class LevelDBAggregateRecoverWithSedaTest extends CamelTestSupport {
+
+    private static AtomicInteger counter = new AtomicInteger(0);
+    private LevelDBAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        // the counter is static: reset it, or a repeated run in the same JVM would never simulate a failure
+        counter.set(0);
+        deleteDirectory("target/data");
+        repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+        // enable recovery and check for exchanges to recover every 500 ms
+        repo.setUseRecovery(true);
+        repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+        super.setUp();
+    }
+
+    @Test
+    public void testLevelDBAggregateRecoverWithSeda() throws Exception {
+        // should fail the first 2 times and then recover
+        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+        // should be marked as redelivered
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        // the 2nd redelivery attempt is the one that succeeds
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+
+        for (String body : new String[] {"A", "B", "C", "D", "E"}) {
+            template.sendBodyAndHeader("direct:start", body, "id", 123);
+        }
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5).aggregationRepository(repo)
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                        .to("seda:foo")
+                    .end();
+
+                // should be able to recover when we send over SEDA, as it is an OnCompletion
+                // which confirms the exchange when it is complete.
+                from("seda:foo")
+                    .delay(1000)
+                    // simulate errors the first two times
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            int count = counter.incrementAndGet();
+                            if (count <= 2) {
+                                throw new IllegalArgumentException("Damn");
+                            }
+                        }
+                    })
+                    .to("mock:result");
+            }
+        };
+    }
+
+    /** Test strategy that appends each new String body to the body aggregated so far. */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange != null) {
+                String joined = oldExchange.getIn().getBody(String.class) + newExchange.getIn().getBody(String.class);
+                oldExchange.getIn().setBody(joined);
+                return oldExchange;
+            }
+            // nothing aggregated yet: the incoming exchange becomes the aggregate as is
+            return newExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithSedaTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithSedaTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/** Verifies that five messages sharing the same "id" header are aggregated into one through a LevelDB repository. */
+public class LevelDBAggregateTest extends CamelTestSupport {
+
+    @Override
+    public void setUp() throws Exception {
+        deleteDirectory("target/data");
+        super.setUp();
+    }
+
+    @Test
+    public void testLevelDBAggregate() throws Exception {
+        MockEndpoint aggregated = getMockEndpoint("mock:aggregated");
+        aggregated.expectedBodiesReceived("ABCDE");
+
+        for (String body : new String[] {"A", "B", "C", "D", "E"}) {
+            template.sendBodyAndHeader("direct:start", body, "id", 123);
+        }
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+
+        // the aggregate must still report the endpoint the messages originally came from
+        Exchange first = aggregated.getReceivedExchanges().get(0);
+        assertEquals("direct://start", first.getFromEndpoint().getEndpointUri());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            // START SNIPPET: e1
+            public void configure() throws Exception {
+                // create the leveldb repo
+                LevelDBAggregationRepository repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+
+                // here is the Camel route where we aggregate
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        // use our created leveldb repo as aggregation repository
+                        .completionSize(5).aggregationRepository(repo)
+                        .to("mock:aggregated");
+            }
+            // END SNIPPET: e1
+        };
+    }
+
+    /** Test strategy that appends each new String body to the body aggregated so far. */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange != null) {
+                String joined = oldExchange.getIn().getBody(String.class) + newExchange.getIn().getBody(String.class);
+                oldExchange.getIn().setBody(joined);
+                return oldExchange;
+            }
+            // nothing aggregated yet: the incoming exchange becomes the aggregate as is
+            return newExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTimeoutCompletionRestartTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTimeoutCompletionRestartTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTimeoutCompletionRestartTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTimeoutCompletionRestartTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/** Verifies that a group stored before Camel is stopped is still completed by timeout after Camel is started again. */
+public class LevelDBAggregateTimeoutCompletionRestartTest extends CamelTestSupport {
+
+    @Override
+    public void setUp() throws Exception {
+        deleteDirectory("target/data");
+        super.setUp();
+    }
+
+    @Test
+    public void testLevelDBAggregateTimeoutCompletionRestart() throws Exception {
+        MockEndpoint beforeRestart = getMockEndpoint("mock:aggregated");
+        beforeRestart.expectedMessageCount(0);
+
+        for (String body : new String[] {"A", "B", "C"}) {
+            template.sendBodyAndHeader("direct:start", body, "id", 123);
+        }
+
+        // stop Camel; the completion timeout should not have fired yet
+        context.stop();
+        assertEquals(0, beforeRestart.getReceivedCounter());
+
+        // start Camel again, and the timeout should trigger a completion
+        context.start();
+
+        MockEndpoint afterRestart = getMockEndpoint("mock:aggregated");
+        afterRestart.expectedBodiesReceived("ABC");
+
+        assertMockEndpointsSatisfied();
+        assertEquals(1, afterRestart.getReceivedCounter());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                LevelDBAggregationRepository store = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+
+                // no size condition here: groups are completed by the 3000 ms completion timeout only
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionTimeout(3000)
+                        .aggregationRepository(store)
+                        .to("mock:aggregated");
+            }
+        };
+    }
+
+    /** Test strategy that appends each new String body to the body aggregated so far. */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange != null) {
+                String joined = oldExchange.getIn().getBody(String.class) + newExchange.getIn().getBody(String.class);
+                oldExchange.getIn().setBody(joined);
+                return oldExchange;
+            }
+            // nothing aggregated yet: the incoming exchange becomes the aggregate as is
+            return newExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTimeoutCompletionRestartTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTimeoutCompletionRestartTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryAlotDataTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryAlotDataTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryAlotDataTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryAlotDataTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Adds one hundred exchanges to a {@link LevelDBAggregationRepository} and
+ * verifies that the last exchange added for each key is the one read back.
+ */
+public class LevelDBAggregationRepositoryAlotDataTest extends CamelTestSupport {
+
+    private LevelDBFile levelDBFile;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        // remove any data left behind by a previous run
+        deleteDirectory("target/data");
+        File file = new File("target/data/leveldb.dat");
+        levelDBFile = new LevelDBFile();
+        levelDBFile.setFile(file);
+        levelDBFile.start();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            // levelDBFile is still null if setUp failed before creating it
+            if (levelDBFile != null) {
+                levelDBFile.stop();
+            }
+        } finally {
+            // always run the base class tear down, even if stopping the file failed
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Creates a repository named <tt>repo1</tt> that uses the LevelDB file of this test.
+     */
+    private LevelDBAggregationRepository createRepo() {
+        LevelDBAggregationRepository repo = new LevelDBAggregationRepository();
+        repo.setLevelDBFile(levelDBFile);
+        repo.setRepositoryName("repo1");
+        return repo;
+    }
+
+    @Test
+    public void testWithAlotOfDataSameKey() {
+        LevelDBAggregationRepository repo = createRepo();
+
+        // every add uses the same key
+        for (int i = 0; i < 100; i++) {
+            Exchange exchange1 = new DefaultExchange(context);
+            exchange1.getIn().setBody("counter:" + i);
+            repo.add(context, "foo", exchange1);
+        }
+
+        // Get it back..
+        Exchange actual = repo.get(context, "foo");
+        assertEquals("counter:99", actual.getIn().getBody());
+    }
+
+    @Test
+    public void testWithAlotOfDataTwoKesy() {
+        LevelDBAggregationRepository repo = createRepo();
+
+        // even counters are stored under "foo", odd counters under "bar"
+        for (int i = 0; i < 100; i++) {
+            Exchange exchange1 = new DefaultExchange(context);
+            exchange1.getIn().setBody("counter:" + i);
+            String key = i % 2 == 0 ? "foo" : "bar";
+            repo.add(context, key, exchange1);
+        }
+
+        // Get it back..
+        Exchange actual = repo.get(context, "foo");
+        assertEquals("counter:98", actual.getIn().getBody());
+
+        actual = repo.get(context, "bar");
+        assertEquals("counter:99", actual.getIn().getBody());
+    }
+
+    @Test
+    public void testWithAlotOfDataWithDifferentKesy() {
+        LevelDBAggregationRepository repo = createRepo();
+
+        // every exchange is stored under its own key
+        for (int i = 0; i < 100; i++) {
+            Exchange exchange1 = new DefaultExchange(context);
+            exchange1.getIn().setBody("counter:" + i);
+            String key = "key" + i;
+            repo.add(context, key, exchange1);
+        }
+
+        // Get it back..
+        for (int i = 0; i < 100; i++) {
+            Exchange actual = repo.get(context, "key" + i);
+            assertEquals("counter:" + i, actual.getIn().getBody());
+        }
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryAlotDataTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryAlotDataTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryLoadExistingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryLoadExistingTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryLoadExistingTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryLoadExistingTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Verifies that an exchange added to a {@link LevelDBAggregationRepository}
+ * can still be read and replaced after the underlying {@link LevelDBFile}
+ * has been stopped and started again.
+ */
+public class LevelDBAggregationRepositoryLoadExistingTest extends CamelTestSupport {
+
+    private LevelDBFile levelDBFile;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        // remove any data left behind by a previous run
+        deleteDirectory("target/data");
+        File file = new File("target/data/leveldb.dat");
+        levelDBFile = new LevelDBFile();
+        levelDBFile.setFile(file);
+        levelDBFile.start();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            // levelDBFile is still null if setUp failed before creating it
+            if (levelDBFile != null) {
+                levelDBFile.stop();
+            }
+        } finally {
+            // always run the base class tear down, even if stopping the file failed
+            super.tearDown();
+        }
+    }
+
+    @Test
+    public void testExisting() throws Exception {
+        LevelDBAggregationRepository repo = new LevelDBAggregationRepository();
+        repo.setLevelDBFile(levelDBFile);
+        repo.setRepositoryName("repo1");
+        repo.setReturnOldExchange(true);
+
+        // Store it..
+        Exchange exchange1 = new DefaultExchange(context);
+        exchange1.getIn().setBody("counter:1");
+        Exchange actual = repo.add(context, "foo", exchange1);
+        // nothing was stored under the key before
+        assertNull(actual);
+
+        // stop the repo
+        levelDBFile.stop();
+
+        Thread.sleep(1000);
+
+        // load the repo again
+        levelDBFile.start();
+
+        // Get it back..
+        actual = repo.get(context, "foo");
+        assertEquals("counter:1", actual.getIn().getBody());
+
+        // Change it..
+        Exchange exchange2 = new DefaultExchange(context);
+        exchange2.getIn().setBody("counter:2");
+        actual = repo.add(context, "foo", exchange2);
+        // the old one
+        assertEquals("counter:1", actual.getIn().getBody());
+
+        // Get it back..
+        actual = repo.get(context, "foo");
+        assertEquals("counter:2", actual.getIn().getBody());
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryLoadExistingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryLoadExistingTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryMultipleRepoTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryMultipleRepoTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryMultipleRepoTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryMultipleRepoTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Verifies that two {@link LevelDBAggregationRepository} instances with
+ * different repository names can share one {@link LevelDBFile} without
+ * seeing each other's exchanges.
+ */
+public class LevelDBAggregationRepositoryMultipleRepoTest extends CamelTestSupport {
+
+    private LevelDBFile levelDBFile;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        // remove any data left behind by a previous run
+        deleteDirectory("target/data");
+        File file = new File("target/data/leveldb.dat");
+        levelDBFile = new LevelDBFile();
+        levelDBFile.setFile(file);
+        levelDBFile.start();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            // levelDBFile is still null if setUp failed before creating it
+            if (levelDBFile != null) {
+                levelDBFile.stop();
+            }
+        } finally {
+            // always run the base class tear down, even if stopping the file failed
+            super.tearDown();
+        }
+    }
+
+    @Test
+    public void testMultipeRepo() {
+        LevelDBAggregationRepository repo1 = new LevelDBAggregationRepository();
+        repo1.setLevelDBFile(levelDBFile);
+        repo1.setRepositoryName("repo1");
+        repo1.setReturnOldExchange(true);
+
+        LevelDBAggregationRepository repo2 = new LevelDBAggregationRepository();
+        repo2.setLevelDBFile(levelDBFile);
+        repo2.setRepositoryName("repo2");
+        repo2.setReturnOldExchange(true);
+
+        // Can't get something we have not put in...
+        Exchange actual = repo1.get(context, "missing");
+        assertNull(actual);
+
+        actual = repo2.get(context, "missing");
+        assertNull(actual);
+
+        // Store it..
+        Exchange exchange1 = new DefaultExchange(context);
+        exchange1.getIn().setBody("counter:1");
+        actual = repo1.add(context, "foo", exchange1);
+        assertNull(actual);
+
+        // Get it back..
+        actual = repo1.get(context, "foo");
+        assertEquals("counter:1", actual.getIn().getBody());
+        // the key must not be visible in the other repository
+        assertNull(repo2.get(context, "foo"));
+
+        // Change it..
+        Exchange exchange2 = new DefaultExchange(context);
+        exchange2.getIn().setBody("counter:2");
+        actual = repo1.add(context, "foo", exchange2);
+        // the old one
+        assertEquals("counter:1", actual.getIn().getBody());
+
+        // add to repo2
+        Exchange exchange3 = new DefaultExchange(context);
+        exchange3.getIn().setBody("Hello World");
+        actual = repo2.add(context, "bar", exchange3);
+        assertNull(actual);
+        assertNull(repo1.get(context, "bar"));
+
+        // Get it back..
+        actual = repo1.get(context, "foo");
+        assertEquals("counter:2", actual.getIn().getBody());
+        assertNull(repo2.get(context, "foo"));
+
+        actual = repo2.get(context, "bar");
+        assertEquals("Hello World", actual.getIn().getBody());
+        assertNull(repo1.get(context, "bar"));
+    }
+
+    @Test
+    public void testMultipeRepoSameKeyDifferentContent() {
+        LevelDBAggregationRepository repo1 = new LevelDBAggregationRepository();
+        repo1.setLevelDBFile(levelDBFile);
+        repo1.setRepositoryName("repo1");
+
+        LevelDBAggregationRepository repo2 = new LevelDBAggregationRepository();
+        repo2.setLevelDBFile(levelDBFile);
+        repo2.setRepositoryName("repo2");
+
+        // the same key is used in both repositories, with different bodies
+        Exchange exchange1 = new DefaultExchange(context);
+        exchange1.getIn().setBody("Hello World");
+        repo1.add(context, "foo", exchange1);
+
+        Exchange exchange2 = new DefaultExchange(context);
+        exchange2.getIn().setBody("Bye World");
+        repo2.add(context, "foo", exchange2);
+
+        Exchange actual = repo1.get(context, "foo");
+        assertEquals("Hello World", actual.getIn().getBody());
+        actual = repo2.get(context, "foo");
+        assertEquals("Bye World", actual.getIn().getBody());
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryMultipleRepoTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryMultipleRepoTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryRecoverExistingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryRecoverExistingTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryRecoverExistingTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryRecoverExistingTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Verifies that an exchange which was added and then removed from a
+ * {@link LevelDBAggregationRepository} with recovery enabled can be
+ * recovered by its exchange id after the repository has been stopped and
+ * started again.
+ */
+public class LevelDBAggregationRepositoryRecoverExistingTest extends CamelTestSupport {
+
+    private LevelDBFile levelDBFile;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        // remove any data left behind by a previous run
+        deleteDirectory("target/data");
+        File file = new File("target/data/leveldb.dat");
+        levelDBFile = new LevelDBFile();
+        levelDBFile.setFile(file);
+    }
+
+    @Test
+    public void testExisting() throws Exception {
+        LevelDBAggregationRepository repo = new LevelDBAggregationRepository();
+        repo.setLevelDBFile(levelDBFile);
+        repo.setRepositoryName("repo1");
+        repo.setReturnOldExchange(true);
+        repo.setUseRecovery(true);
+
+        String id;
+
+        repo.start();
+        try {
+            // Store it..
+            Exchange exchange1 = new DefaultExchange(context);
+            exchange1.getIn().setBody("counter:1");
+            Exchange actual = repo.add(context, "foo", exchange1);
+            assertNull(actual);
+
+            // Remove it, which makes it in the pre confirm stage
+            repo.remove(context, "foo", exchange1);
+
+            id = exchange1.getExchangeId();
+        } finally {
+            // stop the repo, also when an assertion above failed
+            repo.stop();
+        }
+
+        Thread.sleep(1000);
+
+        // load the repo again
+        repo.start();
+        try {
+            // Get it back..
+            Exchange actual = repo.get(context, "foo");
+            assertNull(actual);
+
+            // Recover it
+            actual = repo.recover(context, id);
+            assertNotNull(actual);
+            assertEquals("counter:1", actual.getIn().getBody());
+        } finally {
+            // stop the repo, also when an assertion above failed
+            repo.stop();
+        }
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryRecoverExistingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryRecoverExistingTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date