You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/04 13:10:13 UTC

svn commit: r930663 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/main/java/org/apache/camel/spi/ components/camel-hawtdb/src/main/java/org/apache/c...

Author: davsclaus
Date: Sun Apr  4 11:10:13 2010
New Revision: 930663

URL: http://svn.apache.org/viewvc?rev=930663&view=rev
Log:
CAMEL-2568: Added redelivery and redeliveryCounter as headers to recovered aggregated exchanges.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
    camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Sun Apr  4 11:10:13 2010
@@ -27,7 +27,9 @@ import org.apache.commons.logging.LogFac
 
 /**
  * A useful base class for any processor which provides some kind of throttling
- * or delayed processing
+ * or delayed processing.
+ * <p/>
+ * This implementation will block while waiting.
  * 
  * @version $Revision$
  */

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java Sun Apr  4 11:10:13 2010
@@ -24,7 +24,9 @@ import org.apache.camel.Processor;
  * A <a href="http://camel.apache.org/delayer.html">Delayer</a> which
  * delays processing the exchange until the correct amount of time has elapsed
  * using an expression to determine the delivery time.
- * 
+ * <p/>
+ * This implementation will block while waiting.
+ *
  * @version $Revision$
  */
 public class Delayer extends DelayProcessorSupport implements Traceable {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Apr  4 11:10:13 2010
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -36,6 +37,7 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.SynchronizationAdapter;
 import org.apache.camel.processor.Traceable;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.ExceptionHandler;
@@ -80,7 +82,15 @@ public class AggregateProcessor extends 
     private ExceptionHandler exceptionHandler;
     private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
     private Map<Object, Object> closedCorrelationKeys;
+    private final AggregateOnCompletion aggregateOnCompletion = new AggregateOnCompletion();
     private final Set<String> inProgressCompleteExchanges = new HashSet<String>();
+    private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>();
+
+    // bookkeeping data about redelivery
+    private class RedeliveryData {
+        int redeliveryCounter;
+        long redeliveryDelay;
+    }
 
     // options
     private boolean ignoreBadCorrelationKeys;
@@ -324,6 +334,10 @@ public class AggregateProcessor extends 
         // send this exchange
         executorService.submit(new Runnable() {
             public void run() {
+
+                // add on completion task so we remember to update the inProgressCompleteExchanges
+                exchange.addOnCompletion(aggregateOnCompletion);
+
                 try {
                     processor.process(exchange);
                 } catch (Exception e) {
@@ -333,18 +347,13 @@ public class AggregateProcessor extends 
                     exchange.setException(new CamelExchangeException("Error processing aggregated exchange", exchange, t));
                 }
 
-                try {
-                    // was it good or bad?
-                    if (exchange.getException() == null) {
-                        // only confirm if we processed without a problem
-                        aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId());
-                    } else {
-                        // if there was an exception then let the exception handler handle it
-                        getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException());
-                    }
-                } finally {
-                    // must remember to remove when we are done
-                    inProgressCompleteExchanges.remove(exchange.getExchangeId());
+                // was it good or bad?
+                if (exchange.getException() == null) {
+                    // only confirm if we processed without a problem
+                    aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId());
+                } else {
+                    // if there was an exception then let the exception handler handle it
+                    getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException());
                 }
             }
         });
@@ -450,6 +459,29 @@ public class AggregateProcessor extends 
     }
 
     /**
+     * On completion task which keeps the bookkeeping of the in progress exchanges up to date
+     */
+    private class AggregateOnCompletion extends SynchronizationAdapter {
+
+        @Override
+        public void onDone(Exchange exchange) {
+            // must remember to remove when we are done (done = success or failure)
+            inProgressCompleteExchanges.remove(exchange.getExchangeId());
+        }
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            // remove redelivery state when it was processed successfully
+            redeliveryState.remove(exchange.getExchangeId());
+        }
+
+        @Override
+        public String toString() {
+            return "AggregateOnCompletion";
+        }
+    }
+
+    /**
      * Background task that looks for aggregated exchanges which is triggered by completion timeouts.
      */
     private final class AggregationTimeoutMap extends DefaultTimeoutMap<Object, Exchange> {
@@ -501,8 +533,8 @@ public class AggregateProcessor extends 
 
                 boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
                 if (inProgress) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Aggregated exchange with id " + exchangeId + " is already in progress.");
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Aggregated exchange with id " + exchangeId + " is already in progress.");
                     }
                 } else {
                     if (LOG.isDebugEnabled()) {
@@ -514,6 +546,22 @@ public class AggregateProcessor extends 
                         String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
                         // and mark it as redelivered
                         exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
+
+                        // update current redelivery state
+                        RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
+                        if (data == null) {
+                            // create new data
+                            data = new RedeliveryData();
+                            redeliveryState.put(exchange.getExchangeId(), data);
+                        }
+                        data.redeliveryCounter++;
+
+                        // TODO: support delay and have a DelayQueue to avoid blocking
+                        // if so we need to pre-add it as in progress so we won't add it again to the delay queue
+
+                        // set redelivery counter
+                        exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
+
                         // resubmit the recovered exchange
                         onSubmitCompletion(key, exchange);
                     }
@@ -550,7 +598,7 @@ public class AggregateProcessor extends 
         if (aggregationRepository instanceof RecoverableAggregationRepository) {
             RecoverableAggregationRepository<Object> recoverable = (RecoverableAggregationRepository<Object>) aggregationRepository;
             if (recoverable.isUseRecovery()) {
-                long interval = recoverable.getCheckIntervalInMillis();
+                long interval = recoverable.getRecoveryIntervalInMillis();
                 if (interval <= 0) {
                     throw new IllegalArgumentException("AggregationRepository has recovery enabled and the CheckInterval option must be a positive number, was: " + interval);
                 }
@@ -574,14 +622,18 @@ public class AggregateProcessor extends 
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(timeoutMap);
-        ServiceHelper.stopService(recoverService);
-
-        ServiceHelper.stopService(aggregationRepository);
+        ServiceHelper.stopServices(timeoutMap, recoverService, aggregationRepository);
 
         if (closedCorrelationKeys != null) {
             closedCorrelationKeys.clear();
         }
+        redeliveryState.clear();
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        // cleanup when shutting down
+        inProgressCompleteExchanges.clear();
     }
 
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java Sun Apr  4 11:10:13 2010
@@ -48,26 +48,26 @@ public interface RecoverableAggregationR
     Exchange recover(CamelContext camelContext, String exchangeId);
 
     /**
-     * Sets the interval between scans
+     * Sets the interval between recovery scans
      *
      * @param interval  the interval
      * @param timeUnit  the time unit
      */
-    void setCheckInterval(long interval, TimeUnit timeUnit);
+    void setRecoveryInterval(long interval, TimeUnit timeUnit);
 
     /**
-     * Sets the interval between scans
+     * Sets the interval between recovery scans
      *
      * @param interval  the interval in millis
      */
-    void setCheckInterval(long interval);
+    void setRecoveryInterval(long interval);
 
     /**
-     * Gets the interval between scans in millis.
+     * Gets the interval between recovery scans in millis.
      *
      * @return the interval in millis
      */
-    long getCheckIntervalInMillis();
+    long getRecoveryIntervalInMillis();
 
     /**
      * Whether or not recovery is enabled or not

Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Sun Apr  4 11:10:13 2010
@@ -50,7 +50,7 @@ public class HawtDBAggregationRepository
     private boolean sync;
     private boolean returnOldExchange;
     private HawtDBCamelMarshaller<K> marshaller = new HawtDBCamelMarshaller<K>();
-    private long interval = 5000;
+    private long recoveryInterval = 5000;
     private boolean useRecovery = true;
 
     /**
@@ -344,16 +344,16 @@ public class HawtDBAggregationRepository
         this.returnOldExchange = returnOldExchange;
     }
 
-    public void setCheckInterval(long interval, TimeUnit timeUnit) {
-        this.interval = timeUnit.toMillis(interval);
+    public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
+        this.recoveryInterval = timeUnit.toMillis(interval);
     }
 
-    public void setCheckInterval(long interval) {
-        this.interval = interval;
+    public void setRecoveryInterval(long interval) {
+        this.recoveryInterval = interval;
     }
 
-    public long getCheckIntervalInMillis() {
-        return interval;
+    public long getRecoveryIntervalInMillis() {
+        return recoveryInterval;
     }
 
     public boolean isUseRecovery() {

Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java Sun Apr  4 11:10:13 2010
@@ -70,7 +70,7 @@ public class HawtDBAggregateConcurrentDi
     private void doSendMessages(int files, int poolSize) throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:aggregated");
         mock.expectedMessageCount(2);
-        mock.setResultWaitTime(20 * 1000L);
+        mock.setResultWaitTime(30 * 1000L);
 
         ExecutorService executor = Executors.newFixedThreadPool(poolSize);
         for (int i = 0; i < files; i++) {

Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java Sun Apr  4 11:10:13 2010
@@ -69,7 +69,7 @@ public class HawtDBAggregateConcurrentSa
 
     private void doSendMessages(int files, int poolSize) throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:aggregated");
-        mock.setResultWaitTime(20 * 1000L);
+        mock.setResultWaitTime(30 * 1000L);
         mock.expectedMessageCount(1);
         // match number of expected numbers
         mock.message(0).body(String.class).regex("[0-9]{" + files + "}");

Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java?rev=930663&r1=930662&r2=930663&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java Sun Apr  4 11:10:13 2010
@@ -38,7 +38,7 @@ public class HawtDBAggregateRecoverTest 
         // enable recovery
         repo.setUseRecovery(true);
         // check faster
-        repo.setCheckInterval(1, TimeUnit.SECONDS);
+        repo.setRecoveryInterval(1, TimeUnit.SECONDS);
         super.setUp();
     }
 
@@ -47,7 +47,10 @@ public class HawtDBAggregateRecoverTest 
         // should fail the first 2 times and then recover
         getMockEndpoint("mock:aggregated").expectedMessageCount(3);
         getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+        // should be marked as redelivered
         getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        // on the 2nd redelivery attempt we succeed
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);