You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/24 10:00:59 UTC
svn commit: r915726 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/processor/aggregate/
camel-core/src/main/java/org/apache/camel/spi/
components/camel-hawtdb/src/main/java/org/apache/camel/...
Author: davsclaus
Date: Wed Feb 24 09:00:58 2010
New Revision: 915726
URL: http://svn.apache.org/viewvc?rev=915726&view=rev
Log:
CAMEL-217: Integrated persistence to Aggregator EIP using camel-hawtdb.
Added:
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java
- copied, changed from r915717, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBExchangeSerializationTest.java
- copied, changed from r915717, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryAlotDataTest.java
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryMultipleRepoTest.java
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java
camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java?rev=915726&r1=915725&r2=915726&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java Wed Feb 24 09:00:58 2010
@@ -120,6 +120,26 @@
exchange.setException(payload.exception);
}
+ /**
+ * Adds a property to the payload.
+ * <p/>
+ * This can be done in special situations where additional information must be added which was not provided
+ * from the source.
+ *
+ * @param payload the serialized payload
+ * @param key the property key to add
+ * @param property the property value to add
+ */
+ public static void addProperty(DefaultExchangeHolder payload, String key, Serializable property) {
+ if (key == null || property == null) {
+ return;
+ }
+ if (payload.properties == null) {
+ payload.properties = new LinkedHashMap<String, Object>();
+ }
+ payload.properties.put(key, property);
+ }
+
public String toString() {
StringBuilder sb = new StringBuilder("DefaultExchangeHolder[");
sb.append("inBody=").append(inBody).append(", outBody=").append(outBody);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=915726&r1=915725&r2=915726&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Feb 24 09:00:58 2010
@@ -165,7 +165,7 @@
}
Exchange answer;
- Exchange oldExchange = aggregationRepository.get(key);
+ Exchange oldExchange = aggregationRepository.get(exchange.getContext(), key);
Exchange newExchange = exchange;
Integer size = 1;
@@ -201,7 +201,7 @@
if (LOG.isTraceEnabled()) {
LOG.trace("In progress aggregated exchange: " + answer + " with correlation key:" + key);
}
- aggregationRepository.add(key, answer);
+ aggregationRepository.add(exchange.getContext(), key, answer);
}
if (complete) {
@@ -281,7 +281,7 @@
protected void onCompletion(Object key, final Exchange exchange, boolean fromTimeout) {
// remove from repository as it's completed
- aggregationRepository.remove(key);
+ aggregationRepository.remove(exchange.getContext(), key);
if (!fromTimeout && timeoutMap != null) {
// cleanup timeout map if it was an incoming exchange which triggered the timeout (and not the timeout checker)
timeoutMap.remove(key);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java?rev=915726&r1=915725&r2=915726&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java Wed Feb 24 09:00:58 2010
@@ -19,6 +19,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.AggregationRepository;
@@ -32,15 +33,15 @@
private final Map<Object, Exchange> cache = new ConcurrentHashMap<Object, Exchange>();
- public Exchange add(Object key, Exchange exchange) {
+ public Exchange add(CamelContext camelContext, Object key, Exchange exchange) {
return cache.put(key, exchange);
}
- public Exchange get(Object key) {
+ public Exchange get(CamelContext camelContext, Object key) {
return cache.get(key);
}
- public void remove(Object key) {
+ public void remove(CamelContext camelContext, Object key) {
cache.remove(key);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java?rev=915726&r1=915725&r2=915726&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java Wed Feb 24 09:00:58 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.spi;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
/**
@@ -30,25 +31,28 @@
* <p/>
* Will replace any existing exchange.
*
+ * @param camelContext the current CamelContext
* @param key the correlation key
* @param exchange the aggregated exchange
* @return the old exchange if any existed
*/
- Exchange add(K key, Exchange exchange);
+ Exchange add(CamelContext camelContext, K key, Exchange exchange);
/**
* Gets the exchange with the given correlation key
*
+ * @param camelContext the current CamelContext
* @param key the correlation key
* @return the exchange, or <tt>null</tt> if no exchange was previously added
*/
- Exchange get(K key);
+ Exchange get(CamelContext camelContext, K key);
/**
* Removes the exchange with the given correlation key
*
+ * @param camelContext the current CamelContext
* @param key the correlation key
*/
- void remove(K key);
+ void remove(CamelContext camelContext, K key);
}
Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=915726&r1=915725&r2=915726&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Wed Feb 24 09:00:58 2010
@@ -41,7 +41,7 @@
private Marshaller<K> keyMarshaller = new ObjectMarshaller<K>();
private Marshaller<DefaultExchangeHolder> exchangeMarshaller = new ObjectMarshaller<DefaultExchangeHolder>();
- public Exchange add(K key, Exchange exchange) {
+ public Exchange add(CamelContext camelContext, K key, Exchange exchange) {
try {
// If we could guarantee that the key and exchange are immutable,
// then we could have stuck them directly into the index,
@@ -49,7 +49,7 @@
// in some cases. But since we can.. we are going to force
// early marshaling.
final Buffer keyBuffer = marshallKey(key);
- final Buffer exchangeBuffer = marshallExchange(exchange);
+ final Buffer exchangeBuffer = marshallExchange(camelContext, exchange);
Buffer rc = file.execute(new Work<Buffer>() {
public Buffer execute(Transaction tx) {
Index<Buffer, Buffer> index = file.getRepositoryIndex(tx, name);
@@ -60,14 +60,14 @@
return null;
}
// TODO: We can improve performance by not returning the old when adding
- return unmarshallExchange(rc);
+ return unmarshallExchange(camelContext, rc);
} catch (IOException e) {
throw new RuntimeException("Error adding to repository " + name + " with key " + key, e);
}
}
- public Exchange get(K key) {
+ public Exchange get(CamelContext camelContext, K key) {
try {
final Buffer keyBuffer = marshallKey(key);
Buffer rc = file.execute(new Work<Buffer>() {
@@ -79,13 +79,13 @@
if (rc == null) {
return null;
}
- return unmarshallExchange(rc);
+ return unmarshallExchange(camelContext, rc);
} catch (IOException e) {
throw new RuntimeException("Error getting key " + key + " from repository " + name, e);
}
}
- public void remove(K key) {
+ public void remove(CamelContext camelContext, K key) {
try {
final Buffer keyBuffer = marshallKey(key);
file.execute(new Work<Buffer>() {
@@ -105,23 +105,20 @@
return baos.toBuffer();
}
- protected Buffer marshallExchange(Exchange exchange) throws IOException {
+ protected Buffer marshallExchange(CamelContext camelContext, Exchange exchange) throws IOException {
DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
// use DefaultExchangeHolder to marshal to a serialized object
DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
- // TODO: store aggregation size
+ // add the aggregated size property as the only property we want to retain
+ DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class));
exchangeMarshaller.writePayload(pe, baos);
return baos.toBuffer();
}
- protected Exchange unmarshallExchange(Buffer buffer) throws IOException {
+ protected Exchange unmarshallExchange(CamelContext camelContext, Buffer buffer) throws IOException {
DataByteArrayInputStream bais = new DataByteArrayInputStream(buffer);
-
DefaultExchangeHolder pe = exchangeMarshaller.readPayload(bais);
-
- // create a new dummy default exchange which the aggregator must
- // set the CamelContext
- Exchange answer = new DefaultExchange((CamelContext) null);
+ Exchange answer = new DefaultExchange(camelContext);
DefaultExchangeHolder.unmarshal(answer, pe);
return answer;
}
Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java?rev=915726&r1=915725&r2=915726&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java Wed Feb 24 09:00:58 2010
@@ -30,10 +30,10 @@
import org.fusesource.hawtdb.util.marshaller.VariableBufferMarshaller;
/**
- * Manages access to a shared HawtDB file.
+ * Manages access to a shared <a href="http://hawtdb.fusesource.org/">HawtDB</a> file.
* <p/>
* Will by default not sync writes which allows it to be faster.
- * You can force syncing by setting sync=true.
+ * You can force syncing by setting the sync option to <tt>true</tt>.
*/
public class HawtDBFile extends HawtPageFileFactory implements Service {
Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html?rev=915726&r1=915725&r2=915726&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html Wed Feb 24 09:00:58 2010
@@ -18,7 +18,7 @@
<head>
</head>
<body>
-Camel HawtDB support
+Camel <a href="http://hawtdb.fusesource.org/">HawtDB</a> support
</body>
</html>
Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java (from r915717, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java&r1=915717&r2=915726&rev=915726&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java Wed Feb 24 09:00:58 2010
@@ -19,22 +19,25 @@
import java.io.File;
import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
-public class HawtDBAggregationRepositoryTest extends CamelTestSupport {
+public class HawtDBAggregateTest extends CamelTestSupport {
private HawtDBFile hawtDBFile;
@Override
public void setUp() throws Exception {
- super.setUp();
deleteDirectory("target/data");
File file = new File("target/data/hawtdb.dat");
hawtDBFile = new HawtDBFile();
hawtDBFile.setFile(file);
hawtDBFile.start();
+
+ super.setUp();
}
@Override
@@ -44,50 +47,47 @@
}
@Test
- public void testOperations() {
- HawtDBAggregationRepository<String> repo = new HawtDBAggregationRepository<String>();
- repo.setFile(hawtDBFile);
- repo.setName("repo1");
-
- // Can't get something we have not put in...
- Exchange actual = repo.get("missing");
- assertEquals(null, actual);
-
- // Store it..
- Exchange exchange1 = new DefaultExchange(context);
- exchange1.getIn().setBody("counter:1");
- actual = repo.add("foo", exchange1);
- assertEquals(null, actual);
-
- // Get it back..
- actual = repo.get("foo");
- assertEquals("counter:1", actual.getIn().getBody());
-
- // Change it..
- Exchange exchange2 = new DefaultExchange(context);
- exchange2.getIn().setBody("counter:2");
- actual = repo.add("foo", exchange2);
- // the old one
- assertEquals("counter:1", actual.getIn().getBody());
-
- // Get it back..
- actual = repo.get("foo");
- assertEquals("counter:2", actual.getIn().getBody());
-
- // now remove it
- repo.remove("foo");
- actual = repo.get("foo");
- assertEquals(null, actual);
-
- // add it again
- exchange1 = new DefaultExchange(context);
- exchange1.getIn().setBody("counter:3");
- actual = repo.add("foo", exchange1);
- assertEquals(null, actual);
-
- // Get it back..
- actual = repo.get("foo");
- assertEquals("counter:3", actual.getIn().getBody());
+ public void testHawtDBAggregate() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedBodiesReceived("ABCDE");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+ assertMockEndpointsSatisfied();
}
-}
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ HawtDBAggregationRepository<String> repo = new HawtDBAggregationRepository<String>();
+ repo.setFile(hawtDBFile);
+ repo.setName("repo1");
+
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .completionSize(5).aggregationRepository(repo)
+ .to("mock:aggregated");
+ }
+ };
+ }
+
+ private class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryAlotDataTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryAlotDataTest.java?rev=915726&r1=915725&r2=915726&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryAlotDataTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryAlotDataTest.java Wed Feb 24 09:00:58 2010
@@ -52,11 +52,11 @@
for (int i = 0; i < 100; i++) {
Exchange exchange1 = new DefaultExchange(context);
exchange1.getIn().setBody("counter:" + i);
- repo.add("foo", exchange1);
+ repo.add(context, "foo", exchange1);
}
// Get it back..
- Exchange actual = repo.get("foo");
+ Exchange actual = repo.get(context, "foo");
assertEquals("counter:99", actual.getIn().getBody());
}
@@ -70,14 +70,14 @@
Exchange exchange1 = new DefaultExchange(context);
exchange1.getIn().setBody("counter:" + i);
String key = i % 2 == 0 ? "foo" : "bar";
- repo.add(key, exchange1);
+ repo.add(context, key, exchange1);
}
// Get it back..
- Exchange actual = repo.get("foo");
+ Exchange actual = repo.get(context, "foo");
assertEquals("counter:98", actual.getIn().getBody());
- actual = repo.get("bar");
+ actual = repo.get(context, "bar");
assertEquals("counter:99", actual.getIn().getBody());
}
@@ -91,12 +91,12 @@
Exchange exchange1 = new DefaultExchange(context);
exchange1.getIn().setBody("counter:" + i);
String key = "key" + i;
- repo.add(key, exchange1);
+ repo.add(context, key, exchange1);
}
// Get it back..
for (int i = 0; i < 100; i++) {
- Exchange actual = repo.get("key" + i);
+ Exchange actual = repo.get(context, "key" + i);
assertEquals("counter:" + i, actual.getIn().getBody());
}
}
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java?rev=915726&r1=915725&r2=915726&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java Wed Feb 24 09:00:58 2010
@@ -52,7 +52,7 @@
// Store it..
Exchange exchange1 = new DefaultExchange(context);
exchange1.getIn().setBody("counter:1");
- Exchange actual = repo.add("foo", exchange1);
+ Exchange actual = repo.add(context, "foo", exchange1);
assertEquals(null, actual);
// stop the repo
@@ -64,18 +64,18 @@
hawtDBFile.start();
// Get it back..
- actual = repo.get("foo");
+ actual = repo.get(context, "foo");
assertEquals("counter:1", actual.getIn().getBody());
// Change it..
Exchange exchange2 = new DefaultExchange(context);
exchange2.getIn().setBody("counter:2");
- actual = repo.add("foo", exchange2);
+ actual = repo.add(context, "foo", exchange2);
// the old one
assertEquals("counter:1", actual.getIn().getBody());
// Get it back..
- actual = repo.get("foo");
+ actual = repo.get(context, "foo");
assertEquals("counter:2", actual.getIn().getBody());
}
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryMultipleRepoTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryMultipleRepoTest.java?rev=915726&r1=915725&r2=915726&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryMultipleRepoTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryMultipleRepoTest.java Wed Feb 24 09:00:58 2010
@@ -54,45 +54,45 @@
repo2.setName("repo2");
// Can't get something we have not put in...
- Exchange actual = repo1.get("missing");
+ Exchange actual = repo1.get(context, "missing");
assertEquals(null, actual);
- actual = repo2.get("missing");
+ actual = repo2.get(context, "missing");
assertEquals(null, actual);
// Store it..
Exchange exchange1 = new DefaultExchange(context);
exchange1.getIn().setBody("counter:1");
- actual = repo1.add("foo", exchange1);
+ actual = repo1.add(context, "foo", exchange1);
assertEquals(null, actual);
// Get it back..
- actual = repo1.get("foo");
+ actual = repo1.get(context, "foo");
assertEquals("counter:1", actual.getIn().getBody());
- assertEquals(null, repo2.get("foo"));
+ assertEquals(null, repo2.get(context, "foo"));
// Change it..
Exchange exchange2 = new DefaultExchange(context);
exchange2.getIn().setBody("counter:2");
- actual = repo1.add("foo", exchange2);
+ actual = repo1.add(context, "foo", exchange2);
// the old one
assertEquals("counter:1", actual.getIn().getBody());
// add to repo2
Exchange exchange3 = new DefaultExchange(context);
exchange3.getIn().setBody("Hello World");
- actual = repo2.add("bar", exchange3);
+ actual = repo2.add(context, "bar", exchange3);
assertEquals(null, actual);
- assertEquals(null, repo1.get("bar"));
+ assertEquals(null, repo1.get(context, "bar"));
// Get it back..
- actual = repo1.get("foo");
+ actual = repo1.get(context, "foo");
assertEquals("counter:2", actual.getIn().getBody());
- assertEquals(null, repo2.get("foo"));
+ assertEquals(null, repo2.get(context, "foo"));
- actual = repo2.get("bar");
+ actual = repo2.get(context, "bar");
assertEquals("Hello World", actual.getIn().getBody());
- assertEquals(null, repo1.get("bar"));
+ assertEquals(null, repo1.get(context, "bar"));
}
@Test
@@ -107,15 +107,15 @@
Exchange exchange1 = new DefaultExchange(context);
exchange1.getIn().setBody("Hello World");
- repo1.add("foo", exchange1);
+ repo1.add(context, "foo", exchange1);
Exchange exchange2 = new DefaultExchange(context);
exchange2.getIn().setBody("Bye World");
- repo2.add("foo", exchange2);
+ repo2.add(context, "foo", exchange2);
- Exchange actual = repo1.get("foo");
+ Exchange actual = repo1.get(context, "foo");
assertEquals("Hello World", actual.getIn().getBody());
- actual = repo2.get("foo");
+ actual = repo2.get(context, "foo");
assertEquals("Bye World", actual.getIn().getBody());
}
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java?rev=915726&r1=915725&r2=915726&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java Wed Feb 24 09:00:58 2010
@@ -50,43 +50,43 @@
repo.setName("repo1");
// Can't get something we have not put in...
- Exchange actual = repo.get("missing");
+ Exchange actual = repo.get(context, "missing");
assertEquals(null, actual);
// Store it..
Exchange exchange1 = new DefaultExchange(context);
exchange1.getIn().setBody("counter:1");
- actual = repo.add("foo", exchange1);
+ actual = repo.add(context, "foo", exchange1);
assertEquals(null, actual);
// Get it back..
- actual = repo.get("foo");
+ actual = repo.get(context, "foo");
assertEquals("counter:1", actual.getIn().getBody());
// Change it..
Exchange exchange2 = new DefaultExchange(context);
exchange2.getIn().setBody("counter:2");
- actual = repo.add("foo", exchange2);
+ actual = repo.add(context, "foo", exchange2);
// the old one
assertEquals("counter:1", actual.getIn().getBody());
// Get it back..
- actual = repo.get("foo");
+ actual = repo.get(context, "foo");
assertEquals("counter:2", actual.getIn().getBody());
// now remove it
- repo.remove("foo");
- actual = repo.get("foo");
+ repo.remove(context, "foo");
+ actual = repo.get(context, "foo");
assertEquals(null, actual);
// add it again
exchange1 = new DefaultExchange(context);
exchange1.getIn().setBody("counter:3");
- actual = repo.add("foo", exchange1);
+ actual = repo.add(context, "foo", exchange1);
assertEquals(null, actual);
// Get it back..
- actual = repo.get("foo");
+ actual = repo.get(context, "foo");
assertEquals("counter:3", actual.getIn().getBody());
}
Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBExchangeSerializationTest.java (from r915717, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBExchangeSerializationTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBExchangeSerializationTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java&r1=915717&r2=915726&rev=915726&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBExchangeSerializationTest.java Wed Feb 24 09:00:58 2010
@@ -17,13 +17,14 @@
package org.apache.camel.component.hawtdb;
import java.io.File;
+import java.util.Date;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
-public class HawtDBAggregationRepositoryTest extends CamelTestSupport {
+public class HawtDBExchangeSerializationTest extends CamelTestSupport {
private HawtDBFile hawtDBFile;
@@ -44,50 +45,47 @@
}
@Test
- public void testOperations() {
+ public void testExchangeSerialization() {
HawtDBAggregationRepository<String> repo = new HawtDBAggregationRepository<String>();
repo.setFile(hawtDBFile);
repo.setName("repo1");
- // Can't get something we have not put in...
- Exchange actual = repo.get("missing");
- assertEquals(null, actual);
-
- // Store it..
- Exchange exchange1 = new DefaultExchange(context);
- exchange1.getIn().setBody("counter:1");
- actual = repo.add("foo", exchange1);
- assertEquals(null, actual);
-
- // Get it back..
- actual = repo.get("foo");
- assertEquals("counter:1", actual.getIn().getBody());
-
- // Change it..
- Exchange exchange2 = new DefaultExchange(context);
- exchange2.getIn().setBody("counter:2");
- actual = repo.add("foo", exchange2);
- // the old one
- assertEquals("counter:1", actual.getIn().getBody());
-
- // Get it back..
- actual = repo.get("foo");
- assertEquals("counter:2", actual.getIn().getBody());
-
- // now remove it
- repo.remove("foo");
- actual = repo.get("foo");
- assertEquals(null, actual);
-
- // add it again
- exchange1 = new DefaultExchange(context);
- exchange1.getIn().setBody("counter:3");
- actual = repo.add("foo", exchange1);
- assertEquals(null, actual);
-
- // Get it back..
- actual = repo.get("foo");
- assertEquals("counter:3", actual.getIn().getBody());
+ Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setBody("Hello World");
+ exchange.getIn().setHeader("name", "Claus");
+ exchange.getIn().setHeader("number", 123);
+ exchange.setProperty("quote", "Camel rocks");
+
+ Date now = new Date();
+ exchange.getIn().setHeader("date", now);
+
+ repo.add(context, "foo", exchange);
+
+ Exchange actual = repo.get(context, "foo");
+ assertEquals("Hello World", actual.getIn().getBody());
+ assertEquals("Claus", actual.getIn().getHeader("name"));
+ assertEquals(123, actual.getIn().getHeader("number"));
+ Date date = actual.getIn().getHeader("date", Date.class);
+ assertNotNull(date);
+ assertEquals(now.getTime(), date.getTime());
+ // we do not serialize properties, to avoid storing all kinds of unneeded information
+ assertNull(actual.getProperty("quote"));
+ assertSame(context, actual.getContext());
+
+ // change something
+ exchange.getIn().setBody("Bye World");
+ exchange.getIn().setHeader("name", "Hiram");
+ exchange.getIn().removeHeader("date");
+
+ repo.add(context, "foo", exchange);
+
+ actual = repo.get(context, "foo");
+ assertEquals("Bye World", actual.getIn().getBody());
+ assertEquals("Hiram", actual.getIn().getHeader("name"));
+ assertEquals(123, actual.getIn().getHeader("number"));
+ date = actual.getIn().getHeader("date", Date.class);
+ assertNull(date);
+ assertSame(context, actual.getContext());
}
-}
+}
\ No newline at end of file
Modified: camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties?rev=915726&r1=915725&r2=915726&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties Wed Feb 24 09:00:58 2010
@@ -22,6 +22,7 @@
# uncomment the following to enable camel debugging
#log4j.logger.org.apache.camel=DEBUG
+#log4j.logger.org.apache.camel.component.hawtdb=DEBUG
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender