You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/24 09:34:52 UTC

svn commit: r926977 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/main/java/org/apache/camel/spi/ camel-core/s...

Author: davsclaus
Date: Wed Mar 24 08:34:52 2010
New Revision: 926977

URL: http://svn.apache.org/viewvc?rev=926977&view=rev
Log:
CAMEL-2568: Improving AggregationRepository to support transactional behavior in impls such as camel-hawtdb. Work in progress.

Added:
    camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java   (with props)
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java
      - copied, changed from r926504, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeHolderTest.java
    camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed Mar 24 08:34:52 2010
@@ -31,12 +31,13 @@ import org.apache.camel.spi.UnitOfWork;
  */
 public interface Exchange {
 
-    String ACCEPT_CONTENT_TYPE = "CamelAcceptContentType";
+    String ACCEPT_CONTENT_TYPE        = "CamelAcceptContentType";
     @Deprecated
-    String AGGREGATED_INDEX = "CamelAggregatedIndex";
-    String AGGREGATED_SIZE  = "CamelAggregatedSize";
-    String AGGREGATED_COMPLETED_BY = "CamelAggregatedCompletedBy";
-    String ASYNC_WAIT = "CamelAsyncWait";
+    String AGGREGATED_INDEX           = "CamelAggregatedIndex";
+    String AGGREGATED_SIZE            = "CamelAggregatedSize";
+    String AGGREGATED_COMPLETED_BY    = "CamelAggregatedCompletedBy";
+    String AGGREGATED_CORRELATION_KEY = "CamelAggregatedCorrelationKey";
+    String ASYNC_WAIT                 = "CamelAsyncWait";
 
     String BATCH_INDEX    = "CamelBatchIndex";
     String BATCH_SIZE     = "CamelBatchSize";
@@ -80,8 +81,8 @@ public interface Exchange {
     String HTTP_URI                = "CamelHttpUri";
     String HTTP_URL                = "CamelHttpUrl";
     String HTTP_CHUNKED            = "CamelHttpChunked";
-    String HTTP_SERVLET_REQUEST = "CamelHttpServletRequest";
-    String HTTP_SERVLET_RESPONSE = "CamelHttpServletResponse";
+    String HTTP_SERVLET_REQUEST   = "CamelHttpServletRequest";
+    String HTTP_SERVLET_RESPONSE  = "CamelHttpServletResponse";
 
     String INTERCEPTED_ENDPOINT = "CamelInterceptedEndpoint";
     String TO_ENDPOINT          = "CamelToEndpoint";
@@ -95,19 +96,19 @@ public interface Exchange {
 
     String ON_COMPLETION = "CamelOnCompletion";
 
-    String ROUTE_STOP         = "CamelRouteStop";
-    String REDELIVERED        = "CamelRedelivered";
-    String REDELIVERY_COUNTER = "CamelRedeliveryCounter";
+    String ROUTE_STOP           = "CamelRouteStop";
+    String REDELIVERED          = "CamelRedelivered";
+    String REDELIVERY_COUNTER   = "CamelRedeliveryCounter";
     String REDELIVERY_EXHAUSTED = "CamelRedeliveryExhausted";
-    String ROLLBACK_ONLY      = "CamelRollbackOnly";
-    String ROLLBACK_ONLY_LAST = "CamelRollbackOnlyLast";
+    String ROLLBACK_ONLY        = "CamelRollbackOnly";
+    String ROLLBACK_ONLY_LAST   = "CamelRollbackOnlyLast";
 
     String SOAP_ACTION = "CamelSoapAction";
     String SPLIT_INDEX = "CamelSplitIndex";
     String SPLIT_SIZE  = "CamelSplitSize";
 
-    String TRANSACTED        = "CamelTransacted";
-    String TRANSFER_ENCODING = "Transfer-Encoding";
+    String TRANSACTED            = "CamelTransacted";
+    String TRANSFER_ENCODING     = "Transfer-Encoding";
     String TRACE_EVENT           = "CamelTraceEvent";
     String TRACE_EVENT_NODE_ID   = "CamelTraceEventNodeId";
     String TRACE_EVENT_TIMESTAMP = "CamelTraceEventTimestamp";

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java Wed Mar 24 08:34:52 2010
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFac
  * As opposed to normal usage where only the body part of the exchange is transferred over the wire,
  * this holder object serializes the following fields over the wire:
  * <ul>
+ * <li>exchangeId</li>
  * <li>in body</li>
  * <li>out body</li>
  * <li>in headers</li>
@@ -46,9 +47,10 @@ import org.apache.commons.logging.LogFac
  */
 public class DefaultExchangeHolder implements Serializable {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private static final transient Log LOG = LogFactory.getLog(DefaultExchangeHolder.class);
 
+    private String exchangeId;
     private Object inBody;
     private Object outBody;
     private Boolean outFaultFlag = Boolean.FALSE;
@@ -79,6 +81,7 @@ public class DefaultExchangeHolder imple
     public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties) {
         DefaultExchangeHolder payload = new DefaultExchangeHolder();
 
+        payload.exchangeId = exchange.getExchangeId();
         payload.inBody = checkSerializableObject("in body", exchange, exchange.getIn().getBody());
         payload.safeSetInHeaders(exchange);
         if (exchange.hasOut()) {
@@ -101,6 +104,7 @@ public class DefaultExchangeHolder imple
      * @param payload  the payload with the values
      */
     public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) {
+        exchange.setExchangeId(payload.exchangeId);
         exchange.getIn().setBody(payload.inBody);
         if (payload.inHeaders != null) {
             exchange.getIn().setHeaders(payload.inHeaders);
@@ -141,7 +145,7 @@ public class DefaultExchangeHolder imple
     }
 
     public String toString() {
-        StringBuilder sb = new StringBuilder("DefaultExchangeHolder[");
+        StringBuilder sb = new StringBuilder("DefaultExchangeHolder[exchangeId=").append(exchangeId);
         sb.append("inBody=").append(inBody).append(", outBody=").append(outBody);
         sb.append(", inHeaders=").append(inHeaders).append(", outHeaders=").append(outHeaders);
         sb.append(", properties=").append(properties).append(", exception=").append(exception);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java Wed Mar 24 08:34:52 2010
@@ -69,6 +69,8 @@ public abstract class ServiceSupport imp
                         starting.set(false);
                         stopping.set(false);
                         stopped.set(false);
+                        shutdown.set(false);
+                        shuttingdown.set(false);
                     }
                 }
             }
@@ -94,6 +96,8 @@ public abstract class ServiceSupport imp
                 stopping.set(false);
                 starting.set(false);
                 started.set(false);
+                shutdown.set(false);
+                shuttingdown.set(false);
             }
         }
     }
@@ -118,6 +122,7 @@ public abstract class ServiceSupport imp
                     }
                 }
             } finally {
+                // shutdown is also stopped so only set shutdown flags
                 shutdown.set(true);
                 shuttingdown.set(false);
             }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Mar 24 08:34:52 2010
@@ -289,9 +289,11 @@ public class AggregateProcessor extends 
         return aggregationStrategy.aggregate(oldExchange, newExchange);
     }
 
-    protected void onCompletion(Object key, final Exchange exchange, boolean fromTimeout) {
+    protected void onCompletion(final Object key, final Exchange exchange, boolean fromTimeout) {
+        // store the correlation key as property
+        exchange.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
         // remove from repository as its completed
-        aggregationRepository.remove(exchange.getContext(), key);
+        aggregationRepository.remove(exchange.getContext(), key, exchange);
         if (!fromTimeout && timeoutMap != null) {
             // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker)
             timeoutMap.remove(key);
@@ -302,6 +304,10 @@ public class AggregateProcessor extends 
             closedCorrelationKeys.put(key, key);
         }
 
+        onSubmitCompletion(key, exchange);
+    }
+
+    private void onSubmitCompletion(final Object key, final Exchange exchange) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Aggregation complete for correlation key " + key + " sending aggregated exchange: " + exchange);
         }
@@ -316,6 +322,8 @@ public class AggregateProcessor extends 
                 } catch (Throwable t) {
                     // must catch throwable so we will handle all exceptions as the executor service will by default ignore them
                     exchange.setException(new CamelExchangeException("Error processing aggregated exchange", exchange, t));
+                } finally {
+                    aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId());
                 }
 
                 // if there was an exception then let the exception handler handle it

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java Wed Mar 24 08:34:52 2010
@@ -41,10 +41,14 @@ public class MemoryAggregationRepository
         return cache.get(key);
     }
 
-    public void remove(CamelContext camelContext, Object key) {
+    public void remove(CamelContext camelContext, Object key, Exchange exchange) {
         cache.remove(key);
     }
 
+    public void confirm(CamelContext camelContext, String exchangeId) {
+        // noop
+    }
+
     @Override
     protected void doStart() throws Exception {
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java Wed Mar 24 08:34:52 2010
@@ -31,9 +31,9 @@ public interface AggregationRepository<K
      * <p/>
      * Will replace any existing exchange.
      *
-     * @param camelContext the current CamelContext
-     * @param key  the correlation key
-     * @param exchange the aggregated exchange
+     * @param camelContext   the current CamelContext
+     * @param key            the correlation key
+     * @param exchange       the aggregated exchange
      * @return the old exchange if any existed
      */
     Exchange add(CamelContext camelContext, K key, Exchange exchange);
@@ -41,18 +41,28 @@ public interface AggregationRepository<K
     /**
      * Gets the given exchange with the correlation key
      *
-     * @param camelContext the current CamelContext
-     * @param key the correlation key
+     * @param camelContext   the current CamelContext
+     * @param key            the correlation key
      * @return the exchange, or <tt>null</tt> if no exchange was previously added
      */
     Exchange get(CamelContext camelContext, K key);
 
     /**
-     * Removes the exchange with the given correlation key
+     * Removes the exchange with the given correlation key, which should happen
+     * when an {@link Exchange} is completed
      *
-     * @param camelContext the current CamelContext
-     * @param key the correlation key
+     * @param camelContext   the current CamelContext
+     * @param key            the correlation key
+     * @param exchange       the exchange to remove
      */
-    void remove(CamelContext camelContext, K key);
+    void remove(CamelContext camelContext, K key, Exchange exchange);
+
+    /**
+     * Confirms the completion of the {@link Exchange}.
+     *
+     * @param camelContext  the current CamelContext
+     * @param exchangeId    exchange id to confirm
+     */
+    void confirm(CamelContext camelContext, String exchangeId);
 
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeHolderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeHolderTest.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeHolderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeHolderTest.java Wed Mar 24 08:34:52 2010
@@ -24,6 +24,8 @@ import org.apache.camel.Exchange;
  */
 public class DefaultExchangeHolderTest extends ContextTestSupport {
 
+    private String id;
+
     public void testMarshal() throws Exception {
         DefaultExchangeHolder holder = createHolder();
         assertNotNull(holder);
@@ -31,6 +33,7 @@ public class DefaultExchangeHolderTest e
     }
 
     public void testUnmarshal() throws Exception {
+        id = null;
         Exchange exchange = new DefaultExchange(context);
 
         DefaultExchangeHolder.unmarshal(exchange, createHolder());
@@ -38,10 +41,12 @@ public class DefaultExchangeHolderTest e
         assertEquals("Bye World", exchange.getOut().getBody());
         assertEquals(123, exchange.getIn().getHeader("foo"));
         assertEquals(444, exchange.getProperty("bar"));
+        assertEquals(id, exchange.getExchangeId());
     }
 
     private DefaultExchangeHolder createHolder() {
         Exchange exchange = new DefaultExchange(context);
+        id = exchange.getExchangeId();
         exchange.getIn().setBody("Hello World");
         exchange.getIn().setHeader("foo", 123);
         exchange.setProperty("bar", 444);

Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Wed Mar 24 08:34:52 2010
@@ -20,10 +20,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.impl.DefaultExchangeHolder;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.util.ObjectHelper;
@@ -33,10 +30,6 @@ import org.apache.commons.logging.LogFac
 import org.fusesource.hawtdb.api.Index;
 import org.fusesource.hawtdb.api.Transaction;
 import org.fusesource.hawtdb.util.buffer.Buffer;
-import org.fusesource.hawtdb.util.buffer.DataByteArrayInputStream;
-import org.fusesource.hawtdb.util.buffer.DataByteArrayOutputStream;
-import org.fusesource.hawtdb.util.marshaller.Marshaller;
-import org.fusesource.hawtdb.util.marshaller.ObjectMarshaller;
 
 /**
  * An instance of AggregationRepository which is backed by a HawtDB.
@@ -50,8 +43,7 @@ public class HawtDBAggregationRepository
     private Integer bufferSize;
     private boolean sync;
     private boolean returnOldExchange;
-    private Marshaller<K> keyMarshaller = new ObjectMarshaller<K>();
-    private Marshaller<DefaultExchangeHolder> exchangeMarshaller = new ObjectMarshaller<DefaultExchangeHolder>();
+    private HawtDBCamelMarshaller<K> marshaller = new HawtDBCamelMarshaller<K>();
 
     /**
      * Creates an aggregation repository
@@ -96,7 +88,7 @@ public class HawtDBAggregationRepository
         this.repositoryName = repositoryName;
     }
 
-    public Exchange add(CamelContext camelContext, final K key, Exchange exchange) {
+    public Exchange add(final CamelContext camelContext, final K key, final Exchange exchange) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Adding key   [" + key + "] -> " + exchange);
         }
@@ -106,8 +98,8 @@ public class HawtDBAggregationRepository
             // HawtDB could then eliminate the need to marshal and un-marshal  
             // in some cases.  But since we can.. we are going to force
             // early marshaling.
-            final Buffer keyBuffer = marshallKey(key);
-            final Buffer exchangeBuffer = marshallExchange(camelContext, exchange);
+            final Buffer keyBuffer = marshaller.marshallKey(key);
+            final Buffer exchangeBuffer = marshaller.marshallExchange(camelContext, exchange);
             Buffer rc = hawtDBFile.execute(new Work<Buffer>() {
                 public Buffer execute(Transaction tx) {
                     Index<Buffer, Buffer> index = hawtDBFile.getRepositoryIndex(tx, repositoryName);
@@ -125,7 +117,7 @@ public class HawtDBAggregationRepository
 
             // only return old exchange if enabled
             if (isReturnOldExchange()) {
-                return unmarshallExchange(camelContext, rc);
+                return marshaller.unmarshallExchange(camelContext, rc);
             }
         } catch (IOException e) {
             throw new RuntimeException("Error adding to repository " + repositoryName + " with key " + key, e);
@@ -134,11 +126,10 @@ public class HawtDBAggregationRepository
         return null;
     }
 
-
-    public Exchange get(CamelContext camelContext, final K key) {
+    public Exchange get(final CamelContext camelContext, final K key) {
         Exchange answer = null;
         try {
-            final Buffer keyBuffer = marshallKey(key);
+            final Buffer keyBuffer = marshaller.marshallKey(key);
             Buffer rc = hawtDBFile.execute(new Work<Buffer>() {
                 public Buffer execute(Transaction tx) {
                     Index<Buffer, Buffer> index = hawtDBFile.getRepositoryIndex(tx, repositoryName);
@@ -151,7 +142,7 @@ public class HawtDBAggregationRepository
                 }
             });
             if (rc != null) {
-                answer = unmarshallExchange(camelContext, rc);
+                answer = marshaller.unmarshallExchange(camelContext, rc);
             }
         } catch (IOException e) {
             throw new RuntimeException("Error getting key " + key + " from repository " + repositoryName, e);
@@ -163,16 +154,24 @@ public class HawtDBAggregationRepository
         return answer;
     }
 
-    public void remove(CamelContext camelContext, final K key) {
+    public void remove(final CamelContext camelContext, final K key, final Exchange exchange) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Removing key [" + key + "]");
         }
         try {
-            final Buffer keyBuffer = marshallKey(key);
+            final Buffer keyBuffer = marshaller.marshallKey(key);
+            final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchange.getExchangeId());
+            final Buffer exchangeBuffer = marshaller.marshallExchange(camelContext, exchange);
             hawtDBFile.execute(new Work<Buffer>() {
                 public Buffer execute(Transaction tx) {
                     Index<Buffer, Buffer> index = hawtDBFile.getRepositoryIndex(tx, repositoryName);
-                    return index.remove(keyBuffer);
+                    // remove from the in progress index
+                    index.remove(keyBuffer);
+
+                    // and add it to the confirmed index
+                    Index<Buffer, Buffer> indexCompleted = hawtDBFile.getRepositoryIndex(tx, getRepositoryNameCompleted());
+                    indexCompleted.put(confirmKeyBuffer, exchangeBuffer);
+                    return null;
                 }
 
                 @Override
@@ -180,45 +179,33 @@ public class HawtDBAggregationRepository
                     return "Removing key [" + key + "]";
                 }
             });
+
         } catch (IOException e) {
             throw new RuntimeException("Error removing key " + key + " from repository " + repositoryName, e);
         }
     }
 
-    protected Buffer marshallKey(K key) throws IOException {
-        DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
-        keyMarshaller.writePayload(key, baos);
-        return baos.toBuffer();
-    }
-
-    protected Buffer marshallExchange(CamelContext camelContext, Exchange exchange) throws IOException {
-        DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
-        // use DefaultExchangeHolder to marshal to a serialized object
-        DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
-        // add the aggregated size property as the only property we want to retain
-        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class));
-        // persist the from endpoint as well
-        if (exchange.getFromEndpoint() != null) {
-            DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri());
+    public void confirm(final CamelContext camelContext, final String exchangeId) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Confirming exchangeId [" + exchangeId + "]");
         }
-        exchangeMarshaller.writePayload(pe, baos);
-        return baos.toBuffer();
-    }
+        try {
+            final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchangeId);
+            hawtDBFile.execute(new Work<Buffer>() {
+                public Buffer execute(Transaction tx) {
+                    Index<Buffer, Buffer> indexCompleted = hawtDBFile.getRepositoryIndex(tx, getRepositoryNameCompleted());
+                    return indexCompleted.remove(confirmKeyBuffer);
+                }
 
-    protected Exchange unmarshallExchange(CamelContext camelContext, Buffer buffer) throws IOException {
-        DataByteArrayInputStream bais = new DataByteArrayInputStream(buffer);
-        DefaultExchangeHolder pe = exchangeMarshaller.readPayload(bais);
-        Exchange answer = new DefaultExchange(camelContext);
-        DefaultExchangeHolder.unmarshal(answer, pe);
-        // restore the from endpoint
-        String fromEndpointUri = (String) answer.removeProperty("CamelAggregatedFromEndpoint");
-        if (fromEndpointUri != null) {
-            Endpoint fromEndpoint = camelContext.hasEndpoint(fromEndpointUri);
-            if (fromEndpoint != null) {
-                answer.setFromEndpoint(fromEndpoint);
-            }
+                @Override
+                public String toString() {
+                    return "Confirming exchangeId [" + exchangeId + "]";
+                }
+            });
+
+        } catch (IOException e) {
+            throw new RuntimeException("Error confirming exchangeId " + exchangeId + " from repository " + repositoryName, e);
         }
-        return answer;
     }
 
     public HawtDBFile getHawtDBFile() {
@@ -233,6 +220,10 @@ public class HawtDBAggregationRepository
         return repositoryName;
     }
 
+    private String getRepositoryNameCompleted() {
+        return repositoryName + "-completed";
+    }
+
     public void setRepositoryName(String repositoryName) {
         this.repositoryName = repositoryName;
     }

Added: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java?rev=926977&view=auto
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java (added)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java Wed Mar 24 08:34:52 2010
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hawtdb;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.fusesource.hawtdb.util.buffer.Buffer;
+import org.fusesource.hawtdb.util.buffer.DataByteArrayInputStream;
+import org.fusesource.hawtdb.util.buffer.DataByteArrayOutputStream;
+import org.fusesource.hawtdb.util.marshaller.Marshaller;
+import org.fusesource.hawtdb.util.marshaller.ObjectMarshaller;
+import org.fusesource.hawtdb.util.marshaller.StringMarshaller;
+
+/**
+ * @version $Revision$
+ */
+public final class HawtDBCamelMarshaller<K> {
+
+    private Marshaller<K> keyMarshaller = new ObjectMarshaller<K>();
+    private Marshaller<String> confirmKeyMarshaller = new StringMarshaller();
+    private Marshaller<DefaultExchangeHolder> exchangeMarshaller = new ObjectMarshaller<DefaultExchangeHolder>();
+
+    public Buffer marshallKey(K key) throws IOException {
+        DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+        keyMarshaller.writePayload(key, baos);
+        return baos.toBuffer();
+    }
+
+    public Buffer marshallConfirmKey(String exchangeId) throws IOException {
+        DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+        confirmKeyMarshaller.writePayload(exchangeId, baos);
+        return baos.toBuffer();
+    }
+
+    public Buffer marshallExchange(CamelContext camelContext, Exchange exchange) throws IOException {
+        DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+        // use DefaultExchangeHolder to marshal to a serialized object
+        DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
+        // add the aggregated size property as the only property we want to retain
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class));
+        // add the aggregated completed by property to retain
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
+        // add the aggregated correlation key property to retain
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, Serializable.class));
+        // persist the from endpoint as well
+        if (exchange.getFromEndpoint() != null) {
+            DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri());
+        }
+        exchangeMarshaller.writePayload(pe, baos);
+        return baos.toBuffer();
+    }
+
+    public Exchange unmarshallExchange(CamelContext camelContext, Buffer buffer) throws IOException {
+        DataByteArrayInputStream bais = new DataByteArrayInputStream(buffer);
+        DefaultExchangeHolder pe = exchangeMarshaller.readPayload(bais);
+        Exchange answer = new DefaultExchange(camelContext);
+        DefaultExchangeHolder.unmarshal(answer, pe);
+        // restore the from endpoint
+        String fromEndpointUri = (String) answer.removeProperty("CamelAggregatedFromEndpoint");
+        if (fromEndpointUri != null) {
+            Endpoint fromEndpoint = camelContext.hasEndpoint(fromEndpointUri);
+            if (fromEndpoint != null) {
+                answer.setFromEndpoint(fromEndpoint);
+            }
+        }
+        return answer;
+    }
+
+}

Propchange: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java (from r926504, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java&r1=926504&r2=926977&rev=926977&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java Wed Mar 24 08:34:52 2010
@@ -18,23 +18,28 @@ package org.apache.camel.component.hawtd
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.fusesource.hawtdb.api.Index;
+import org.fusesource.hawtdb.api.Transaction;
+import org.fusesource.hawtdb.util.buffer.Buffer;
 import org.junit.Test;
 
-public class HawtDBAggregateTest extends CamelTestSupport {
+public class HawtDBAggregateNotLostTest extends CamelTestSupport {
+
+    private HawtDBAggregationRepository<String> repo;
 
     @Override
     public void setUp() throws Exception {
         deleteDirectory("target/data");
+        repo = new HawtDBAggregationRepository<String>("repo1", "target/data/hawtdb.dat");
         super.setUp();
     }
 
     @Test
-    public void testHawtDBAggregate() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:aggregated");
-        mock.expectedBodiesReceived("ABCDE");
+    public void testHawtDBAggregateNotLost() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("ABCDE");
+        getMockEndpoint("mock:result").expectedMessageCount(0);
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
@@ -44,27 +49,47 @@ public class HawtDBAggregateTest extends
 
         assertMockEndpointsSatisfied();
 
-        // from endpoint should be preserved
-        assertEquals("direct://start", mock.getReceivedExchanges().get(0).getFromEndpoint().getEndpointUri());
+        String exchangeId = getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId();
+
+        // the exchange should be in the completed repo where we should be able to find it
+        final HawtDBFile hawtDBFile = repo.getHawtDBFile();
+        final HawtDBCamelMarshaller<Object> marshaller = new HawtDBCamelMarshaller<Object>();
+        final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchangeId);
+        Buffer bf = hawtDBFile.execute(new Work<Buffer>() {
+            public Buffer execute(Transaction tx) {
+                Index<Buffer, Buffer> index = hawtDBFile.getRepositoryIndex(tx, "repo1-completed");
+                return index.get(confirmKeyBuffer);
+            }
+        });
+
+        // assert the exchange was not lost and we got all the information still
+        assertNotNull(bf);
+        Exchange completed = marshaller.unmarshallExchange(context, bf);
+        assertNotNull(completed);
+        // should retain the exchange id
+        assertEquals(exchangeId, completed.getExchangeId());
+        assertEquals("ABCDE", completed.getIn().getBody());
+        assertEquals(123, completed.getIn().getHeader("id"));
+        assertEquals("size", completed.getProperty(Exchange.AGGREGATED_COMPLETED_BY));
+        assertEquals(5, completed.getProperty(Exchange.AGGREGATED_SIZE));
+        assertEquals(123, completed.getProperty(Exchange.AGGREGATED_CORRELATION_KEY));
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
-            // START SNIPPET: e1
             public void configure() throws Exception {
-                // create the hawtdb repo
-                HawtDBAggregationRepository<String> repo = new HawtDBAggregationRepository<String>("repo1", "target/data/hawtdb.dat");
-
-                // here is the Camel route where we aggregate
                 from("direct:start")
                     .aggregate(header("id"), new MyAggregationStrategy())
-                        // use our created hawtdb repo as aggregation repository
                         .completionSize(5).aggregationRepository(repo)
-                        .to("mock:aggregated");
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                        // throw an exception to fail, which we then will loose this message
+                        .throwException(new IllegalArgumentException("Damn"))
+                        .to("mock:result")
+                    .end();
             }
-            // END SNIPPET: e1
         };
     }
 

Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java Wed Mar 24 08:34:52 2010
@@ -76,7 +76,7 @@ public class HawtDBAggregationRepository
         assertEquals("counter:2", actual.getIn().getBody());
 
         // now remove it
-        repo.remove(context, "foo");
+        repo.remove(context, "foo", actual);
         actual = repo.get(context, "foo");
         assertEquals(null, actual);