You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/03/31 16:32:45 UTC
svn commit: r1307815 [2/3] - in /camel/trunk: apache-camel/
apache-camel/src/main/descriptors/ components/ components/camel-leveldb/
components/camel-leveldb/src/ components/camel-leveldb/src/main/
components/camel-leveldb/src/main/java/ components/cam...
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadAndRecoverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadAndRecoverTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadAndRecoverTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadAndRecoverTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LevelDBAggregateLoadAndRecoverTest extends CamelTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LevelDBAggregateLoadAndRecoverTest.class);
+ private static final int SIZE = 200;
+ private static AtomicInteger counter = new AtomicInteger();
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ deleteDirectory("target/data");
+ super.setUp();
+ }
+
+ @Test
+ public void testLoadAndRecoverLevelDBAggregate() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(SIZE / 10);
+ mock.setResultWaitTime(50 * 1000);
+
+ LOG.info("Staring to send " + SIZE + " messages.");
+
+ for (int i = 0; i < SIZE; i++) {
+ final int value = 1;
+ char id = 'A';
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put("id", id);
+ headers.put("seq", i);
+ LOG.debug("Sending {} with id {}", value, id);
+ template.sendBodyAndHeaders("seda:start", value, headers);
+ // simulate a little delay
+ Thread.sleep(5);
+ }
+
+ LOG.info("Sending all " + SIZE + " message done. Now waiting for aggregation to complete.");
+
+ assertMockEndpointsSatisfied();
+
+ int recovered = 0;
+ for (Exchange exchange : mock.getReceivedExchanges()) {
+ if (exchange.getIn().getHeader(Exchange.REDELIVERED) != null) {
+ recovered++;
+ }
+ }
+ int expected = SIZE / 10 / 10;
+ assertEquals("There should be " + expected + " recovered", expected, recovered);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ LevelDBAggregationRepository repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+ repo.setUseRecovery(true);
+ // for faster unit testing
+ repo.setRecoveryInterval(500);
+
+ from("seda:start?size=" + SIZE)
+ .to("log:input?groupSize=500")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .aggregationRepository(repo)
+ .completionSize(10)
+ .to("log:output?showHeaders=true")
+ // have every 10th exchange fail which should then be recovered
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ int num = counter.incrementAndGet();
+ if (num % 10 == 0) {
+ throw new IllegalStateException("Failed for num " + num);
+ }
+ }
+ })
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ Integer body1 = oldExchange.getIn().getBody(Integer.class);
+ Integer body2 = newExchange.getIn().getBody(Integer.class);
+ int sum = body1 + body2;
+
+ oldExchange.getIn().setBody(sum);
+ return oldExchange;
+ }
+ }
+
+}
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadAndRecoverTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadAndRecoverTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadConcurrentTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadConcurrentTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadConcurrentTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LevelDBAggregateLoadConcurrentTest extends CamelTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LevelDBAggregateLoadConcurrentTest.class);
+ private static final char[] KEYS = new char[]{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J'};
+ private static final int SIZE = 500;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ deleteDirectory("target/data");
+ super.setUp();
+ }
+
+ @Test
+ public void testLoadTestLevelDBAggregate() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(10);
+ mock.setResultWaitTime(50 * 1000);
+
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+
+ LOG.info("Staring to send " + SIZE + " messages.");
+
+ for (int i = 0; i < SIZE; i++) {
+ final int value = 1;
+ final int key = i % 10;
+ executor.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ char id = KEYS[key];
+ LOG.debug("Sending {} with id {}", value, id);
+ template.sendBodyAndHeader("direct:start", value, "id", "" + id);
+ // simulate a little delay
+ Thread.sleep(3);
+ return null;
+ }
+ });
+ }
+
+ LOG.info("Sending all " + SIZE + " message done. Now waiting for aggregation to complete.");
+
+ assertMockEndpointsSatisfied();
+ executor.shutdownNow();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ LevelDBAggregationRepository repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+
+ from("direct:start")
+ .to("log:input?groupSize=500")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .aggregationRepository(repo)
+ .completionSize(SIZE / 10)
+ .to("log:output?showHeaders=true")
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ Integer body1 = oldExchange.getIn().getBody(Integer.class);
+ Integer body2 = newExchange.getIn().getBody(Integer.class);
+ int sum = body1 + body2;
+
+ oldExchange.getIn().setBody(sum);
+ return oldExchange;
+ }
+ }
+}
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadConcurrentTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadConcurrentTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LevelDBAggregateLoadTest extends CamelTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LevelDBAggregateLoadTest.class);
+ private static final int SIZE = 500;
+ private LevelDBAggregationRepository repo;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ deleteDirectory("target/data");
+ repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+ super.setUp();
+ }
+
+ @Test
+ public void testLoadTestLevelDBAggregate() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(1);
+ mock.setResultWaitTime(50 * 1000);
+
+ LOG.info("Staring to send " + SIZE + " messages.");
+
+ for (int i = 0; i < SIZE; i++) {
+ final int value = 1;
+ char id = 'A';
+ LOG.debug("Sending {} with id {}", value, id);
+ template.sendBodyAndHeader("seda:start?size=" + SIZE, value, "id", "" + id);
+ }
+
+ LOG.info("Sending all " + SIZE + " message done. Now waiting for aggregation to complete.");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:start?size=" + SIZE)
+ .to("log:input?groupSize=500")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .aggregationRepository(repo)
+ .completionSize(SIZE)
+ .to("log:output?showHeaders=true")
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ Integer body1 = oldExchange.getIn().getBody(Integer.class);
+ Integer body2 = newExchange.getIn().getBody(Integer.class);
+ int sum = body1 + body2;
+
+ oldExchange.getIn().setBody(sum);
+ return oldExchange;
+ }
+ }
+
+}
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateLoadTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.leveldb.LevelDBAggregationRepository.keyBuilder;
+
+public class LevelDBAggregateNotLostRemovedWhenConfirmedTest extends CamelTestSupport {
+
+ private LevelDBAggregationRepository repo;
+
+ @Override
+ public void setUp() throws Exception {
+ deleteDirectory("target/data");
+ repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+ super.setUp();
+ }
+
+ @Test
+ public void testLevelDBAggregateNotLostRemovedWhenConfirmed() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+ assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+
+ Thread.sleep(1000);
+
+ String exchangeId = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getExchangeId();
+
+ // the exchange should NOT be in the completed repo as it was confirmed
+ final LevelDBFile levelDBFile = repo.getLevelDBFile();
+ final LevelDBCamelCodec codec = new LevelDBCamelCodec();
+ byte[] bf = levelDBFile.getDb().get(keyBuilder("repo1-completed", exchangeId));
+
+ // assert the exchange was deleted
+ assertNull(bf);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .completionSize(5).aggregationRepository(repo)
+ .log("aggregated exchange id ${exchangeId} with ${body}")
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.fusesource.hawtbuf.Buffer;
+import org.junit.Test;
+
+import static org.apache.camel.component.leveldb.LevelDBAggregationRepository.keyBuilder;
+
+public class LevelDBAggregateNotLostTest extends CamelTestSupport {
+
+ private LevelDBAggregationRepository repo;
+
+ @Override
+ public void setUp() throws Exception {
+ deleteDirectory("target/data");
+ repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+ super.setUp();
+ }
+
+ @Test
+ public void testLevelDBAggregateNotLost() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedBodiesReceived("ABCDE");
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+ assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+
+ Thread.sleep(1000);
+
+ String exchangeId = getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId();
+
+ // the exchange should be in the completed repo where we should be able to find it
+ final LevelDBFile levelDBFile = repo.getLevelDBFile();
+ final LevelDBCamelCodec codec = new LevelDBCamelCodec();
+ byte[] bf = levelDBFile.getDb().get(keyBuilder("repo1-completed", exchangeId));
+
+ // assert the exchange was not lost and we got all the information still
+ assertNotNull(bf);
+ Exchange completed = codec.unmarshallExchange(context, new Buffer(bf));
+ assertNotNull(completed);
+ // should retain the exchange id
+ assertEquals(exchangeId, completed.getExchangeId());
+ assertEquals("ABCDE", completed.getIn().getBody());
+ assertEquals(123, completed.getIn().getHeader("id"));
+ assertEquals("size", completed.getProperty(Exchange.AGGREGATED_COMPLETED_BY));
+ assertEquals(5, completed.getProperty(Exchange.AGGREGATED_SIZE));
+ // will store correlation keys as String
+ assertEquals("123", completed.getProperty(Exchange.AGGREGATED_CORRELATION_KEY));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .completionSize(5).aggregationRepository(repo)
+ .log("aggregated exchange id ${exchangeId} with ${body}")
+ .to("mock:aggregated")
+ // throw an exception to fail, which we then will loose this message
+ .throwException(new IllegalArgumentException("Damn"))
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelFailedTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelFailedTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelFailedTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelFailedTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class LevelDBAggregateRecoverDeadLetterChannelFailedTest extends CamelTestSupport {
+
+ private LevelDBAggregationRepository repo;
+
+ @Override
+ public void setUp() throws Exception {
+ deleteDirectory("target/data");
+ repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+ // enable recovery
+ repo.setUseRecovery(true);
+ // exhaust after at most 2 attempts
+ repo.setMaximumRedeliveries(2);
+ // and move to this dead letter channel
+ repo.setDeadLetterUri("direct:dead");
+ // check faster
+ repo.setRecoveryInterval(1000, TimeUnit.MILLISECONDS);
+
+ super.setUp();
+ }
+
+ @Test
+ public void testLevelDBAggregateRecoverDeadLetterChannelFailed() throws Exception {
+ // should fail all times
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+ getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+ // it should keep sending to DLC if it failed, so test for min 2 attempts
+ getMockEndpoint("mock:dead").expectedMinimumMessageCount(2);
+ // all the details should be the same about redelivered and redelivered 2 times
+ getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+ getMockEndpoint("mock:dead").message(1).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+ getMockEndpoint("mock:dead").message(1).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+ assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+
+ // all the details should be the same about redelivered and redelivered 2 times
+ Exchange first = getMockEndpoint("mock:dead").getReceivedExchanges().get(0);
+ assertEquals(true, first.getIn().getHeader(Exchange.REDELIVERED));
+ assertEquals(2, first.getIn().getHeader(Exchange.REDELIVERY_COUNTER));
+
+ Exchange second = getMockEndpoint("mock:dead").getReceivedExchanges().get(1);
+ assertEquals(true, second.getIn().getHeader(Exchange.REDELIVERED));
+ assertEquals(2, first.getIn().getHeader(Exchange.REDELIVERY_COUNTER));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .completionSize(5).aggregationRepository(repo)
+ .log("aggregated exchange id ${exchangeId} with ${body}")
+ .to("mock:aggregated")
+ .throwException(new IllegalArgumentException("Damn"))
+ .to("mock:result")
+ .end();
+
+ from("direct:dead")
+ .to("mock:dead")
+ .throwException(new IllegalArgumentException("We are dead"));
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelFailedTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelFailedTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class LevelDBAggregateRecoverDeadLetterChannelTest extends CamelTestSupport {
+
+ private LevelDBAggregationRepository repo;
+
+ @Override
+ public void setUp() throws Exception {
+ deleteDirectory("target/data");
+ repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+ // enable recovery
+ repo.setUseRecovery(true);
+ // exhaust after at most 3 attempts
+ repo.setMaximumRedeliveries(3);
+ // and move to this dead letter channel
+ repo.setDeadLetterUri("mock:dead");
+ // check faster
+ repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+
+ super.setUp();
+ }
+
+ @Test
+ public void testLevelDBAggregateRecoverDeadLetterChannel() throws Exception {
+ // should fail all times
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ getMockEndpoint("mock:aggregated").expectedMessageCount(4);
+ getMockEndpoint("mock:aggregated").message(0).header(Exchange.REDELIVERED).isNull();
+ getMockEndpoint("mock:aggregated").message(1).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ getMockEndpoint("mock:aggregated").message(1).header(Exchange.REDELIVERY_COUNTER).isEqualTo(1);
+ getMockEndpoint("mock:aggregated").message(1).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(3);
+ getMockEndpoint("mock:aggregated").message(2).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ getMockEndpoint("mock:aggregated").message(2).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+ getMockEndpoint("mock:aggregated").message(2).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(3);
+ getMockEndpoint("mock:aggregated").message(3).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ getMockEndpoint("mock:aggregated").message(3).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+ getMockEndpoint("mock:aggregated").message(3).header(Exchange.REDELIVERY_MAX_COUNTER).isEqualTo(3);
+
+ getMockEndpoint("mock:dead").expectedBodiesReceived("ABCDE");
+ getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+ getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isNull();
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .completionSize(5).aggregationRepository(repo)
+ .log("aggregated exchange id ${exchangeId} with ${body}")
+ .to("mock:aggregated")
+ .throwException(new IllegalArgumentException("Damn"))
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverDeadLetterChannelTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class LevelDBAggregateRecoverTest extends CamelTestSupport {
+
+ private static AtomicInteger counter = new AtomicInteger(0);
+ private LevelDBAggregationRepository repo;
+
+ @Override
+ public void setUp() throws Exception {
+ deleteDirectory("target/data");
+ repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+ // enable recovery
+ repo.setUseRecovery(true);
+ // check faster
+ repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+ super.setUp();
+ }
+
+ @Test
+ public void testLevelDBAggregateRecover() throws Exception {
+ // should fail the first 2 times and then recover
+ getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+ getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+ // should be marked as redelivered
+ getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ // on the 2nd redelivery attempt we success
+ getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+ assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .completionSize(5).aggregationRepository(repo)
+ .log("aggregated exchange id ${exchangeId} with ${body}")
+ .to("mock:aggregated")
+ .delay(1000)
+ // simulate errors the first two times
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ int count = counter.incrementAndGet();
+ if (count <= 2) {
+ throw new IllegalArgumentException("Damn");
+ }
+ }
+ })
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithRedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithRedeliveryPolicyTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithRedeliveryPolicyTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithRedeliveryPolicyTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class LevelDBAggregateRecoverWithRedeliveryPolicyTest extends CamelTestSupport {
+
+ private static AtomicInteger counter = new AtomicInteger(0);
+ private LevelDBAggregationRepository repo;
+
+ @Override
+ public void setUp() throws Exception {
+ deleteDirectory("target/data");
+ repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+ // enable recovery
+ repo.setUseRecovery(true);
+ // check faster
+ repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+ super.setUp();
+ }
+
+ @Test
+ public void testLevelDBAggregateRecover() throws Exception {
+ getMockEndpoint("mock:aggregated").setResultWaitTime(20000);
+ getMockEndpoint("mock:result").setResultWaitTime(20000);
+
+ // should fail the first 3 times and then recover
+ getMockEndpoint("mock:aggregated").expectedMessageCount(4);
+ getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+ // should be marked as redelivered
+ getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ // on the 2nd redelivery attempt we success
+ getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+ getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_MAX_COUNTER).isNull();
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+ assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .completionSize(5).aggregationRepository(repo)
+ // this is the output from the aggregator
+ .log("aggregated exchange id ${exchangeId} with ${body}")
+ .to("mock:aggregated")
+ // simulate errors the first three times
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ int count = counter.incrementAndGet();
+ if (count <= 3) {
+ throw new IllegalArgumentException("Damn");
+ }
+ }
+ })
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithRedeliveryPolicyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithRedeliveryPolicyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithSedaTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithSedaTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithSedaTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithSedaTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class LevelDBAggregateRecoverWithSedaTest extends CamelTestSupport {
+
+ private static AtomicInteger counter = new AtomicInteger(0);
+ private LevelDBAggregationRepository repo;
+
+ @Override
+ public void setUp() throws Exception {
+ deleteDirectory("target/data");
+ repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+ // enable recovery
+ repo.setUseRecovery(true);
+ // check faster
+ repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+ super.setUp();
+ }
+
+ @Test
+ public void testLevelDBAggregateRecoverWithSeda() throws Exception {
+ // should fail the first 2 times and then recover
+ getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+ getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+ // should be marked as redelivered
+ getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ // on the 2nd redelivery attempt we success
+ getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+ assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .completionSize(5).aggregationRepository(repo)
+ .log("aggregated exchange id ${exchangeId} with ${body}")
+ .to("mock:aggregated")
+ .to("seda:foo")
+ .end();
+
+ // should be able to recover when we send over SEDA as its a OnCompletion
+ // which confirms the exchange when its complete.
+ from("seda:foo")
+ .delay(1000)
+ // simulate errors the first two times
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ int count = counter.incrementAndGet();
+ if (count <= 2) {
+ throw new IllegalArgumentException("Damn");
+ }
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithSedaTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateRecoverWithSedaTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class LevelDBAggregateTest extends CamelTestSupport {
+
+ @Override
+ public void setUp() throws Exception {
+ deleteDirectory("target/data");
+ super.setUp();
+ }
+
+ @Test
+ public void testLevelDBAggregate() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("ABCDE");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+ assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+
+ // from endpoint should be preserved
+ assertEquals("direct://start", mock.getReceivedExchanges().get(0).getFromEndpoint().getEndpointUri());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ // START SNIPPET: e1
+ public void configure() throws Exception {
+ // create the leveldb repo
+ LevelDBAggregationRepository repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+
+ // here is the Camel route where we aggregate
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ // use our created leveldb repo as aggregation repository
+ .completionSize(5).aggregationRepository(repo)
+ .to("mock:aggregated");
+ }
+ // END SNIPPET: e1
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTimeoutCompletionRestartTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTimeoutCompletionRestartTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTimeoutCompletionRestartTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTimeoutCompletionRestartTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class LevelDBAggregateTimeoutCompletionRestartTest extends CamelTestSupport {
+
+ @Override
+ public void setUp() throws Exception {
+ deleteDirectory("target/data");
+ super.setUp();
+ }
+
+ @Test
+ public void testLevelDBAggregateTimeoutCompletionRestart() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedMessageCount(0);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+
+ // stop Camel
+ context.stop();
+ assertEquals(0, mock.getReceivedCounter());
+
+ // start Camel again, and the timeout should trigger a completion
+ context.start();
+
+ mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("ABC");
+
+ assertMockEndpointsSatisfied();
+ assertEquals(1, mock.getReceivedCounter());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // create the leveldb repo
+ LevelDBAggregationRepository repo = new LevelDBAggregationRepository("repo1", "target/data/leveldb.dat");
+
+ // here is the Camel route where we aggregate
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ // use our created leveldb repo as aggregation repository
+ .completionTimeout(3000).aggregationRepository(repo)
+ .to("mock:aggregated");
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTimeoutCompletionRestartTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateTimeoutCompletionRestartTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryAlotDataTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryAlotDataTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryAlotDataTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryAlotDataTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class LevelDBAggregationRepositoryAlotDataTest extends CamelTestSupport {
+
+ private LevelDBFile levelDBFile;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ deleteDirectory("target/data");
+ File file = new File("target/data/leveldb.dat");
+ levelDBFile = new LevelDBFile();
+ levelDBFile.setFile(file);
+ levelDBFile.start();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ levelDBFile.stop();
+ super.tearDown();
+ }
+
+ @Test
+ public void testWithAlotOfDataSameKey() {
+ LevelDBAggregationRepository repo = new LevelDBAggregationRepository();
+ repo.setLevelDBFile(levelDBFile);
+ repo.setRepositoryName("repo1");
+
+ for (int i = 0; i < 100; i++) {
+ Exchange exchange1 = new DefaultExchange(context);
+ exchange1.getIn().setBody("counter:" + i);
+ repo.add(context, "foo", exchange1);
+ }
+
+ // Get it back..
+ Exchange actual = repo.get(context, "foo");
+ assertEquals("counter:99", actual.getIn().getBody());
+ }
+
+ @Test
+ public void testWithAlotOfDataTwoKesy() {
+ LevelDBAggregationRepository repo = new LevelDBAggregationRepository();
+ repo.setLevelDBFile(levelDBFile);
+ repo.setRepositoryName("repo1");
+
+ for (int i = 0; i < 100; i++) {
+ Exchange exchange1 = new DefaultExchange(context);
+ exchange1.getIn().setBody("counter:" + i);
+ String key = i % 2 == 0 ? "foo" : "bar";
+ repo.add(context, key, exchange1);
+ }
+
+ // Get it back..
+ Exchange actual = repo.get(context, "foo");
+ assertEquals("counter:98", actual.getIn().getBody());
+
+ actual = repo.get(context, "bar");
+ assertEquals("counter:99", actual.getIn().getBody());
+ }
+
+ @Test
+ public void testWithAlotOfDataWithDifferentKesy() {
+ LevelDBAggregationRepository repo = new LevelDBAggregationRepository();
+ repo.setLevelDBFile(levelDBFile);
+ repo.setRepositoryName("repo1");
+
+ for (int i = 0; i < 100; i++) {
+ Exchange exchange1 = new DefaultExchange(context);
+ exchange1.getIn().setBody("counter:" + i);
+ String key = "key" + i;
+ repo.add(context, key, exchange1);
+ }
+
+ // Get it back..
+ for (int i = 0; i < 100; i++) {
+ Exchange actual = repo.get(context, "key" + i);
+ assertEquals("counter:" + i, actual.getIn().getBody());
+ }
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryAlotDataTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryAlotDataTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryLoadExistingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryLoadExistingTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryLoadExistingTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryLoadExistingTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class LevelDBAggregationRepositoryLoadExistingTest extends CamelTestSupport {
+
+ private LevelDBFile levelDBFile;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ deleteDirectory("target/data");
+ File file = new File("target/data/leveldb.dat");
+ levelDBFile = new LevelDBFile();
+ levelDBFile.setFile(file);
+ levelDBFile.start();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ levelDBFile.stop();
+ super.tearDown();
+ }
+
+ @Test
+ public void testExisting() throws Exception {
+ LevelDBAggregationRepository repo = new LevelDBAggregationRepository();
+ repo.setLevelDBFile(levelDBFile);
+ repo.setRepositoryName("repo1");
+ repo.setReturnOldExchange(true);
+
+ // Store it..
+ Exchange exchange1 = new DefaultExchange(context);
+ exchange1.getIn().setBody("counter:1");
+ Exchange actual = repo.add(context, "foo", exchange1);
+ assertEquals(null, actual);
+
+ // stop the repo
+ levelDBFile.stop();
+
+ Thread.sleep(1000);
+
+ // load the repo again
+ levelDBFile.start();
+
+ // Get it back..
+ actual = repo.get(context, "foo");
+ assertEquals("counter:1", actual.getIn().getBody());
+
+ // Change it..
+ Exchange exchange2 = new DefaultExchange(context);
+ exchange2.getIn().setBody("counter:2");
+ actual = repo.add(context, "foo", exchange2);
+ // the old one
+ assertEquals("counter:1", actual.getIn().getBody());
+
+ // Get it back..
+ actual = repo.get(context, "foo");
+ assertEquals("counter:2", actual.getIn().getBody());
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryLoadExistingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryLoadExistingTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryMultipleRepoTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryMultipleRepoTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryMultipleRepoTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryMultipleRepoTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class LevelDBAggregationRepositoryMultipleRepoTest extends CamelTestSupport {
+
+ private LevelDBFile levelDBFile;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ deleteDirectory("target/data");
+ File file = new File("target/data/leveldb.dat");
+ levelDBFile = new LevelDBFile();
+ levelDBFile.setFile(file);
+ levelDBFile.start();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ levelDBFile.stop();
+ super.tearDown();
+ }
+
+ @Test
+ public void testMultipeRepo() {
+ LevelDBAggregationRepository repo1 = new LevelDBAggregationRepository();
+ repo1.setLevelDBFile(levelDBFile);
+ repo1.setRepositoryName("repo1");
+ repo1.setReturnOldExchange(true);
+
+ LevelDBAggregationRepository repo2 = new LevelDBAggregationRepository();
+ repo2.setLevelDBFile(levelDBFile);
+ repo2.setRepositoryName("repo2");
+ repo2.setReturnOldExchange(true);
+
+ // Can't get something we have not put in...
+ Exchange actual = repo1.get(context, "missing");
+ assertEquals(null, actual);
+
+ actual = repo2.get(context, "missing");
+ assertEquals(null, actual);
+
+ // Store it..
+ Exchange exchange1 = new DefaultExchange(context);
+ exchange1.getIn().setBody("counter:1");
+ actual = repo1.add(context, "foo", exchange1);
+ assertEquals(null, actual);
+
+ // Get it back..
+ actual = repo1.get(context, "foo");
+ assertEquals("counter:1", actual.getIn().getBody());
+ assertEquals(null, repo2.get(context, "foo"));
+
+ // Change it..
+ Exchange exchange2 = new DefaultExchange(context);
+ exchange2.getIn().setBody("counter:2");
+ actual = repo1.add(context, "foo", exchange2);
+ // the old one
+ assertEquals("counter:1", actual.getIn().getBody());
+
+ // add to repo2
+ Exchange exchange3 = new DefaultExchange(context);
+ exchange3.getIn().setBody("Hello World");
+ actual = repo2.add(context, "bar", exchange3);
+ assertEquals(null, actual);
+ assertEquals(null, repo1.get(context, "bar"));
+
+ // Get it back..
+ actual = repo1.get(context, "foo");
+ assertEquals("counter:2", actual.getIn().getBody());
+ assertEquals(null, repo2.get(context, "foo"));
+
+ actual = repo2.get(context, "bar");
+ assertEquals("Hello World", actual.getIn().getBody());
+ assertEquals(null, repo1.get(context, "bar"));
+ }
+
+ @Test
+ public void testMultipeRepoSameKeyDifferentContent() {
+ LevelDBAggregationRepository repo1 = new LevelDBAggregationRepository();
+ repo1.setLevelDBFile(levelDBFile);
+ repo1.setRepositoryName("repo1");
+
+ LevelDBAggregationRepository repo2 = new LevelDBAggregationRepository();
+ repo2.setLevelDBFile(levelDBFile);
+ repo2.setRepositoryName("repo2");
+
+ Exchange exchange1 = new DefaultExchange(context);
+ exchange1.getIn().setBody("Hello World");
+ repo1.add(context, "foo", exchange1);
+
+ Exchange exchange2 = new DefaultExchange(context);
+ exchange2.getIn().setBody("Bye World");
+ repo2.add(context, "foo", exchange2);
+
+ Exchange actual = repo1.get(context, "foo");
+ assertEquals("Hello World", actual.getIn().getBody());
+ actual = repo2.get(context, "foo");
+ assertEquals("Bye World", actual.getIn().getBody());
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryMultipleRepoTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryMultipleRepoTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryRecoverExistingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryRecoverExistingTest.java?rev=1307815&view=auto
==============================================================================
--- camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryRecoverExistingTest.java (added)
+++ camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryRecoverExistingTest.java Sat Mar 31 14:32:43 2012
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.leveldb;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class LevelDBAggregationRepositoryRecoverExistingTest extends CamelTestSupport {
+
+ private LevelDBFile levelDBFile;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ deleteDirectory("target/data");
+ File file = new File("target/data/leveldb.dat");
+ levelDBFile = new LevelDBFile();
+ levelDBFile.setFile(file);
+ }
+
+ @Test
+ public void testExisting() throws Exception {
+ LevelDBAggregationRepository repo = new LevelDBAggregationRepository();
+ repo.setLevelDBFile(levelDBFile);
+ repo.setRepositoryName("repo1");
+ repo.setReturnOldExchange(true);
+ repo.setUseRecovery(true);
+ repo.start();
+
+ // Store it..
+ Exchange exchange1 = new DefaultExchange(context);
+ exchange1.getIn().setBody("counter:1");
+ Exchange actual = repo.add(context, "foo", exchange1);
+ assertEquals(null, actual);
+
+ // Remove it, which makes it in the pre confirm stage
+ repo.remove(context, "foo", exchange1);
+
+ String id = exchange1.getExchangeId();
+
+ // stop the repo
+ repo.stop();
+
+ Thread.sleep(1000);
+
+ // load the repo again
+ repo.start();
+
+ // Get it back..
+ actual = repo.get(context, "foo");
+ assertNull(actual);
+
+ // Recover it
+ actual = repo.recover(context, id);
+ assertNotNull(actual);
+ assertEquals("counter:1", actual.getIn().getBody());
+
+ repo.stop();
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryRecoverExistingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregationRepositoryRecoverExistingTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date