You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/01/05 10:25:28 UTC
[3/3] camel git commit: CAMEL-9480: IdempotentConsumer - If exception
from repo it should be able to handle by onException
CAMEL-9480: IdempotentConsumer - If exception from repo it should be able to handle by onException
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5ee90c1a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5ee90c1a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5ee90c1a
Branch: refs/heads/camel-2.15.x
Commit: 5ee90c1aa8f61c3765f9c825fbb2385b591ab6ab
Parents: 4ccd7de
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jan 5 09:49:47 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jan 5 10:25:15 2016 +0100
----------------------------------------------------------------------
.../idempotent/IdempotentConsumer.java | 74 ++++++++++++--------
.../IdempotentConsumerRepoExceptionTest.java | 70 ++++++++++++++++++
2 files changed, 113 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5ee90c1a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
index d3afe7a..f534991 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
@@ -78,48 +78,60 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor
}
public boolean process(Exchange exchange, AsyncCallback callback) {
- final String messageId = messageIdExpression.evaluate(exchange, String.class);
- if (messageId == null) {
- exchange.setException(new NoMessageIdException(exchange, messageIdExpression));
+ final String messageId;
+ try {
+ messageId = messageIdExpression.evaluate(exchange, String.class);
+ if (messageId == null) {
+ exchange.setException(new NoMessageIdException(exchange, messageIdExpression));
+ callback.done(true);
+ return true;
+ }
+ } catch (Exception e) {
+ exchange.setException(e);
callback.done(true);
return true;
}
- boolean newKey;
- if (eager) {
- // add the key to the repository
- if (idempotentRepository instanceof ExchangeIdempotentRepository) {
- newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).add(exchange, messageId);
+ try {
+ boolean newKey;
+ if (eager) {
+ // add the key to the repository
+ if (idempotentRepository instanceof ExchangeIdempotentRepository) {
+ newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).add(exchange, messageId);
+ } else {
+ newKey = idempotentRepository.add(messageId);
+ }
} else {
- newKey = idempotentRepository.add(messageId);
+ // check if we already have the key
+ if (idempotentRepository instanceof ExchangeIdempotentRepository) {
+ newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).contains(exchange, messageId);
+ } else {
+ newKey = !idempotentRepository.contains(messageId);
+ }
}
- } else {
- // check if we already have the key
- if (idempotentRepository instanceof ExchangeIdempotentRepository) {
- newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).contains(exchange, messageId);
- } else {
- newKey = !idempotentRepository.contains(messageId);
- }
- }
-
- if (!newKey) {
- // mark the exchange as duplicate
- exchange.setProperty(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE);
+ if (!newKey) {
+ // mark the exchange as duplicate
+ exchange.setProperty(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE);
- // we already have this key so its a duplicate message
- onDuplicate(exchange, messageId);
+ // we already have this key so its a duplicate message
+ onDuplicate(exchange, messageId);
- if (skipDuplicate) {
- // if we should skip duplicate then we are done
- LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", messageId, exchange);
- callback.done(true);
- return true;
+ if (skipDuplicate) {
+ // if we should skip duplicate then we are done
+ LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", messageId, exchange);
+ callback.done(true);
+ return true;
+ }
}
- }
- // register our on completion callback
- exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure));
+ // register our on completion callback
+ exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure));
+
+ } catch (Exception e) {
+ exchange.setException(e);
+ callback.done(true);
+ }
// process the exchange
return processor.process(exchange, callback);
http://git-wip-us.apache.org/repos/asf/camel/blob/5ee90c1a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRepoExceptionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRepoExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRepoExceptionTest.java
new file mode 100644
index 0000000..1cdfe26
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRepoExceptionTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
+
+/**
+ * @version
+ */
+public class IdempotentConsumerRepoExceptionTest extends ContextTestSupport {
+
+ private IdempotentRepository myRepo = new MyRepo();
+
+ public void testRepoException() throws Exception {
+ getMockEndpoint("mock:dead").expectedBodiesReceived("nineninenine");
+ getMockEndpoint("mock:result").expectedBodiesReceived("one", "two", "three");
+
+ template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+ template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+ template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+ template.sendBodyAndHeader("direct:start", "nineninenine", "messageId", "999");
+ template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+ template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
+ template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead"));
+
+ from("direct:start")
+ .idempotentConsumer(header("messageId"), myRepo)
+ .to("mock:result");
+
+ }
+ };
+ }
+
+ private class MyRepo extends MemoryIdempotentRepository {
+ @Override
+ public boolean add(String key) {
+ if ("999".equals(key)) {
+ throw new IllegalArgumentException("Forced");
+ }
+ return super.add(key);
+ }
+ }
+}