You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/05/14 13:24:47 UTC
[3/5] camel git commit: CAMEL-8761: Idempotent Consumer EIP - Allow
to commit when EIP scope ends
CAMEL-8761: Idempotent Consumer EIP - Allow to commit when EIP scope ends
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92897a09
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92897a09
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92897a09
Branch: refs/heads/master
Commit: 92897a09f69323038f485ea4f4207b3dee891258
Parents: b51bda4
Author: Claus Ibsen <da...@apache.org>
Authored: Thu May 14 11:36:19 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu May 14 13:28:54 2015 +0200
----------------------------------------------------------------------
.../model/IdempotentConsumerDefinition.java | 66 ++++++++-
.../camel/model/IdempotentConsumerScope.java | 31 ++++
.../idempotent/IdempotentConsumer.java | 54 ++++++-
.../resources/org/apache/camel/model/jaxb.index | 1 +
.../processor/IdempotentConsumerScopeTest.java | 140 +++++++++++++++++++
5 files changed, 286 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/92897a09/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
index 5c64e04..244bc50 100644
--- a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
@@ -38,10 +38,13 @@ import org.apache.camel.util.ObjectHelper;
@XmlRootElement(name = "idempotentConsumer")
@XmlAccessorType(XmlAccessType.FIELD)
public class IdempotentConsumerDefinition extends ExpressionNode {
+
@XmlAttribute(required = true)
private String messageIdRepositoryRef;
@XmlAttribute @Metadata(defaultValue = "true")
private Boolean eager;
+ @XmlAttribute @Metadata(defaultValue = "OnCompletion")
+ private IdempotentConsumerScope scope;
@XmlAttribute @Metadata(defaultValue = "true")
private Boolean skipDuplicate;
@XmlAttribute @Metadata(defaultValue = "true")
@@ -106,6 +109,57 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
}
/**
+ * Sets the scope of this idempotent consumer when its boundaries ends.
+ * <p/>
+ * The default mode is <tt>onCompletion</tt> which means the idempotent consumer will
+ * only trigger its completion at the end of the routing of the exchange, when the exchange completes.
+ * So if the exchange is continued routed after the block ends, then whatever happens there <b>also</b> affect the state.
+ * For example if the exchange failed due to an exception, then the state of the idempotent consumer will be a rollback.
+ * <p/>
+ * The other mode <tt>blockOnly</tt> means that the idempotent consumer will trigger its completion
+ * when the exchange reached the end of the block of the idempotent consumer pattern. So if the exchange
+ * is continued routed after the block ends, then whatever happens there does not affect the state.
+ *
+ * @param scope the scope to use
+ * @return builder
+ */
+ public IdempotentConsumerDefinition scope(IdempotentConsumerScope scope) {
+ setScope(scope);
+ return this;
+ }
+
+ /**
+ * Sets the scope of this idempotent consumer where its boundaries ends to <tt>blockOnly</tt>.
+ * <p/>
+ * The <tt>blockOnly</tt> mode means that the idempotent consumer will trigger its completion
+ * when the exchange reached the end of the block of the idempotent consumer pattern. So if the exchange
+ * is continued routed after the block ends, then whatever happens there does not affect the state.
+ *
+ * @see #scope(IdempotentConsumerScope)
+ * @return builder
+ */
+ public IdempotentConsumerDefinition scopeBlockOnly() {
+ setScope(IdempotentConsumerScope.BlockOnly);
+ return this;
+ }
+
+ /**
+ * Sets the scope of this idempotent consumer where its boundaries ends to <tt>onCompletion</tt>.
+ * <p/>
+ * The <tt>onCompletion</tt> mode means the idempotent consumer will
+ * only trigger its completion at the end of the routing of the exchange, when the exchange completes.
+ * So if the exchange is continued routed after the block ends, then whatever happens there <b>also</b> affect the state.
+ * For example if the exchange failed due to an exception, then the state of the idempotent consumer will be a rollback.
+ *
+ * @see #scope(IdempotentConsumerScope)
+ * @return builder
+ */
+ public IdempotentConsumerDefinition scopeOnCompletion() {
+ setScope(IdempotentConsumerScope.OnCompletion);
+ return this;
+ }
+
+ /**
* Sets whether to remove or keep the key on failure.
* <p/>
* The default behavior is to remove the key on failure.
@@ -185,6 +239,14 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
this.removeOnFailure = removeOnFailure;
}
+ public IdempotentConsumerScope getScope() {
+ return scope;
+ }
+
+ public void setScope(IdempotentConsumerScope scope) {
+ this.scope = scope;
+ }
+
@Override
@SuppressWarnings("unchecked")
public Processor createProcessor(RouteContext routeContext) throws Exception {
@@ -203,8 +265,10 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
boolean eager = getEager() == null || getEager();
boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate();
boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure();
+ // and this is not true by default
+ boolean scopeBlockOnly = getScope() != null && IdempotentConsumerScope.BlockOnly == getScope();
- return new IdempotentConsumer(expression, idempotentRepository, eager, duplicate, remove, childProcessor);
+ return new IdempotentConsumer(expression, idempotentRepository, eager, duplicate, remove, scopeBlockOnly, childProcessor);
}
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/92897a09/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerScope.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerScope.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerScope.java
new file mode 100644
index 0000000..b42f28b
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerScope.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlEnum;
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * Represents the scopes supported by the idempotent consumer EIP
+ */
+@XmlType
+@XmlEnum
+public enum IdempotentConsumerScope {
+
+ BlockOnly, OnCompletion
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/92897a09/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
index 67d20d5..6b73e63 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
@@ -29,6 +29,7 @@ import org.apache.camel.Processor;
import org.apache.camel.spi.ExchangeIdempotentRepository;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
@@ -58,15 +59,17 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor
private final boolean eager;
private final boolean skipDuplicate;
private final boolean removeOnFailure;
+ private final boolean scopeBlockOnly;
private final AtomicLong duplicateMessageCount = new AtomicLong();
public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository,
- boolean eager, boolean skipDuplicate, boolean removeOnFailure, Processor processor) {
+ boolean eager, boolean skipDuplicate, boolean removeOnFailure, boolean scopeBlockOnly, Processor processor) {
this.messageIdExpression = messageIdExpression;
this.idempotentRepository = idempotentRepository;
this.eager = eager;
this.skipDuplicate = skipDuplicate;
this.removeOnFailure = removeOnFailure;
+ this.scopeBlockOnly = scopeBlockOnly;
this.processor = AsyncProcessorConverterHelper.convert(processor);
}
@@ -87,7 +90,7 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor
AsyncProcessorHelper.process(this, exchange);
}
- public boolean process(Exchange exchange, AsyncCallback callback) {
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
final String messageId = messageIdExpression.evaluate(exchange, String.class);
if (messageId == null) {
exchange.setException(new NoMessageIdException(exchange, messageIdExpression));
@@ -128,11 +131,15 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor
}
}
- // register our on completion callback
- exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure));
+ final Synchronization onCompletion = new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure);
+ final AsyncCallback target = new IdempotentConsumerCallback(exchange, onCompletion, callback, scopeBlockOnly);
+ if (!scopeBlockOnly) {
+ // the scope is to do the idempotent completion work as an unit of work on the exchange when its done being routed
+ exchange.addOnCompletion(onCompletion);
+ }
// process the exchange
- return processor.process(exchange, callback);
+ return processor.process(exchange, target);
}
public List<Processor> next() {
@@ -201,4 +208,41 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor
// noop
}
+ /**
+ * {@link org.apache.camel.AsyncCallback} that is invoked when the idempotent consumer block ends
+ */
+ private static class IdempotentConsumerCallback implements AsyncCallback {
+ private final Exchange exchange;
+ private final Synchronization onCompletion;
+ private final AsyncCallback callback;
+ private final boolean scopeBlockOnly;
+
+ public IdempotentConsumerCallback(Exchange exchange, Synchronization onCompletion, AsyncCallback callback, boolean scopeBlockOnly) {
+ this.exchange = exchange;
+ this.onCompletion = onCompletion;
+ this.callback = callback;
+ this.scopeBlockOnly = scopeBlockOnly;
+ }
+
+ @Override
+ public void done(boolean doneSync) {
+ try {
+ if (scopeBlockOnly) {
+ if (exchange.isFailed()) {
+ onCompletion.onFailure(exchange);
+ } else {
+ onCompletion.onComplete(exchange);
+ }
+ }
+ // if scope is not block only then the onCompletion is invoked as part of the UoW of the Exchange
+ } finally {
+ callback.done(doneSync);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "IdempotentConsumerCallback";
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/92897a09/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
----------------------------------------------------------------------
diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
index fc79da2..5e6a3c0 100644
--- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
+++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
@@ -31,6 +31,7 @@ FilterDefinition
FinallyDefinition
FromDefinition
IdempotentConsumerDefinition
+IdempotentConsumerScope
InOnlyDefinition
InOutDefinition
InterceptDefinition
http://git-wip-us.apache.org/repos/asf/camel/blob/92897a09/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerScopeTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerScopeTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerScopeTest.java
new file mode 100644
index 0000000..21b727a
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerScopeTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
+
+/**
+ * @version
+ */
+public class IdempotentConsumerScopeTest extends ContextTestSupport {
+ protected Endpoint startEndpoint;
+ protected MockEndpoint resultEndpoint;
+ protected MockEndpoint a;
+ protected MockEndpoint b;
+ protected MockEndpoint dead;
+ protected IdempotentRepository<String> repo;
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ public void testScopeBlockOnly() throws Exception {
+ repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead"));
+
+ from("direct:start")
+ .idempotentConsumer(header("messageId"), repo).scopeBlockOnly()
+ .to("log:a", "mock:a")
+ .to("log:b", "mock:b")
+ .end()
+ .filter(simple("${header.messageId} == '2'"))
+ .throwException(new IllegalArgumentException("Forced"))
+ .end()
+ .to("log:result", "mock:result");
+ }
+ });
+ context.start();
+
+ // we are on block only scope as "two" was success in the block, and then "two" failed afterwards does not matter
+ // the idempotent consumer will not receive "two" again
+ a.expectedBodiesReceived("one", "two", "three");
+ b.expectedBodiesReceived("one", "two", "three");
+ dead.expectedBodiesReceived("two", "two");
+ resultEndpoint.expectedBodiesReceived("one", "one", "one", "three");
+
+ sendMessage("1", "one");
+ sendMessage("2", "two");
+ sendMessage("1", "one");
+ sendMessage("2", "two");
+ sendMessage("1", "one");
+ sendMessage("3", "three");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testScopeOnCompletion() throws Exception {
+ repo = MemoryIdempotentRepository.memoryIdempotentRepository(200);
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead"));
+
+ from("direct:start")
+ .idempotentConsumer(header("messageId"), repo).scopeOnCompletion()
+ .to("log:a", "mock:a")
+ .to("log:b", "mock:b")
+ .end()
+ .filter(simple("${header.messageId} == '2'"))
+ .throwException(new IllegalArgumentException("Forced"))
+ .end()
+ .to("log:result", "mock:result");
+ }
+ });
+ context.start();
+
+ // we are on completion scope so the "two" will rollback and therefore the idempotent consumer receives those again
+ a.expectedBodiesReceived("one", "two", "two", "three");
+ b.expectedBodiesReceived("one", "two", "two", "three");
+ dead.expectedBodiesReceived("two", "two");
+ resultEndpoint.expectedBodiesReceived("one", "one", "one", "three");
+
+ sendMessage("1", "one");
+ sendMessage("2", "two");
+ sendMessage("1", "one");
+ sendMessage("2", "two");
+ sendMessage("1", "one");
+ sendMessage("3", "three");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected void sendMessage(final Object messageId, final Object body) {
+ template.send(startEndpoint, new Processor() {
+ public void process(Exchange exchange) {
+ // now lets fire in a message
+ Message in = exchange.getIn();
+ in.setBody(body);
+ in.setHeader("messageId", messageId);
+ }
+ });
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ startEndpoint = resolveMandatoryEndpoint("direct:start");
+ resultEndpoint = getMockEndpoint("mock:result");
+ a = getMockEndpoint("mock:a");
+ b = getMockEndpoint("mock:b");
+ dead = getMockEndpoint("mock:dead");
+ }
+
+}
\ No newline at end of file