You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/08 07:48:42 UTC

svn commit: r782534 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/idempotent/ camel-core/src/main/java/org/apache/camel/...

Author: davsclaus
Date: Mon Jun  8 05:48:40 2009
New Revision: 782534

URL: http://svn.apache.org/viewvc?rev=782534&view=rev
Log:
CAMEL-1650: Added confirm to idempotent repository. Also add eager option to be able to disable eager and add to the repo only when the exchange is done.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java   (contents, props changed)
      - copied, changed from r781916, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
    camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Mon Jun  8 05:48:40 2009
@@ -206,13 +206,13 @@
     protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) {
         if (!isMatched(file, isDirectory)) {
             if (log.isTraceEnabled()) {
-                log.trace("Remote file did not match. Will skip this remote file: " + file);
+                log.trace("File did not match. Will skip this file: " + file);
             }
             return false;
         } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getFileName())) {
             // only use the filename as the key as the file could be moved into a done folder
             if (log.isTraceEnabled()) {
-                log.trace("RemoteFileConsumer is idempotent and the file has been consumed before. Will skip this remote file: " + file);
+                log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: " + file);
             }
             return false;
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java Mon Jun  8 05:48:40 2009
@@ -39,6 +39,8 @@
 public class IdempotentConsumerDefinition extends ExpressionNode {
     @XmlAttribute
     private String messageIdRepositoryRef;
+    @XmlAttribute
+    private Boolean eager = Boolean.TRUE;
     @XmlTransient
     private IdempotentRepository idempotentRepository;
 
@@ -92,6 +94,19 @@
         return this;
     }
 
+    /**
+     * Sets whether to eagerly add the key to the idempotent repository or wait until the exchange
+     * is complete. Eager is default enabled.
+     *
+     * @param eager  <tt>true</tt> to add the key before processing, <tt>false</tt> to wait until
+     * the exchange is complete.
+     * @return builder
+     */
+    public IdempotentConsumerDefinition eager(boolean eager) {
+        setEager(eager);
+        return this;
+    }
+
     public String getMessageIdRepositoryRef() {
         return messageIdRepositoryRef;
     }
@@ -108,12 +123,20 @@
         this.idempotentRepository = idempotentRepository;
     }
 
+    public Boolean isEager() {
+        return eager;
+    }
+
+    public void setEager(Boolean eager) {
+        this.eager = eager;
+    }
+
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Processor childProcessor = routeContext.createProcessor(this);
         IdempotentRepository idempotentRepository = resolveMessageIdRepository(routeContext);
-        return new IdempotentConsumer(getExpression().createExpression(routeContext), idempotentRepository,
-                                      childProcessor);
+        Expression expression = getExpression().createExpression(routeContext);
+        return new IdempotentConsumer(expression, idempotentRepository, eager, childProcessor);
     }
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java Mon Jun  8 05:48:40 2009
@@ -148,6 +148,11 @@
         }
     }
 
+    public boolean confirm(String key) {
+        // noop
+        return true;
+    }
+
     public File getFileStore() {
         return fileStore;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Mon Jun  8 05:48:40 2009
@@ -31,8 +31,7 @@
 
 /**
  * An implementation of the <a
- * href="http://camel.apache.org/idempotent-consumer.html">Idempotent
- * Consumer</a> pattern.
+ * href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a> pattern.
  * 
  * @version $Revision$
  */
@@ -41,17 +40,19 @@
     private final Expression messageIdExpression;
     private final Processor processor;
     private final IdempotentRepository idempotentRepository;
+    private final boolean eager;
 
-    public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository idempotentRepository, Processor processor) {
+    public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository idempotentRepository,
+                              boolean eager, Processor processor) {
         this.messageIdExpression = messageIdExpression;
         this.idempotentRepository = idempotentRepository;
+        this.eager = eager;
         this.processor = processor;
     }
 
     @Override
     public String toString() {
-        return "IdempotentConsumer[expression=" + messageIdExpression + ", repository=" + idempotentRepository
-               + ", processor=" + processor + "]";
+        return "IdempotentConsumer[" + messageIdExpression + " -> " + processor + "]";
     }
 
     @SuppressWarnings("unchecked")
@@ -61,18 +62,26 @@
             throw new NoMessageIdException(exchange, messageIdExpression);
         }
 
-        // add the key to the repository
-        boolean newKey = idempotentRepository.add(messageId);
+        boolean newKey;
+        if (eager) {
+            // add the key to the repository
+            newKey = idempotentRepository.add(messageId);
+        } else {
+            // check if we alrady have the key
+            newKey = !idempotentRepository.contains(messageId);
+        }
+
         if (!newKey) {
             // we already have this key so its a duplicate message
             onDuplicateMessage(exchange, messageId);
-        } else {
-            // register our on completion callback
-            exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId));
-
-            // process the exchange
-            processor.process(exchange);
+            return;
         }
+
+        // register our on completion callback
+        exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager));
+
+        // process the exchange
+        processor.process(exchange);
     }
 
     public List<Processor> next() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java Mon Jun  8 05:48:40 2009
@@ -31,17 +31,17 @@
  * @version $Revision$
  */
 public class IdempotentOnCompletion implements Synchronization {
-
     private static final transient Log LOG = LogFactory.getLog(IdempotentOnCompletion.class);
-    private final IdempotentRepository<String> idempotentRepository;
+    private final IdempotentRepository idempotentRepository;
     private final String messageId;
+    private final boolean eager;
 
-    public IdempotentOnCompletion(IdempotentRepository<String> idempotentRepository, String messageId) {
+    public IdempotentOnCompletion(IdempotentRepository idempotentRepository, String messageId, boolean eager) {
         this.idempotentRepository = idempotentRepository;
         this.messageId = messageId;
+        this.eager = eager;
     }
 
-    @SuppressWarnings("unchecked")
     public void onComplete(Exchange exchange) {
         onCompletedMessage(exchange, messageId);
     }
@@ -57,8 +57,13 @@
      * @param exchange the exchange
      * @param messageId the message ID of this exchange
      */
+    @SuppressWarnings("unchecked")
     protected void onCompletedMessage(Exchange exchange, String messageId) {
-        // noop
+        if (!eager) {
+            // if not eager we should add the key when its complete
+            idempotentRepository.add(messageId);
+        }
+        idempotentRepository.confirm(messageId);
     }
 
     /**
@@ -68,6 +73,7 @@
      * @param exchange the exchange
      * @param messageId the message ID of this exchange
      */
+    @SuppressWarnings("unchecked")
     protected void onFailedMessage(Exchange exchange, String messageId) {
         idempotentRepository.remove(messageId);
         if (LOG.isDebugEnabled()) {
@@ -79,4 +85,5 @@
     public String toString() {
         return "IdempotentOnCompletion[" + messageId + ']';
     }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java Mon Jun  8 05:48:40 2009
@@ -94,6 +94,11 @@
         }
     }
 
+    public boolean confirm(String key) {
+        // noop
+        return true;
+    }
+
     public Map<String, Object> getCache() {
         return cache;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java Mon Jun  8 05:48:40 2009
@@ -44,9 +44,20 @@
 
     /**
      * Removes the key from the repository.
+     * <p/>
+     * Is usually invoked if the exchange failed.
      *
      * @param key the key of the message for duplicate test
      * @return <tt>true</tt> if the key was removed
      */
     boolean remove(E key);
+
+    /**
+     * Confirms the key, after the exchange has been processed sucesfully.
+     *
+     * @param key the key of the message for duplicate test
+     * @return <tt>true</tt> if the key was confirmed
+     */
+    boolean confirm(E key);
+
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java Mon Jun  8 05:48:40 2009
@@ -99,6 +99,10 @@
         public boolean remove(String key) {
             return true;
         }
+
+        public boolean confirm(String key) {
+            return true;
+        }
     }
     
 }
\ No newline at end of file

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java (from r781916, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java&r1=781916&r2=782534&rev=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java Mon Jun  8 05:48:40 2009
@@ -24,11 +24,12 @@
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
 
 /**
  * @version $Revision$
  */
-public class IdempotentConsumerTest extends ContextTestSupport {
+public class IdempotentConsumerEagerTest extends ContextTestSupport {
     protected Endpoint startEndpoint;
     protected MockEndpoint resultEndpoint;
 
@@ -43,7 +44,7 @@
             public void configure() throws Exception {
                 from("direct:start").idempotentConsumer(
                         header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
-                ).to("mock:result");
+                ).eager(false).to("mock:result");
             }
         });
         context.start();
@@ -68,7 +69,7 @@
 
                 from("direct:start").idempotentConsumer(
                         header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
-                ).process(new Processor() {
+                ).eager(false).process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         String id = exchange.getIn().getHeader("messageId", String.class);
                         if (id.equals("2")) {
@@ -94,6 +95,60 @@
         assertMockEndpointsSatisfied();
     }
 
+    public void testNotEager() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                final IdempotentRepository repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);
+
+                from("direct:start").idempotentConsumer(header("messageId"), repo).eager(false).
+                        process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                String id = exchange.getIn().getHeader("messageId", String.class);
+                                // should not contain
+                                assertFalse("Should not eager add to repo", repo.contains(id));
+                            }
+                        }).to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testEager() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                final IdempotentRepository repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);
+
+                from("direct:start").idempotentConsumer(header("messageId"), repo).eager(true).
+                        process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                String id = exchange.getIn().getHeader("messageId", String.class);
+                                // should contain
+                                assertTrue("Should eager add to repo", repo.contains(id));
+                            }
+                        }).to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+    }
+
     protected void sendMessage(final Object messageId, final Object body) {
         template.send(startEndpoint, new Processor() {
             public void process(Exchange exchange) {
@@ -113,4 +168,4 @@
         resultEndpoint = getMockEndpoint("mock:result");
     }
 
-}
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java (original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java Mon Jun  8 05:48:40 2009
@@ -115,4 +115,9 @@
         return rc.booleanValue();
     }
 
+    public boolean confirm(String s) {
+        // noop
+        return true;
+    }
+
 }