You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/23 17:35:06 UTC

svn commit: r915418 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/impl/ components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/ components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/ components/camel-ha...

Author: davsclaus
Date: Tue Feb 23 16:35:05 2010
New Revision: 915418

URL: http://svn.apache.org/viewvc?rev=915418&view=rev
Log:
CAMEL-217: Working on a persistent aggregator.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
    camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
    camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
    camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java
    camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java
    camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java Tue Feb 23 16:35:05 2010
@@ -28,7 +28,7 @@
  * Holder object for sending an exchange over a remote wire as a serialized object.
  * This is usually configured using the <tt>transferExchange=true</tt> option on the endpoint.
  * <p/>
- * As opposed to normal usage where only the body part of the exchange is transfered over the wire,
+ * As opposed to normal usage where only the body part of the exchange is transferred over the wire,
  * this holder object serializes the following fields over the wire:
  * <ul>
  * <li>in body</li>
@@ -52,9 +52,9 @@
     private Object inBody;
     private Object outBody;
     private Boolean outFaultFlag = Boolean.FALSE;
-    private final Map<String, Object> inHeaders = new LinkedHashMap<String, Object>();
-    private final Map<String, Object> outHeaders = new LinkedHashMap<String, Object>();
-    private final Map<String, Object> properties = new LinkedHashMap<String, Object>();
+    private Map<String, Object> inHeaders;
+    private Map<String, Object> outHeaders;
+    private Map<String, Object> properties;
     private Exception exception;
 
     /**
@@ -65,16 +65,30 @@
      * @return the holder object with information copied form the exchange
      */
     public static DefaultExchangeHolder marshal(Exchange exchange) {
+        return marshal(exchange, true);
+    }
+
+    /**
+     * Creates a payload object with the information from the given exchange.
+     * Only marshal the Serializable object
+     *
+     * @param exchange the exchange
+     * @param includeProperties whether or not to include exchange properties
+     * @return the holder object with information copied form the exchange
+     */
+    public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties) {
         DefaultExchangeHolder payload = new DefaultExchangeHolder();
 
         payload.inBody = checkSerializableObject("in body", exchange, exchange.getIn().getBody());
-        payload.inHeaders.putAll(checkMapSerializableObjects("in headers", exchange, exchange.getIn().getHeaders()));
+        payload.safeSetInHeaders(exchange);
         if (exchange.hasOut()) {
             payload.outBody = checkSerializableObject("out body", exchange, exchange.getOut().getBody());
-            payload.outHeaders.putAll(checkMapSerializableObjects("out headers", exchange, exchange.getOut().getHeaders()));
             payload.outFaultFlag = exchange.getOut().isFault();
+            payload.safeSetOutHeaders(exchange);
+        }
+        if (includeProperties) {
+            payload.safeSetProperties(exchange);
         }
-        payload.properties.putAll(checkMapSerializableObjects("exchange properties", exchange, exchange.getProperties()));
         payload.exception = exchange.getException();
 
         return payload;
@@ -88,14 +102,20 @@
      */
     public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) {
         exchange.getIn().setBody(payload.inBody);
-        exchange.getIn().setHeaders(payload.inHeaders);
+        if (payload.inHeaders != null) {
+            exchange.getIn().setHeaders(payload.inHeaders);
+        }
         if (payload.outBody != null) {
             exchange.getOut().setBody(payload.outBody);
-            exchange.getOut().setHeaders(payload.outHeaders);
+            if (payload.outHeaders != null) {
+                exchange.getOut().setHeaders(payload.outHeaders);
+            }
             exchange.getOut().setFault(payload.outFaultFlag.booleanValue());
         }
-        for (String key : payload.properties.keySet()) {
-            exchange.setProperty(key, payload.properties.get(key));
+        if (payload.properties != null) {
+            for (String key : payload.properties.keySet()) {
+                exchange.setProperty(key, payload.properties.get(key));
+            }
         }
         exchange.setException(payload.exception);
     }
@@ -108,6 +128,36 @@
         return sb.append(']').toString();
     }
 
+    private Map<String, Object> safeSetInHeaders(Exchange exchange) {
+        if (exchange.getIn().hasHeaders()) {
+            Map<String, Object> map = checkMapSerializableObjects("in headers", exchange, exchange.getIn().getHeaders());
+            if (map != null && !map.isEmpty()) {
+                inHeaders = new LinkedHashMap<String, Object>(map);
+            }
+        }
+        return null;
+    }
+
+    private Map<String, Object> safeSetOutHeaders(Exchange exchange) {
+        if (exchange.hasOut() && exchange.getOut().hasHeaders()) {
+            Map<String, Object> map = checkMapSerializableObjects("out headers", exchange, exchange.getOut().getHeaders());
+            if (map != null && !map.isEmpty()) {
+                outHeaders = new LinkedHashMap<String, Object>(map);
+            }
+        }
+        return null;
+    }
+
+    private Map<String, Object> safeSetProperties(Exchange exchange) {
+        if (exchange.hasProperties()) {
+            Map<String, Object> map = checkMapSerializableObjects("properties", exchange, exchange.getProperties());
+            if (map != null && !map.isEmpty()) {
+                properties = new LinkedHashMap<String, Object>(map);
+            }
+        }
+        return null;
+    }
+
     private static Object checkSerializableObject(String type, Exchange exchange, Object object) {
         if (object == null) {
             return null;

Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Tue Feb 23 16:35:05 2010
@@ -18,7 +18,10 @@
 
 import java.io.IOException;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
 import org.apache.camel.spi.AggregationRepository;
 import org.fusesource.hawtdb.api.Index;
 import org.fusesource.hawtdb.api.Transaction;
@@ -30,15 +33,13 @@
 
 /**
  * An instance of AggregationRepository which is backed by a HawtDB.
- * 
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class HawtDBAggregationRepository<K> implements AggregationRepository<K> {
-    
+
     private HawtDBFile file;
     private String name;
-    private Marshaller<K> keyMarshaller = new ObjectMarshaller<K>(); 
-    private Marshaller<Exchange> exchangeMarshaller = new ObjectMarshaller<Exchange>(); 
+    private Marshaller<K> keyMarshaller = new ObjectMarshaller<K>();
+    private Marshaller<DefaultExchangeHolder> exchangeMarshaller = new ObjectMarshaller<DefaultExchangeHolder>();
 
     public Exchange add(K key, Exchange exchange) {
         try {
@@ -55,7 +56,7 @@
                     return index.put(keyBuffer, exchangeBuffer);
                 }
             });
-            if( rc ==null ) {
+            if (rc == null) {
                 return null;
             }
             return unmarshallExchange(rc);
@@ -74,7 +75,7 @@
                     return index.get(keyBuffer);
                 }
             });
-            if( rc==null ) {
+            if (rc == null) {
                 return null;
             }
             return unmarshallExchange(rc);
@@ -102,57 +103,40 @@
         keyMarshaller.writePayload(key, baos);
         return baos.toBuffer();
     }
-    
+
     protected Buffer marshallExchange(Exchange exchange) throws IOException {
         DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
-        exchangeMarshaller.writePayload(exchange, baos);
+        DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
+        exchangeMarshaller.writePayload(pe, baos);
         return baos.toBuffer();
     }
-    
+
     protected Exchange unmarshallExchange(Buffer buffer) throws IOException {
         DataByteArrayInputStream bais = new DataByteArrayInputStream(buffer);
-        return exchangeMarshaller.readPayload(bais);
-    }
 
+        DefaultExchangeHolder pe = exchangeMarshaller.readPayload(bais);
+
+        // create a new dummy default exchange which the aggregator must
+        // set the CamelContext
+        Exchange answer = new DefaultExchange((CamelContext) null);
+        DefaultExchangeHolder.unmarshal(answer, pe);
+        return answer;
+    }
 
     public HawtDBFile getFile() {
         return file;
     }
 
-
     public void setFile(HawtDBFile file) {
         this.file = file;
     }
 
-
     public String getName() {
         return name;
     }
 
-
     public void setName(String name) {
         this.name = name;
     }
 
-
-    public Marshaller<K> getKeyMarshaller() {
-        return keyMarshaller;
-    }
-
-
-    public void setKeyMarshaller(Marshaller<K> keyMarshaller) {
-        this.keyMarshaller = keyMarshaller;
-    }
-
-
-    public Marshaller<Exchange> getExchangeMarshaller() {
-        return exchangeMarshaller;
-    }
-
-
-    public void setExchangeMarshaller(Marshaller<Exchange> exchangeMarshaller) {
-        this.exchangeMarshaller = exchangeMarshaller;
-    }
-
-    
 }

Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java Tue Feb 23 16:35:05 2010
@@ -17,6 +17,8 @@
 package org.apache.camel.component.hawtdb;
 
 import org.apache.camel.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.fusesource.hawtdb.api.BTreeIndexFactory;
 import org.fusesource.hawtdb.api.Index;
 import org.fusesource.hawtdb.api.Transaction;
@@ -29,18 +31,18 @@
 
 /**
  * Manages access to a shared HawtDB file from multiple HawtDBAggregationRepository objects.
- * 
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class HawtDBFile extends HawtPageFileFactory implements Service {
 
-    private final static BTreeIndexFactory<String,Integer> indexesFactory = new BTreeIndexFactory<String,Integer>();
-    private final static BTreeIndexFactory<Buffer,Buffer> indexFactory = new BTreeIndexFactory<Buffer,Buffer>();
-    
+    private static final transient Log LOG = LogFactory.getLog(HawtDBFile.class);
+
+    private final static BTreeIndexFactory<String, Integer> indexesFactory = new BTreeIndexFactory<String, Integer>();
+    private final static BTreeIndexFactory<Buffer, Buffer> indexFactory = new BTreeIndexFactory<Buffer, Buffer>();
+
     public HawtDBFile() {
         setSync(false);
     }
-    
+
     static {
         indexesFactory.setKeyMarshaller(StringMarshaller.INSTANCE);
         indexesFactory.setValueMarshaller(IntegerMarshaller.INSTANCE);
@@ -51,23 +53,28 @@
     }
 
     private HawtPageFile pageFile;
-    
+
     public void start() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Starting HawtDB using file: " + getFile());
+        }
+
         final boolean initialize = !file.exists();
         open();
         pageFile = getConcurrentPageFile();
-        
+
         execute(new Work<Boolean>() {
             public Boolean execute(Transaction tx) {
-                if( initialize ) {
+                if (initialize) {
                     int page = tx.allocator().alloc(1);
                     // if we just created the file, first allocated page should be 0
                     assert page == 0;
                     indexesFactory.create(tx, 0);
-                    System.out.println("Aggregation repository data store created.");
+                    LOG.info("Aggregation repository data store created using file: " + getFile());
                 } else {
                     Index<String, Integer> indexes = indexesFactory.open(tx, 0);
-                    System.out.println("You have "+indexes.size()+" aggregation repositories stored.");
+                    LOG.info("Aggregation repository data store loaded using file: " + getFile()
+                            + " containing " + indexes.size() + " repositories.");
                 }
                 return true;
             }
@@ -75,17 +82,21 @@
     }
 
     public void stop() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Stopping HawtDB using file: " + getFile());
+        }
+
         close();
         pageFile = null;
     }
-    
+
     public <T> T execute(Work<T> work) {
         Transaction tx = pageFile.tx();
         try {
             T rc = work.execute(tx);
             tx.commit();
             return rc;
-        } catch (RuntimeException e){
+        } catch (RuntimeException e) {
             tx.rollback();
             throw e;
         }
@@ -94,12 +105,18 @@
     public Index<Buffer, Buffer> getRepositoryIndex(Transaction tx, String name) {
         Index<String, Integer> indexes = indexesFactory.open(tx, 0);
         Integer location = indexes.get(name);
-        if( location == null ) {
+        if (location == null) {
             // create it..
-            return indexFactory.create(tx, tx.allocator().alloc(1));
-        } else  {
+            int page = tx.allocator().alloc(1);
+            Index<Buffer, Buffer> created = indexFactory.create(tx, page);
+
+            // add it to indexes so we can find it the next time
+            indexes.put(name, page);
+
+            return created;
+        } else {
             return indexFactory.open(tx, location);
         }
     }
-    
+
 }

Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/Work.java Tue Feb 23 16:35:05 2010
@@ -21,9 +21,15 @@
 /**
  * Demarcates the statements that need to be performed as a 
  * HawtDB transactional unit of work.
- * 
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 interface Work<T> {
+
+    /**
+     * Executs the work within the bounds of the given transaction
+     *
+     * @param transaction the transaction
+     * @return result of the work, can be <tt>null</tt> if no result to return.
+     */
     T execute(Transaction transaction);
+
 }
\ No newline at end of file

Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/package.html Tue Feb 23 16:35:05 2010
@@ -18,7 +18,7 @@
 <head>
 </head>
 <body>
-
+Camel HawtDB support
 
 </body>
 </html>

Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java Tue Feb 23 16:35:05 2010
@@ -19,37 +19,39 @@
 import java.io.File;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeTestSupport;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
 
 /**
  * Tests the HawtDBAggregationRepository implementation.
- * 
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class HawtDBAggregationRepositoryTest extends ExchangeTestSupport {
-    
+public class HawtDBAggregationRepositoryTest extends CamelTestSupport {
+
     private HawtDBFile hawtDBFile;
 
     @Override
-    protected void setUp() throws Exception {
-        File file = new File("target/test-data/"+getClass().getName()+"-"+getName());
+    public void setUp() throws Exception {
+        super.setUp();
+        deleteDirectory("target/data");
+        File file = new File("target/data/hawtdb.dat");
         hawtDBFile = new HawtDBFile();
         hawtDBFile.setFile(file);
         hawtDBFile.start();
     }
-    
+
     @Override
-    protected void tearDown() throws Exception {
+    public void tearDown() throws Exception {
         hawtDBFile.stop();
+        super.tearDown();
     }
-    
+
+    @Test
     public void testOperations() {
-        
         HawtDBAggregationRepository<String> repo = new HawtDBAggregationRepository<String>();
         repo.setFile(hawtDBFile);
         repo.setName("repo1");
-        
+
         // Can't get something we have not put in...
         Exchange actual = repo.get("missing");
         assertEquals(null, actual);
@@ -59,20 +61,21 @@
         exchange1.getIn().setBody("counter:1");
         actual = repo.add("foo", exchange1);
         assertEquals(null, actual);
-        
+
         // Get it back..
         actual = repo.get("foo");
-        assertEquals(exchange1, actual);
-              
+        assertEquals("counter:1", actual.getIn().getBody());
+
         // Change it..
         Exchange exchange2 = new DefaultExchange(context);
         exchange2.getIn().setBody("counter:2");
         actual = repo.add("foo", exchange2);
-        assertEquals(exchange1, actual);
-        
+        // the old one
+        assertEquals("counter:1", actual.getIn().getBody());
+
         // Get it back..
         actual = repo.get("foo");
-        assertEquals(exchange2, actual);
+        assertEquals("counter:2", actual.getIn().getBody());
     }
 
 }

Modified: camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties?rev=915418&r1=915417&r2=915418&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-hawtdb/src/test/resources/log4j.properties Tue Feb 23 16:35:05 2010
@@ -33,4 +33,4 @@
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.file.file=target/camel-ognl-test.log
\ No newline at end of file
+log4j.appender.file.file=target/camel-hawtdb-test.log
\ No newline at end of file