You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/04 13:10:13 UTC
svn commit: r930663 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/main/java/org/apache/camel/processor/aggregate/
camel-core/src/main/java/org/apache/camel/spi/
components/camel-hawtdb/src/main/java/org/apache/c...
Author: davsclaus
Date: Sun Apr 4 11:10:13 2010
New Revision: 930663
URL: http://svn.apache.org/viewvc?rev=930663&view=rev
Log:
CAMEL-2568: Added redelivery and redeliveryCounter as headers to recovered aggregated exchanges.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Sun Apr 4 11:10:13 2010
@@ -27,7 +27,9 @@ import org.apache.commons.logging.LogFac
/**
* A useful base class for any processor which provides some kind of throttling
- * or delayed processing
+ * or delayed processing.
+ * <p/>
+ * This implementation will block while waiting.
*
* @version $Revision$
*/
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java Sun Apr 4 11:10:13 2010
@@ -24,7 +24,9 @@ import org.apache.camel.Processor;
* A <a href="http://camel.apache.org/delayer.html">Delayer</a> which
* delays processing the exchange until the correct amount of time has elapsed
* using an expression to determine the delivery time.
- *
+ * <p/>
+ * This implementation will block while waiting.
+ *
* @version $Revision$
*/
public class Delayer extends DelayProcessorSupport implements Traceable {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Apr 4 11:10:13 2010
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -36,6 +37,7 @@ import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.SynchronizationAdapter;
import org.apache.camel.processor.Traceable;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.ExceptionHandler;
@@ -80,7 +82,15 @@ public class AggregateProcessor extends
private ExceptionHandler exceptionHandler;
private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
private Map<Object, Object> closedCorrelationKeys;
+ private final AggregateOnCompletion aggregateOnCompletion = new AggregateOnCompletion();
private final Set<String> inProgressCompleteExchanges = new HashSet<String>();
+ private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>();
+
+ // keep booking about redelivery
+ private class RedeliveryData {
+ int redeliveryCounter;
+ long redeliveryDelay;
+ }
// options
private boolean ignoreBadCorrelationKeys;
@@ -324,6 +334,10 @@ public class AggregateProcessor extends
// send this exchange
executorService.submit(new Runnable() {
public void run() {
+
+ // add on completion task so we remember to update the inProgressCompleteExchanges
+ exchange.addOnCompletion(aggregateOnCompletion);
+
try {
processor.process(exchange);
} catch (Exception e) {
@@ -333,18 +347,13 @@ public class AggregateProcessor extends
exchange.setException(new CamelExchangeException("Error processing aggregated exchange", exchange, t));
}
- try {
- // was it good or bad?
- if (exchange.getException() == null) {
- // only confirm if we processed without a problem
- aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId());
- } else {
- // if there was an exception then let the exception handler handle it
- getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException());
- }
- } finally {
- // must remember to remove when we are done
- inProgressCompleteExchanges.remove(exchange.getExchangeId());
+ // was it good or bad?
+ if (exchange.getException() == null) {
+ // only confirm if we processed without a problem
+ aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId());
+ } else {
+ // if there was an exception then let the exception handler handle it
+ getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException());
}
}
});
@@ -450,6 +459,29 @@ public class AggregateProcessor extends
}
/**
+ * On completion task which keeps the booking of the in progress up to date
+ */
+ private class AggregateOnCompletion extends SynchronizationAdapter {
+
+ @Override
+ public void onDone(Exchange exchange) {
+ // must remember to remove when we are done (done = success or failure)
+ inProgressCompleteExchanges.remove(exchange.getExchangeId());
+ }
+
+ @Override
+ public void onComplete(Exchange exchange) {
+ // remove redelivery state when it was processed successfully
+ redeliveryState.remove(exchange.getExchangeId());
+ }
+
+ @Override
+ public String toString() {
+ return "AggregateOnCompletion";
+ }
+ }
+
+ /**
* Background task that looks for aggregated exchanges which is triggered by completion timeouts.
*/
private final class AggregationTimeoutMap extends DefaultTimeoutMap<Object, Exchange> {
@@ -501,8 +533,8 @@ public class AggregateProcessor extends
boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
if (inProgress) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Aggregated exchange with id " + exchangeId + " is already in progress.");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Aggregated exchange with id " + exchangeId + " is already in progress.");
}
} else {
if (LOG.isDebugEnabled()) {
@@ -514,6 +546,22 @@ public class AggregateProcessor extends
String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
// and mark it as redelivered
exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
+
+ // update current redelivery state
+ RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
+ if (data == null) {
+ // create new data
+ data = new RedeliveryData();
+ redeliveryState.put(exchange.getExchangeId(), data);
+ }
+ data.redeliveryCounter++;
+
+ // TODO: support delay and have a DelayQueue to avoid blocking
+ // if so we need to pre add in progress so we wont add again to delay queue
+
+ // set redelivery counter
+ exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
+
// resubmit the recovered exchange
onSubmitCompletion(key, exchange);
}
@@ -550,7 +598,7 @@ public class AggregateProcessor extends
if (aggregationRepository instanceof RecoverableAggregationRepository) {
RecoverableAggregationRepository<Object> recoverable = (RecoverableAggregationRepository<Object>) aggregationRepository;
if (recoverable.isUseRecovery()) {
- long interval = recoverable.getCheckIntervalInMillis();
+ long interval = recoverable.getRecoveryIntervalInMillis();
if (interval <= 0) {
throw new IllegalArgumentException("AggregationRepository has recovery enabled and the CheckInterval option must be a positive number, was: " + interval);
}
@@ -574,14 +622,18 @@ public class AggregateProcessor extends
@Override
protected void doStop() throws Exception {
- ServiceHelper.stopService(timeoutMap);
- ServiceHelper.stopService(recoverService);
-
- ServiceHelper.stopService(aggregationRepository);
+ ServiceHelper.stopServices(timeoutMap, recoverService, aggregationRepository);
if (closedCorrelationKeys != null) {
closedCorrelationKeys.clear();
}
+ redeliveryState.clear();
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ // cleanup when shutting down
+ inProgressCompleteExchanges.clear();
}
}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java Sun Apr 4 11:10:13 2010
@@ -48,26 +48,26 @@ public interface RecoverableAggregationR
Exchange recover(CamelContext camelContext, String exchangeId);
/**
- * Sets the interval between scans
+ * Sets the interval between recovery scans
*
* @param interval the interval
* @param timeUnit the time unit
*/
- void setCheckInterval(long interval, TimeUnit timeUnit);
+ void setRecoveryInterval(long interval, TimeUnit timeUnit);
/**
- * Sets the interval between scans
+ * Sets the interval between recovery scans
*
* @param interval the interval in millis
*/
- void setCheckInterval(long interval);
+ void setRecoveryInterval(long interval);
/**
- * Gets the interval between scans in millis.
+ * Gets the interval between recovery scans in millis.
*
* @return the interval in millis
*/
- long getCheckIntervalInMillis();
+ long getRecoveryIntervalInMillis();
/**
* Whether or not recovery is enabled or not
Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Sun Apr 4 11:10:13 2010
@@ -50,7 +50,7 @@ public class HawtDBAggregationRepository
private boolean sync;
private boolean returnOldExchange;
private HawtDBCamelMarshaller<K> marshaller = new HawtDBCamelMarshaller<K>();
- private long interval = 5000;
+ private long recoveryInterval = 5000;
private boolean useRecovery = true;
/**
@@ -344,16 +344,16 @@ public class HawtDBAggregationRepository
this.returnOldExchange = returnOldExchange;
}
- public void setCheckInterval(long interval, TimeUnit timeUnit) {
- this.interval = timeUnit.toMillis(interval);
+ public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
+ this.recoveryInterval = timeUnit.toMillis(interval);
}
- public void setCheckInterval(long interval) {
- this.interval = interval;
+ public void setRecoveryInterval(long interval) {
+ this.recoveryInterval = interval;
}
- public long getCheckIntervalInMillis() {
- return interval;
+ public long getRecoveryIntervalInMillis() {
+ return recoveryInterval;
}
public boolean isUseRecovery() {
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java Sun Apr 4 11:10:13 2010
@@ -70,7 +70,7 @@ public class HawtDBAggregateConcurrentDi
private void doSendMessages(int files, int poolSize) throws Exception {
MockEndpoint mock = getMockEndpoint("mock:aggregated");
mock.expectedMessageCount(2);
- mock.setResultWaitTime(20 * 1000L);
+ mock.setResultWaitTime(30 * 1000L);
ExecutorService executor = Executors.newFixedThreadPool(poolSize);
for (int i = 0; i < files; i++) {
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java Sun Apr 4 11:10:13 2010
@@ -69,7 +69,7 @@ public class HawtDBAggregateConcurrentSa
private void doSendMessages(int files, int poolSize) throws Exception {
MockEndpoint mock = getMockEndpoint("mock:aggregated");
- mock.setResultWaitTime(20 * 1000L);
+ mock.setResultWaitTime(30 * 1000L);
mock.expectedMessageCount(1);
// match number of expected numbers
mock.message(0).body(String.class).regex("[0-9]{" + files + "}");
Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java Sun Apr 4 11:10:13 2010
@@ -38,7 +38,7 @@ public class HawtDBAggregateRecoverTest
// enable recovery
repo.setUseRecovery(true);
// check faster
- repo.setCheckInterval(1, TimeUnit.SECONDS);
+ repo.setRecoveryInterval(1, TimeUnit.SECONDS);
super.setUp();
}
@@ -47,7 +47,10 @@ public class HawtDBAggregateRecoverTest
// should fail the first 2 times and then recover
getMockEndpoint("mock:aggregated").expectedMessageCount(3);
getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+ // should be marked as redelivered
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ // on the 2nd redelivery attempt we success
+ getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);