You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/24 09:34:52 UTC
svn commit: r926977 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/processor/aggregate/
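camel-core/src/main/java/org/apache/camel/spi/ camel-core/src/test/java/org/apache/camel/impl/ components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/ components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/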
Author: davsclaus
Date: Wed Mar 24 08:34:52 2010
New Revision: 926977
URL: http://svn.apache.org/viewvc?rev=926977&view=rev
Log:
CAMEL-2568: Improving AggregationRepository to support transactional behavior in impls such as camel-hawtdb. Work in progress.
Added:
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java (with props)
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java
- copied, changed from r926504, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeHolderTest.java
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed Mar 24 08:34:52 2010
@@ -31,12 +31,13 @@ import org.apache.camel.spi.UnitOfWork;
*/
public interface Exchange {
- String ACCEPT_CONTENT_TYPE = "CamelAcceptContentType";
+ String ACCEPT_CONTENT_TYPE = "CamelAcceptContentType";
@Deprecated
- String AGGREGATED_INDEX = "CamelAggregatedIndex";
- String AGGREGATED_SIZE = "CamelAggregatedSize";
- String AGGREGATED_COMPLETED_BY = "CamelAggregatedCompletedBy";
- String ASYNC_WAIT = "CamelAsyncWait";
+ String AGGREGATED_INDEX = "CamelAggregatedIndex";
+ String AGGREGATED_SIZE = "CamelAggregatedSize";
+ String AGGREGATED_COMPLETED_BY = "CamelAggregatedCompletedBy";
+ String AGGREGATED_CORRELATION_KEY = "CamelAggregatedCorrelationKey";
+ String ASYNC_WAIT = "CamelAsyncWait";
String BATCH_INDEX = "CamelBatchIndex";
String BATCH_SIZE = "CamelBatchSize";
@@ -80,8 +81,8 @@ public interface Exchange {
String HTTP_URI = "CamelHttpUri";
String HTTP_URL = "CamelHttpUrl";
String HTTP_CHUNKED = "CamelHttpChunked";
- String HTTP_SERVLET_REQUEST = "CamelHttpServletRequest";
- String HTTP_SERVLET_RESPONSE = "CamelHttpServletResponse";
+ String HTTP_SERVLET_REQUEST = "CamelHttpServletRequest";
+ String HTTP_SERVLET_RESPONSE = "CamelHttpServletResponse";
String INTERCEPTED_ENDPOINT = "CamelInterceptedEndpoint";
String TO_ENDPOINT = "CamelToEndpoint";
@@ -95,19 +96,19 @@ public interface Exchange {
String ON_COMPLETION = "CamelOnCompletion";
- String ROUTE_STOP = "CamelRouteStop";
- String REDELIVERED = "CamelRedelivered";
- String REDELIVERY_COUNTER = "CamelRedeliveryCounter";
+ String ROUTE_STOP = "CamelRouteStop";
+ String REDELIVERED = "CamelRedelivered";
+ String REDELIVERY_COUNTER = "CamelRedeliveryCounter";
String REDELIVERY_EXHAUSTED = "CamelRedeliveryExhausted";
- String ROLLBACK_ONLY = "CamelRollbackOnly";
- String ROLLBACK_ONLY_LAST = "CamelRollbackOnlyLast";
+ String ROLLBACK_ONLY = "CamelRollbackOnly";
+ String ROLLBACK_ONLY_LAST = "CamelRollbackOnlyLast";
String SOAP_ACTION = "CamelSoapAction";
String SPLIT_INDEX = "CamelSplitIndex";
String SPLIT_SIZE = "CamelSplitSize";
- String TRANSACTED = "CamelTransacted";
- String TRANSFER_ENCODING = "Transfer-Encoding";
+ String TRANSACTED = "CamelTransacted";
+ String TRANSFER_ENCODING = "Transfer-Encoding";
String TRACE_EVENT = "CamelTraceEvent";
String TRACE_EVENT_NODE_ID = "CamelTraceEventNodeId";
String TRACE_EVENT_TIMESTAMP = "CamelTraceEventTimestamp";
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java Wed Mar 24 08:34:52 2010
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFac
* As opposed to normal usage where only the body part of the exchange is transferred over the wire,
* this holder object serializes the following fields over the wire:
* <ul>
+ * <li>exchangeId</li>
* <li>in body</li>
* <li>out body</li>
* <li>in headers</li>
@@ -46,9 +47,10 @@ import org.apache.commons.logging.LogFac
*/
public class DefaultExchangeHolder implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private static final transient Log LOG = LogFactory.getLog(DefaultExchangeHolder.class);
+ private String exchangeId;
private Object inBody;
private Object outBody;
private Boolean outFaultFlag = Boolean.FALSE;
@@ -79,6 +81,7 @@ public class DefaultExchangeHolder imple
public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties) {
DefaultExchangeHolder payload = new DefaultExchangeHolder();
+ payload.exchangeId = exchange.getExchangeId();
payload.inBody = checkSerializableObject("in body", exchange, exchange.getIn().getBody());
payload.safeSetInHeaders(exchange);
if (exchange.hasOut()) {
@@ -101,6 +104,7 @@ public class DefaultExchangeHolder imple
* @param payload the payload with the values
*/
public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) {
+ exchange.setExchangeId(payload.exchangeId);
exchange.getIn().setBody(payload.inBody);
if (payload.inHeaders != null) {
exchange.getIn().setHeaders(payload.inHeaders);
@@ -141,7 +145,7 @@ public class DefaultExchangeHolder imple
}
public String toString() {
- StringBuilder sb = new StringBuilder("DefaultExchangeHolder[");
+ StringBuilder sb = new StringBuilder("DefaultExchangeHolder[exchangeId=").append(exchangeId);
sb.append("inBody=").append(inBody).append(", outBody=").append(outBody);
sb.append(", inHeaders=").append(inHeaders).append(", outHeaders=").append(outHeaders);
sb.append(", properties=").append(properties).append(", exception=").append(exception);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java Wed Mar 24 08:34:52 2010
@@ -69,6 +69,8 @@ public abstract class ServiceSupport imp
starting.set(false);
stopping.set(false);
stopped.set(false);
+ shutdown.set(false);
+ shuttingdown.set(false);
}
}
}
@@ -94,6 +96,8 @@ public abstract class ServiceSupport imp
stopping.set(false);
starting.set(false);
started.set(false);
+ shutdown.set(false);
+ shuttingdown.set(false);
}
}
}
@@ -118,6 +122,7 @@ public abstract class ServiceSupport imp
}
}
} finally {
+ // a shutdown service is also considered stopped, so only the shutdown flags are set here
shutdown.set(true);
shuttingdown.set(false);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Mar 24 08:34:52 2010
@@ -289,9 +289,11 @@ public class AggregateProcessor extends
return aggregationStrategy.aggregate(oldExchange, newExchange);
}
- protected void onCompletion(Object key, final Exchange exchange, boolean fromTimeout) {
+ protected void onCompletion(final Object key, final Exchange exchange, boolean fromTimeout) {
+ // store the correlation key as property
+ exchange.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
// remove from repository as its completed
- aggregationRepository.remove(exchange.getContext(), key);
+ aggregationRepository.remove(exchange.getContext(), key, exchange);
if (!fromTimeout && timeoutMap != null) {
// cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker)
timeoutMap.remove(key);
@@ -302,6 +304,10 @@ public class AggregateProcessor extends
closedCorrelationKeys.put(key, key);
}
+ onSubmitCompletion(key, exchange);
+ }
+
+ private void onSubmitCompletion(final Object key, final Exchange exchange) {
if (LOG.isDebugEnabled()) {
LOG.debug("Aggregation complete for correlation key " + key + " sending aggregated exchange: " + exchange);
}
@@ -316,6 +322,8 @@ public class AggregateProcessor extends
} catch (Throwable t) {
// must catch throwable so we will handle all exceptions as the executor service will by default ignore them
exchange.setException(new CamelExchangeException("Error processing aggregated exchange", exchange, t));
+ } finally {
+ aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId());
}
// if there was an exception then let the exception handler handle it
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java Wed Mar 24 08:34:52 2010
@@ -41,10 +41,14 @@ public class MemoryAggregationRepository
return cache.get(key);
}
- public void remove(CamelContext camelContext, Object key) {
+ public void remove(CamelContext camelContext, Object key, Exchange exchange) {
cache.remove(key);
}
+ public void confirm(CamelContext camelContext, String exchangeId) {
+ // noop
+ }
+
@Override
protected void doStart() throws Exception {
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java Wed Mar 24 08:34:52 2010
@@ -31,9 +31,9 @@ public interface AggregationRepository<K
* <p/>
* Will replace any existing exchange.
*
- * @param camelContext the current CamelContext
- * @param key the correlation key
- * @param exchange the aggregated exchange
+ * @param camelContext the current CamelContext
+ * @param key the correlation key
+ * @param exchange the aggregated exchange
* @return the old exchange if any existed
*/
Exchange add(CamelContext camelContext, K key, Exchange exchange);
@@ -41,18 +41,28 @@ public interface AggregationRepository<K
/**
* Gets the given exchange with the correlation key
*
- * @param camelContext the current CamelContext
- * @param key the correlation key
+ * @param camelContext the current CamelContext
+ * @param key the correlation key
* @return the exchange, or <tt>null</tt> if no exchange was previously added
*/
Exchange get(CamelContext camelContext, K key);
/**
- * Removes the exchange with the given correlation key
+ * Removes the exchange with the given correlation key, which should happen
+ * when an {@link Exchange} is completed
*
- * @param camelContext the current CamelContext
- * @param key the correlation key
+ * @param camelContext the current CamelContext
+ * @param key the correlation key
+ * @param exchange the exchange to remove
*/
- void remove(CamelContext camelContext, K key);
+ void remove(CamelContext camelContext, K key, Exchange exchange);
+
+ /**
+ * Confirms the completion of the {@link Exchange}.
+ *
+ * @param camelContext the current CamelContext
+ * @param exchangeId exchange id to confirm
+ */
+ void confirm(CamelContext camelContext, String exchangeId);
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeHolderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeHolderTest.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeHolderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeHolderTest.java Wed Mar 24 08:34:52 2010
@@ -24,6 +24,8 @@ import org.apache.camel.Exchange;
*/
public class DefaultExchangeHolderTest extends ContextTestSupport {
+ private String id;
+
public void testMarshal() throws Exception {
DefaultExchangeHolder holder = createHolder();
assertNotNull(holder);
@@ -31,6 +33,7 @@ public class DefaultExchangeHolderTest e
}
public void testUnmarshal() throws Exception {
+ id = null;
Exchange exchange = new DefaultExchange(context);
DefaultExchangeHolder.unmarshal(exchange, createHolder());
@@ -38,10 +41,12 @@ public class DefaultExchangeHolderTest e
assertEquals("Bye World", exchange.getOut().getBody());
assertEquals(123, exchange.getIn().getHeader("foo"));
assertEquals(444, exchange.getProperty("bar"));
+ assertEquals(id, exchange.getExchangeId());
}
private DefaultExchangeHolder createHolder() {
Exchange exchange = new DefaultExchange(context);
+ id = exchange.getExchangeId();
exchange.getIn().setBody("Hello World");
exchange.getIn().setHeader("foo", 123);
exchange.setProperty("bar", 444);
Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Wed Mar 24 08:34:52 2010
@@ -20,10 +20,7 @@ import java.io.File;
import java.io.IOException;
import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.impl.DefaultExchangeHolder;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.util.ObjectHelper;
@@ -33,10 +30,6 @@ import org.apache.commons.logging.LogFac
import org.fusesource.hawtdb.api.Index;
import org.fusesource.hawtdb.api.Transaction;
import org.fusesource.hawtdb.util.buffer.Buffer;
-import org.fusesource.hawtdb.util.buffer.DataByteArrayInputStream;
-import org.fusesource.hawtdb.util.buffer.DataByteArrayOutputStream;
-import org.fusesource.hawtdb.util.marshaller.Marshaller;
-import org.fusesource.hawtdb.util.marshaller.ObjectMarshaller;
/**
* An instance of AggregationRepository which is backed by a HawtDB.
@@ -50,8 +43,7 @@ public class HawtDBAggregationRepository
private Integer bufferSize;
private boolean sync;
private boolean returnOldExchange;
- private Marshaller<K> keyMarshaller = new ObjectMarshaller<K>();
- private Marshaller<DefaultExchangeHolder> exchangeMarshaller = new ObjectMarshaller<DefaultExchangeHolder>();
+ private HawtDBCamelMarshaller<K> marshaller = new HawtDBCamelMarshaller<K>();
/**
* Creates an aggregation repository
@@ -96,7 +88,7 @@ public class HawtDBAggregationRepository
this.repositoryName = repositoryName;
}
- public Exchange add(CamelContext camelContext, final K key, Exchange exchange) {
+ public Exchange add(final CamelContext camelContext, final K key, final Exchange exchange) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding key [" + key + "] -> " + exchange);
}
@@ -106,8 +98,8 @@ public class HawtDBAggregationRepository
// HawtDB could then eliminate the need to marshal and un-marshal
// in some cases. But since we can.. we are going to force
// early marshaling.
- final Buffer keyBuffer = marshallKey(key);
- final Buffer exchangeBuffer = marshallExchange(camelContext, exchange);
+ final Buffer keyBuffer = marshaller.marshallKey(key);
+ final Buffer exchangeBuffer = marshaller.marshallExchange(camelContext, exchange);
Buffer rc = hawtDBFile.execute(new Work<Buffer>() {
public Buffer execute(Transaction tx) {
Index<Buffer, Buffer> index = hawtDBFile.getRepositoryIndex(tx, repositoryName);
@@ -125,7 +117,7 @@ public class HawtDBAggregationRepository
// only return old exchange if enabled
if (isReturnOldExchange()) {
- return unmarshallExchange(camelContext, rc);
+ return marshaller.unmarshallExchange(camelContext, rc);
}
} catch (IOException e) {
throw new RuntimeException("Error adding to repository " + repositoryName + " with key " + key, e);
@@ -134,11 +126,10 @@ public class HawtDBAggregationRepository
return null;
}
-
- public Exchange get(CamelContext camelContext, final K key) {
+ public Exchange get(final CamelContext camelContext, final K key) {
Exchange answer = null;
try {
- final Buffer keyBuffer = marshallKey(key);
+ final Buffer keyBuffer = marshaller.marshallKey(key);
Buffer rc = hawtDBFile.execute(new Work<Buffer>() {
public Buffer execute(Transaction tx) {
Index<Buffer, Buffer> index = hawtDBFile.getRepositoryIndex(tx, repositoryName);
@@ -151,7 +142,7 @@ public class HawtDBAggregationRepository
}
});
if (rc != null) {
- answer = unmarshallExchange(camelContext, rc);
+ answer = marshaller.unmarshallExchange(camelContext, rc);
}
} catch (IOException e) {
throw new RuntimeException("Error getting key " + key + " from repository " + repositoryName, e);
@@ -163,16 +154,24 @@ public class HawtDBAggregationRepository
return answer;
}
- public void remove(CamelContext camelContext, final K key) {
+ public void remove(final CamelContext camelContext, final K key, final Exchange exchange) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing key [" + key + "]");
}
try {
- final Buffer keyBuffer = marshallKey(key);
+ final Buffer keyBuffer = marshaller.marshallKey(key);
+ final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchange.getExchangeId());
+ final Buffer exchangeBuffer = marshaller.marshallExchange(camelContext, exchange);
hawtDBFile.execute(new Work<Buffer>() {
public Buffer execute(Transaction tx) {
Index<Buffer, Buffer> index = hawtDBFile.getRepositoryIndex(tx, repositoryName);
- return index.remove(keyBuffer);
+ // remove from the in progress index
+ index.remove(keyBuffer);
+
+ // and add it to the completed index, where it is kept until it has been confirmed
+ Index<Buffer, Buffer> indexCompleted = hawtDBFile.getRepositoryIndex(tx, getRepositoryNameCompleted());
+ indexCompleted.put(confirmKeyBuffer, exchangeBuffer);
+ return null;
}
@Override
@@ -180,45 +179,33 @@ public class HawtDBAggregationRepository
return "Removing key [" + key + "]";
}
});
+
} catch (IOException e) {
throw new RuntimeException("Error removing key " + key + " from repository " + repositoryName, e);
}
}
- protected Buffer marshallKey(K key) throws IOException {
- DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
- keyMarshaller.writePayload(key, baos);
- return baos.toBuffer();
- }
-
- protected Buffer marshallExchange(CamelContext camelContext, Exchange exchange) throws IOException {
- DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
- // use DefaultExchangeHolder to marshal to a serialized object
- DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
- // add the aggregated size property as the only property we want to retain
- DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class));
- // persist the from endpoint as well
- if (exchange.getFromEndpoint() != null) {
- DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri());
+ public void confirm(final CamelContext camelContext, final String exchangeId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Confirming exchangeId [" + exchangeId + "]");
}
- exchangeMarshaller.writePayload(pe, baos);
- return baos.toBuffer();
- }
+ try {
+ final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchangeId);
+ hawtDBFile.execute(new Work<Buffer>() {
+ public Buffer execute(Transaction tx) {
+ Index<Buffer, Buffer> indexCompleted = hawtDBFile.getRepositoryIndex(tx, getRepositoryNameCompleted());
+ return indexCompleted.remove(confirmKeyBuffer);
+ }
- protected Exchange unmarshallExchange(CamelContext camelContext, Buffer buffer) throws IOException {
- DataByteArrayInputStream bais = new DataByteArrayInputStream(buffer);
- DefaultExchangeHolder pe = exchangeMarshaller.readPayload(bais);
- Exchange answer = new DefaultExchange(camelContext);
- DefaultExchangeHolder.unmarshal(answer, pe);
- // restore the from endpoint
- String fromEndpointUri = (String) answer.removeProperty("CamelAggregatedFromEndpoint");
- if (fromEndpointUri != null) {
- Endpoint fromEndpoint = camelContext.hasEndpoint(fromEndpointUri);
- if (fromEndpoint != null) {
- answer.setFromEndpoint(fromEndpoint);
- }
+ @Override
+ public String toString() {
+ return "Confirming exchangeId [" + exchangeId + "]";
+ }
+ });
+
+ } catch (IOException e) {
+ throw new RuntimeException("Error confirming exchangeId " + exchangeId + " from repository " + repositoryName, e);
}
- return answer;
}
public HawtDBFile getHawtDBFile() {
@@ -233,6 +220,10 @@ public class HawtDBAggregationRepository
return repositoryName;
}
+ private String getRepositoryNameCompleted() {
+ return repositoryName + "-completed";
+ }
+
public void setRepositoryName(String repositoryName) {
this.repositoryName = repositoryName;
}
Added: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java?rev=926977&view=auto
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java (added)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java Wed Mar 24 08:34:52 2010
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hawtdb;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.fusesource.hawtdb.util.buffer.Buffer;
+import org.fusesource.hawtdb.util.buffer.DataByteArrayInputStream;
+import org.fusesource.hawtdb.util.buffer.DataByteArrayOutputStream;
+import org.fusesource.hawtdb.util.marshaller.Marshaller;
+import org.fusesource.hawtdb.util.marshaller.ObjectMarshaller;
+import org.fusesource.hawtdb.util.marshaller.StringMarshaller;
+
+/**
+ * @version $Revision$
+ */
+public final class HawtDBCamelMarshaller<K> {
+
+ private Marshaller<K> keyMarshaller = new ObjectMarshaller<K>();
+ private Marshaller<String> confirmKeyMarshaller = new StringMarshaller();
+ private Marshaller<DefaultExchangeHolder> exchangeMarshaller = new ObjectMarshaller<DefaultExchangeHolder>();
+
+ public Buffer marshallKey(K key) throws IOException {
+ DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+ keyMarshaller.writePayload(key, baos);
+ return baos.toBuffer();
+ }
+
+ public Buffer marshallConfirmKey(String exchangeId) throws IOException {
+ DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+ confirmKeyMarshaller.writePayload(exchangeId, baos);
+ return baos.toBuffer();
+ }
+
+ public Buffer marshallExchange(CamelContext camelContext, Exchange exchange) throws IOException {
+ DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+ // use DefaultExchangeHolder to marshal to a serialized object
+ DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
+ // add the aggregated size property to retain
+ DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class));
+ // add the aggregated completed by property to retain
+ DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
+ // add the aggregated correlation key property to retain
+ DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, Serializable.class));
+ // persist the from endpoint as well
+ if (exchange.getFromEndpoint() != null) {
+ DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri());
+ }
+ exchangeMarshaller.writePayload(pe, baos);
+ return baos.toBuffer();
+ }
+
+ public Exchange unmarshallExchange(CamelContext camelContext, Buffer buffer) throws IOException {
+ DataByteArrayInputStream bais = new DataByteArrayInputStream(buffer);
+ DefaultExchangeHolder pe = exchangeMarshaller.readPayload(bais);
+ Exchange answer = new DefaultExchange(camelContext);
+ DefaultExchangeHolder.unmarshal(answer, pe);
+ // restore the from endpoint
+ String fromEndpointUri = (String) answer.removeProperty("CamelAggregatedFromEndpoint");
+ if (fromEndpointUri != null) {
+ Endpoint fromEndpoint = camelContext.hasEndpoint(fromEndpointUri);
+ if (fromEndpoint != null) {
+ answer.setFromEndpoint(fromEndpoint);
+ }
+ }
+ return answer;
+ }
+
+}
Propchange: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java (from r926504, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java&r1=926504&r2=926977&rev=926977&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java Wed Mar 24 08:34:52 2010
@@ -18,23 +18,28 @@ package org.apache.camel.component.hawtd
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
+import org.fusesource.hawtdb.api.Index;
+import org.fusesource.hawtdb.api.Transaction;
+import org.fusesource.hawtdb.util.buffer.Buffer;
import org.junit.Test;
-public class HawtDBAggregateTest extends CamelTestSupport {
+public class HawtDBAggregateNotLostTest extends CamelTestSupport {
+
+ private HawtDBAggregationRepository<String> repo;
@Override
public void setUp() throws Exception {
deleteDirectory("target/data");
+ repo = new HawtDBAggregationRepository<String>("repo1", "target/data/hawtdb.dat");
super.setUp();
}
@Test
- public void testHawtDBAggregate() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:aggregated");
- mock.expectedBodiesReceived("ABCDE");
+ public void testHawtDBAggregateNotLost() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedBodiesReceived("ABCDE");
+ getMockEndpoint("mock:result").expectedMessageCount(0);
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
@@ -44,27 +49,47 @@ public class HawtDBAggregateTest extends
assertMockEndpointsSatisfied();
- // from endpoint should be preserved
- assertEquals("direct://start", mock.getReceivedExchanges().get(0).getFromEndpoint().getEndpointUri());
+ String exchangeId = getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId();
+
+ // the exchange should be in the completed repo where we should be able to find it
+ final HawtDBFile hawtDBFile = repo.getHawtDBFile();
+ final HawtDBCamelMarshaller<Object> marshaller = new HawtDBCamelMarshaller<Object>();
+ final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchangeId);
+ Buffer bf = hawtDBFile.execute(new Work<Buffer>() {
+ public Buffer execute(Transaction tx) {
+ Index<Buffer, Buffer> index = hawtDBFile.getRepositoryIndex(tx, "repo1-completed");
+ return index.get(confirmKeyBuffer);
+ }
+ });
+
+ // assert the exchange was not lost and we got all the information still
+ assertNotNull(bf);
+ Exchange completed = marshaller.unmarshallExchange(context, bf);
+ assertNotNull(completed);
+ // should retain the exchange id
+ assertEquals(exchangeId, completed.getExchangeId());
+ assertEquals("ABCDE", completed.getIn().getBody());
+ assertEquals(123, completed.getIn().getHeader("id"));
+ assertEquals("size", completed.getProperty(Exchange.AGGREGATED_COMPLETED_BY));
+ assertEquals(5, completed.getProperty(Exchange.AGGREGATED_SIZE));
+ assertEquals(123, completed.getProperty(Exchange.AGGREGATED_CORRELATION_KEY));
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
- // START SNIPPET: e1
public void configure() throws Exception {
- // create the hawtdb repo
- HawtDBAggregationRepository<String> repo = new HawtDBAggregationRepository<String>("repo1", "target/data/hawtdb.dat");
-
- // here is the Camel route where we aggregate
from("direct:start")
.aggregate(header("id"), new MyAggregationStrategy())
- // use our created hawtdb repo as aggregation repository
.completionSize(5).aggregationRepository(repo)
- .to("mock:aggregated");
+ .log("aggregated exchange id ${exchangeId} with ${body}")
+ .to("mock:aggregated")
+ // throw an exception to fail, which will then cause us to lose this message
+ .throwException(new IllegalArgumentException("Damn"))
+ .to("mock:result")
+ .end();
}
- // END SNIPPET: e1
};
}
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java?rev=926977&r1=926976&r2=926977&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryTest.java Wed Mar 24 08:34:52 2010
@@ -76,7 +76,7 @@ public class HawtDBAggregationRepository
assertEquals("counter:2", actual.getIn().getBody());
// now remove it
- repo.remove(context, "foo");
+ repo.remove(context, "foo", actual);
actual = repo.get(context, "foo");
assertEquals(null, actual);