You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/23 17:35:06 UTC
svn commit: r915418 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/impl/
components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/
components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/
components/camel-ha...
Author: davsclaus
Date: Tue Feb 23 16:35:05 2010
New Revision: 915418
URL: http://svn.apache.org/viewvc?rev=915418&view=rev
Log:
CAMEL-217: Working on a persistent aggregator.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java
camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java Tue Feb 23 16:35:05 2010
@@ -28,7 +28,7 @@
* Holder object for sending an exchange over a remote wire as a serialized object.
* This is usually configured using the <tt>transferExchange=true</tt> option on the endpoint.
* <p/>
- * As opposed to normal usage where only the body part of the exchange is transfered over the wire,
+ * As opposed to normal usage where only the body part of the exchange is transferred over the wire,
* this holder object serializes the following fields over the wire:
* <ul>
* <li>in body</li>
@@ -52,9 +52,9 @@
private Object inBody;
private Object outBody;
private Boolean outFaultFlag = Boolean.FALSE;
- private final Map<String, Object> inHeaders = new LinkedHashMap<String, Object>();
- private final Map<String, Object> outHeaders = new LinkedHashMap<String, Object>();
- private final Map<String, Object> properties = new LinkedHashMap<String, Object>();
+ private Map<String, Object> inHeaders;
+ private Map<String, Object> outHeaders;
+ private Map<String, Object> properties;
private Exception exception;
/**
@@ -65,16 +65,30 @@
* @return the holder object with information copied form the exchange
*/
public static DefaultExchangeHolder marshal(Exchange exchange) {
+ return marshal(exchange, true);
+ }
+
+ /**
+ * Creates a payload object with the information from the given exchange.
+ * Only Serializable objects are marshalled.
+ *
+ * @param exchange the exchange
+ * @param includeProperties whether or not to include exchange properties
+ * @return the holder object with information copied from the exchange
+ */
+ public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties) {
DefaultExchangeHolder payload = new DefaultExchangeHolder();
payload.inBody = checkSerializableObject("in body", exchange, exchange.getIn().getBody());
- payload.inHeaders.putAll(checkMapSerializableObjects("in headers", exchange, exchange.getIn().getHeaders()));
+ payload.safeSetInHeaders(exchange);
if (exchange.hasOut()) {
payload.outBody = checkSerializableObject("out body", exchange, exchange.getOut().getBody());
- payload.outHeaders.putAll(checkMapSerializableObjects("out headers", exchange, exchange.getOut().getHeaders()));
payload.outFaultFlag = exchange.getOut().isFault();
+ payload.safeSetOutHeaders(exchange);
+ }
+ if (includeProperties) {
+ payload.safeSetProperties(exchange);
}
- payload.properties.putAll(checkMapSerializableObjects("exchange properties", exchange, exchange.getProperties()));
payload.exception = exchange.getException();
return payload;
@@ -88,14 +102,20 @@
*/
public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) {
exchange.getIn().setBody(payload.inBody);
- exchange.getIn().setHeaders(payload.inHeaders);
+ if (payload.inHeaders != null) {
+ exchange.getIn().setHeaders(payload.inHeaders);
+ }
if (payload.outBody != null) {
exchange.getOut().setBody(payload.outBody);
- exchange.getOut().setHeaders(payload.outHeaders);
+ if (payload.outHeaders != null) {
+ exchange.getOut().setHeaders(payload.outHeaders);
+ }
exchange.getOut().setFault(payload.outFaultFlag.booleanValue());
}
- for (String key : payload.properties.keySet()) {
- exchange.setProperty(key, payload.properties.get(key));
+ if (payload.properties != null) {
+ for (String key : payload.properties.keySet()) {
+ exchange.setProperty(key, payload.properties.get(key));
+ }
}
exchange.setException(payload.exception);
}
@@ -108,6 +128,36 @@
return sb.append(']').toString();
}
+ private Map<String, Object> safeSetInHeaders(Exchange exchange) {
+ if (exchange.getIn().hasHeaders()) {
+ Map<String, Object> map = checkMapSerializableObjects("in headers", exchange, exchange.getIn().getHeaders());
+ if (map != null && !map.isEmpty()) {
+ inHeaders = new LinkedHashMap<String, Object>(map);
+ }
+ }
+ return null;
+ }
+
+ private Map<String, Object> safeSetOutHeaders(Exchange exchange) {
+ if (exchange.hasOut() && exchange.getOut().hasHeaders()) {
+ Map<String, Object> map = checkMapSerializableObjects("out headers", exchange, exchange.getOut().getHeaders());
+ if (map != null && !map.isEmpty()) {
+ outHeaders = new LinkedHashMap<String, Object>(map);
+ }
+ }
+ return null;
+ }
+
+ private Map<String, Object> safeSetProperties(Exchange exchange) {
+ if (exchange.hasProperties()) {
+ Map<String, Object> map = checkMapSerializableObjects("properties", exchange, exchange.getProperties());
+ if (map != null && !map.isEmpty()) {
+ properties = new LinkedHashMap<String, Object>(map);
+ }
+ }
+ return null;
+ }
+
private static Object checkSerializableObject(String type, Exchange exchange, Object object) {
if (object == null) {
return null;
Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Tue Feb 23 16:35:05 2010
@@ -18,7 +18,10 @@
import java.io.IOException;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
import org.apache.camel.spi.AggregationRepository;
import org.fusesource.hawtdb.api.Index;
import org.fusesource.hawtdb.api.Transaction;
@@ -30,15 +33,13 @@
/**
* An instance of AggregationRepository which is backed by a HawtDB.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class HawtDBAggregationRepository<K> implements AggregationRepository<K> {
-
+
private HawtDBFile file;
private String name;
- private Marshaller<K> keyMarshaller = new ObjectMarshaller<K>();
- private Marshaller<Exchange> exchangeMarshaller = new ObjectMarshaller<Exchange>();
+ private Marshaller<K> keyMarshaller = new ObjectMarshaller<K>();
+ private Marshaller<DefaultExchangeHolder> exchangeMarshaller = new ObjectMarshaller<DefaultExchangeHolder>();
public Exchange add(K key, Exchange exchange) {
try {
@@ -55,7 +56,7 @@
return index.put(keyBuffer, exchangeBuffer);
}
});
- if( rc ==null ) {
+ if (rc == null) {
return null;
}
return unmarshallExchange(rc);
@@ -74,7 +75,7 @@
return index.get(keyBuffer);
}
});
- if( rc==null ) {
+ if (rc == null) {
return null;
}
return unmarshallExchange(rc);
@@ -102,57 +103,40 @@
keyMarshaller.writePayload(key, baos);
return baos.toBuffer();
}
-
+
protected Buffer marshallExchange(Exchange exchange) throws IOException {
DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
- exchangeMarshaller.writePayload(exchange, baos);
+ DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
+ exchangeMarshaller.writePayload(pe, baos);
return baos.toBuffer();
}
-
+
protected Exchange unmarshallExchange(Buffer buffer) throws IOException {
DataByteArrayInputStream bais = new DataByteArrayInputStream(buffer);
- return exchangeMarshaller.readPayload(bais);
- }
+ DefaultExchangeHolder pe = exchangeMarshaller.readPayload(bais);
+
+ // create a new dummy default exchange without a CamelContext;
+ // the aggregator must set the CamelContext on it
+ Exchange answer = new DefaultExchange((CamelContext) null);
+ DefaultExchangeHolder.unmarshal(answer, pe);
+ return answer;
+ }
public HawtDBFile getFile() {
return file;
}
-
public void setFile(HawtDBFile file) {
this.file = file;
}
-
public String getName() {
return name;
}
-
public void setName(String name) {
this.name = name;
}
-
- public Marshaller<K> getKeyMarshaller() {
- return keyMarshaller;
- }
-
-
- public void setKeyMarshaller(Marshaller<K> keyMarshaller) {
- this.keyMarshaller = keyMarshaller;
- }
-
-
- public Marshaller<Exchange> getExchangeMarshaller() {
- return exchangeMarshaller;
- }
-
-
- public void setExchangeMarshaller(Marshaller<Exchange> exchangeMarshaller) {
- this.exchangeMarshaller = exchangeMarshaller;
- }
-
-
}
Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java Tue Feb 23 16:35:05 2010
@@ -17,6 +17,8 @@
package org.apache.camel.component.hawtdb;
import org.apache.camel.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.fusesource.hawtdb.api.BTreeIndexFactory;
import org.fusesource.hawtdb.api.Index;
import org.fusesource.hawtdb.api.Transaction;
@@ -29,18 +31,18 @@
/**
* Manages access to a shared HawtDB file from multiple HawtDBAggregationRepository objects.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class HawtDBFile extends HawtPageFileFactory implements Service {
- private final static BTreeIndexFactory<String,Integer> indexesFactory = new BTreeIndexFactory<String,Integer>();
- private final static BTreeIndexFactory<Buffer,Buffer> indexFactory = new BTreeIndexFactory<Buffer,Buffer>();
-
+ private static final transient Log LOG = LogFactory.getLog(HawtDBFile.class);
+
+ private final static BTreeIndexFactory<String, Integer> indexesFactory = new BTreeIndexFactory<String, Integer>();
+ private final static BTreeIndexFactory<Buffer, Buffer> indexFactory = new BTreeIndexFactory<Buffer, Buffer>();
+
public HawtDBFile() {
setSync(false);
}
-
+
static {
indexesFactory.setKeyMarshaller(StringMarshaller.INSTANCE);
indexesFactory.setValueMarshaller(IntegerMarshaller.INSTANCE);
@@ -51,23 +53,28 @@
}
private HawtPageFile pageFile;
-
+
public void start() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting HawtDB using file: " + getFile());
+ }
+
final boolean initialize = !file.exists();
open();
pageFile = getConcurrentPageFile();
-
+
execute(new Work<Boolean>() {
public Boolean execute(Transaction tx) {
- if( initialize ) {
+ if (initialize) {
int page = tx.allocator().alloc(1);
// if we just created the file, first allocated page should be 0
assert page == 0;
indexesFactory.create(tx, 0);
- System.out.println("Aggregation repository data store created.");
+ LOG.info("Aggregation repository data store created using file: " + getFile());
} else {
Index<String, Integer> indexes = indexesFactory.open(tx, 0);
- System.out.println("You have "+indexes.size()+" aggregation repositories stored.");
+ LOG.info("Aggregation repository data store loaded using file: " + getFile()
+ + " containing " + indexes.size() + " repositories.");
}
return true;
}
@@ -75,17 +82,21 @@
}
public void stop() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping HawtDB using file: " + getFile());
+ }
+
close();
pageFile = null;
}
-
+
public <T> T execute(Work<T> work) {
Transaction tx = pageFile.tx();
try {
T rc = work.execute(tx);
tx.commit();
return rc;
- } catch (RuntimeException e){
+ } catch (RuntimeException e) {
tx.rollback();
throw e;
}
@@ -94,12 +105,18 @@
public Index<Buffer, Buffer> getRepositoryIndex(Transaction tx, String name) {
Index<String, Integer> indexes = indexesFactory.open(tx, 0);
Integer location = indexes.get(name);
- if( location == null ) {
+ if (location == null) {
// create it..
- return indexFactory.create(tx, tx.allocator().alloc(1));
- } else {
+ int page = tx.allocator().alloc(1);
+ Index<Buffer, Buffer> created = indexFactory.create(tx, page);
+
+ // add it to indexes so we can find it the next time
+ indexes.put(name, page);
+
+ return created;
+ } else {
return indexFactory.open(tx, location);
}
}
-
+
}
Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java Tue Feb 23 16:35:05 2010
@@ -21,9 +21,15 @@
/**
* Demarcates the statements that need to be performed as a
* HawtDB transactional unit of work.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
interface Work<T> {
+
+ /**
+ * Executes the work within the bounds of the given transaction
+ *
+ * @param transaction the transaction
+ * @return result of the work, can be <tt>null</tt> if no result to return.
+ */
T execute(Transaction transaction);
+
}
\ No newline at end of file
Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html Tue Feb 23 16:35:05 2010
@@ -18,7 +18,7 @@
<head>
</head>
<body>
-
+Camel HawtDB support
</body>
</html>
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java Tue Feb 23 16:35:05 2010
@@ -19,37 +19,39 @@
import java.io.File;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeTestSupport;
import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
/**
* Tests the HawtDBAggregationRepository implementation.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class HawtDBAggregationRepositoryTest extends ExchangeTestSupport {
-
+public class HawtDBAggregationRepositoryTest extends CamelTestSupport {
+
private HawtDBFile hawtDBFile;
@Override
- protected void setUp() throws Exception {
- File file = new File("target/test-data/"+getClass().getName()+"-"+getName());
+ public void setUp() throws Exception {
+ super.setUp();
+ deleteDirectory("target/data");
+ File file = new File("target/data/hawtdb.dat");
hawtDBFile = new HawtDBFile();
hawtDBFile.setFile(file);
hawtDBFile.start();
}
-
+
@Override
- protected void tearDown() throws Exception {
+ public void tearDown() throws Exception {
hawtDBFile.stop();
+ super.tearDown();
}
-
+
+ @Test
public void testOperations() {
-
HawtDBAggregationRepository<String> repo = new HawtDBAggregationRepository<String>();
repo.setFile(hawtDBFile);
repo.setName("repo1");
-
+
// Can't get something we have not put in...
Exchange actual = repo.get("missing");
assertEquals(null, actual);
@@ -59,20 +61,21 @@
exchange1.getIn().setBody("counter:1");
actual = repo.add("foo", exchange1);
assertEquals(null, actual);
-
+
// Get it back..
actual = repo.get("foo");
- assertEquals(exchange1, actual);
-
+ assertEquals("counter:1", actual.getIn().getBody());
+
// Change it..
Exchange exchange2 = new DefaultExchange(context);
exchange2.getIn().setBody("counter:2");
actual = repo.add("foo", exchange2);
- assertEquals(exchange1, actual);
-
+ // the old one
+ assertEquals("counter:1", actual.getIn().getBody());
+
// Get it back..
actual = repo.get("foo");
- assertEquals(exchange2, actual);
+ assertEquals("counter:2", actual.getIn().getBody());
}
}
Modified: camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties Tue Feb 23 16:35:05 2010
@@ -33,4 +33,4 @@
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.file.file=target/camel-ognl-test.log
\ No newline at end of file
+log4j.appender.file.file=target/camel-hawtdb-test.log
\ No newline at end of file