You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/08 07:48:42 UTC
svn commit: r782534 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/processor/idempotent/
camel-core/src/main/java/org/apache/camel/...
Author: davsclaus
Date: Mon Jun 8 05:48:40 2009
New Revision: 782534
URL: http://svn.apache.org/viewvc?rev=782534&view=rev
Log:
CAMEL-1650: Added confirm to idempotent repository. Also add eager option to be able to disable eager and add to the repo only when the exchange is done.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java (contents, props changed)
- copied, changed from r781916, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Mon Jun 8 05:48:40 2009
@@ -206,13 +206,13 @@
protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) {
if (!isMatched(file, isDirectory)) {
if (log.isTraceEnabled()) {
- log.trace("Remote file did not match. Will skip this remote file: " + file);
+ log.trace("File did not match. Will skip this file: " + file);
}
return false;
} else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getFileName())) {
// only use the filename as the key as the file could be moved into a done folder
if (log.isTraceEnabled()) {
- log.trace("RemoteFileConsumer is idempotent and the file has been consumed before. Will skip this remote file: " + file);
+ log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: " + file);
}
return false;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java Mon Jun 8 05:48:40 2009
@@ -39,6 +39,8 @@
public class IdempotentConsumerDefinition extends ExpressionNode {
@XmlAttribute
private String messageIdRepositoryRef;
+ @XmlAttribute
+ private Boolean eager = Boolean.TRUE;
@XmlTransient
private IdempotentRepository idempotentRepository;
@@ -92,6 +94,19 @@
return this;
}
+ /**
+ * Sets whether to eagerly add the key to the idempotent repository or wait until the exchange
+ * is complete. Eager is default enabled.
+ *
+ * @param eager <tt>true</tt> to add the key before processing, <tt>false</tt> to wait until
+ * the exchange is complete.
+ * @return builder
+ */
+ public IdempotentConsumerDefinition eager(boolean eager) {
+ setEager(eager);
+ return this;
+ }
+
public String getMessageIdRepositoryRef() {
return messageIdRepositoryRef;
}
@@ -108,12 +123,20 @@
this.idempotentRepository = idempotentRepository;
}
+ public Boolean isEager() {
+ return eager;
+ }
+
+ public void setEager(Boolean eager) {
+ this.eager = eager;
+ }
+
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
Processor childProcessor = routeContext.createProcessor(this);
IdempotentRepository idempotentRepository = resolveMessageIdRepository(routeContext);
- return new IdempotentConsumer(getExpression().createExpression(routeContext), idempotentRepository,
- childProcessor);
+ Expression expression = getExpression().createExpression(routeContext);
+ return new IdempotentConsumer(expression, idempotentRepository, eager, childProcessor);
}
/**
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java Mon Jun 8 05:48:40 2009
@@ -148,6 +148,11 @@
}
}
+ public boolean confirm(String key) {
+ // noop
+ return true;
+ }
+
public File getFileStore() {
return fileStore;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Mon Jun 8 05:48:40 2009
@@ -31,8 +31,7 @@
/**
* An implementation of the <a
- * href="http://camel.apache.org/idempotent-consumer.html">Idempotent
- * Consumer</a> pattern.
+ * href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a> pattern.
*
* @version $Revision$
*/
@@ -41,17 +40,19 @@
private final Expression messageIdExpression;
private final Processor processor;
private final IdempotentRepository idempotentRepository;
+ private final boolean eager;
- public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository idempotentRepository, Processor processor) {
+ public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository idempotentRepository,
+ boolean eager, Processor processor) {
this.messageIdExpression = messageIdExpression;
this.idempotentRepository = idempotentRepository;
+ this.eager = eager;
this.processor = processor;
}
@Override
public String toString() {
- return "IdempotentConsumer[expression=" + messageIdExpression + ", repository=" + idempotentRepository
- + ", processor=" + processor + "]";
+ return "IdempotentConsumer[" + messageIdExpression + " -> " + processor + "]";
}
@SuppressWarnings("unchecked")
@@ -61,18 +62,26 @@
throw new NoMessageIdException(exchange, messageIdExpression);
}
- // add the key to the repository
- boolean newKey = idempotentRepository.add(messageId);
+ boolean newKey;
+ if (eager) {
+ // add the key to the repository
+ newKey = idempotentRepository.add(messageId);
+ } else {
+ // check if we alrady have the key
+ newKey = !idempotentRepository.contains(messageId);
+ }
+
if (!newKey) {
// we already have this key so its a duplicate message
onDuplicateMessage(exchange, messageId);
- } else {
- // register our on completion callback
- exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId));
-
- // process the exchange
- processor.process(exchange);
+ return;
}
+
+ // register our on completion callback
+ exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager));
+
+ // process the exchange
+ processor.process(exchange);
}
public List<Processor> next() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java Mon Jun 8 05:48:40 2009
@@ -31,17 +31,17 @@
* @version $Revision$
*/
public class IdempotentOnCompletion implements Synchronization {
-
private static final transient Log LOG = LogFactory.getLog(IdempotentOnCompletion.class);
- private final IdempotentRepository<String> idempotentRepository;
+ private final IdempotentRepository idempotentRepository;
private final String messageId;
+ private final boolean eager;
- public IdempotentOnCompletion(IdempotentRepository<String> idempotentRepository, String messageId) {
+ public IdempotentOnCompletion(IdempotentRepository idempotentRepository, String messageId, boolean eager) {
this.idempotentRepository = idempotentRepository;
this.messageId = messageId;
+ this.eager = eager;
}
- @SuppressWarnings("unchecked")
public void onComplete(Exchange exchange) {
onCompletedMessage(exchange, messageId);
}
@@ -57,8 +57,13 @@
* @param exchange the exchange
* @param messageId the message ID of this exchange
*/
+ @SuppressWarnings("unchecked")
protected void onCompletedMessage(Exchange exchange, String messageId) {
- // noop
+ if (!eager) {
+ // if not eager we should add the key when its complete
+ idempotentRepository.add(messageId);
+ }
+ idempotentRepository.confirm(messageId);
}
/**
@@ -68,6 +73,7 @@
* @param exchange the exchange
* @param messageId the message ID of this exchange
*/
+ @SuppressWarnings("unchecked")
protected void onFailedMessage(Exchange exchange, String messageId) {
idempotentRepository.remove(messageId);
if (LOG.isDebugEnabled()) {
@@ -79,4 +85,5 @@
public String toString() {
return "IdempotentOnCompletion[" + messageId + ']';
}
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java Mon Jun 8 05:48:40 2009
@@ -94,6 +94,11 @@
}
}
+ public boolean confirm(String key) {
+ // noop
+ return true;
+ }
+
public Map<String, Object> getCache() {
return cache;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java Mon Jun 8 05:48:40 2009
@@ -44,9 +44,20 @@
/**
* Removes the key from the repository.
+ * <p/>
+ * Is usually invoked if the exchange failed.
*
* @param key the key of the message for duplicate test
* @return <tt>true</tt> if the key was removed
*/
boolean remove(E key);
+
+ /**
+ * Confirms the key, after the exchange has been processed sucesfully.
+ *
+ * @param key the key of the message for duplicate test
+ * @return <tt>true</tt> if the key was confirmed
+ */
+ boolean confirm(E key);
+
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java Mon Jun 8 05:48:40 2009
@@ -99,6 +99,10 @@
public boolean remove(String key) {
return true;
}
+
+ public boolean confirm(String key) {
+ return true;
+ }
}
}
\ No newline at end of file
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java (from r781916, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java&r1=781916&r2=782534&rev=782534&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java Mon Jun 8 05:48:40 2009
@@ -24,11 +24,12 @@
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
/**
* @version $Revision$
*/
-public class IdempotentConsumerTest extends ContextTestSupport {
+public class IdempotentConsumerEagerTest extends ContextTestSupport {
protected Endpoint startEndpoint;
protected MockEndpoint resultEndpoint;
@@ -43,7 +44,7 @@
public void configure() throws Exception {
from("direct:start").idempotentConsumer(
header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
- ).to("mock:result");
+ ).eager(false).to("mock:result");
}
});
context.start();
@@ -68,7 +69,7 @@
from("direct:start").idempotentConsumer(
header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
- ).process(new Processor() {
+ ).eager(false).process(new Processor() {
public void process(Exchange exchange) throws Exception {
String id = exchange.getIn().getHeader("messageId", String.class);
if (id.equals("2")) {
@@ -94,6 +95,60 @@
assertMockEndpointsSatisfied();
}
+ public void testNotEager() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ final IdempotentRepository repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);
+
+ from("direct:start").idempotentConsumer(header("messageId"), repo).eager(false).
+ process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String id = exchange.getIn().getHeader("messageId", String.class);
+ // should not contain
+ assertFalse("Should not eager add to repo", repo.contains(id));
+ }
+ }).to("mock:result");
+ }
+ });
+ context.start();
+
+ resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
+ sendMessage("1", "one");
+ sendMessage("2", "two");
+ sendMessage("3", "three");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testEager() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ final IdempotentRepository repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);
+
+ from("direct:start").idempotentConsumer(header("messageId"), repo).eager(true).
+ process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String id = exchange.getIn().getHeader("messageId", String.class);
+ // should contain
+ assertTrue("Should eager add to repo", repo.contains(id));
+ }
+ }).to("mock:result");
+ }
+ });
+ context.start();
+
+ resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
+ sendMessage("1", "one");
+ sendMessage("2", "two");
+ sendMessage("3", "three");
+
+ assertMockEndpointsSatisfied();
+ }
+
protected void sendMessage(final Object messageId, final Object body) {
template.send(startEndpoint, new Processor() {
public void process(Exchange exchange) {
@@ -113,4 +168,4 @@
resultEndpoint = getMockEndpoint("mock:result");
}
-}
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerEagerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java?rev=782534&r1=782533&r2=782534&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java (original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java Mon Jun 8 05:48:40 2009
@@ -115,4 +115,9 @@
return rc.booleanValue();
}
+ public boolean confirm(String s) {
+ // noop
+ return true;
+ }
+
}