You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/02/05 15:28:22 UTC

[camel] 01/07: CAMEL-8958: Claim Check EIP with push/pop. Work in progress.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch 8958
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8ab6a6f46e2687a5528f3843e0138a4c9f4abf9b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 4 14:47:25 2018 +0100

    CAMEL-8958: Claim Check EIP with push/pop. Work in progress.
---
 .../src/main/java/org/apache/camel/Exchange.java   |  17 +-
 .../camel/impl/DefaultClaimCheckRepository.java    |  81 +++++++++
 .../apache/camel/model/ClaimCheckDefinition.java   | 188 ++++++++++++++++++++
 .../apache/camel/model/ClaimCheckOperation.java    |  28 +++
 .../apache/camel/model/ProcessorDefinition.java    |  31 ++++
 .../processor/ClaimCheckAggregationStrategy.java   |  55 ++++++
 .../camel/processor/ClaimCheckProcessor.java       | 193 +++++++++++++++++++++
 .../org/apache/camel/spi/ClaimCheckRepository.java |  76 ++++++++
 .../java/org/apache/camel/util/ExchangeHelper.java |   2 +-
 .../resources/org/apache/camel/model/jaxb.index    |   2 +
 .../ClaimCheckEipGetAndRemoveSetTest.java          |  57 ++++++
 .../camel/processor/ClaimCheckEipGetSetTest.java   |  56 ++++++
 .../processor/ClaimCheckEipPushPopBodyTest.java    |  55 ++++++
 .../processor/ClaimCheckEipPushPopHeadersTest.java |  55 ++++++
 .../camel/processor/ClaimCheckEipPushPopTest.java  |  50 ++++++
 15 files changed, 937 insertions(+), 9 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java
index 541ea16..6ca2ac6 100644
--- a/camel-core/src/main/java/org/apache/camel/Exchange.java
+++ b/camel-core/src/main/java/org/apache/camel/Exchange.java
@@ -97,14 +97,15 @@ public interface Exchange {
     // used across other Apache products such as AMQ, SMX etc.
     String BREADCRUMB_ID              = "breadcrumbId";
 
-    String CHARSET_NAME          = "CamelCharsetName";
-    String CIRCUIT_BREAKER_STATE = "CamelCircuitBreakerState";
-    String CREATED_TIMESTAMP     = "CamelCreatedTimestamp";
-    String CONTENT_ENCODING      = "Content-Encoding";
-    String CONTENT_LENGTH        = "Content-Length";
-    String CONTENT_TYPE          = "Content-Type";
-    String COOKIE_HANDLER        = "CamelCookieHandler";
-    String CORRELATION_ID        = "CamelCorrelationId";
+    String CHARSET_NAME           = "CamelCharsetName";
+    String CIRCUIT_BREAKER_STATE  = "CamelCircuitBreakerState";
+    String CREATED_TIMESTAMP      = "CamelCreatedTimestamp";
+    String CLAIM_CHECK_REPOSITORY = "CamelClaimCheckRepository";
+    String CONTENT_ENCODING       = "Content-Encoding";
+    String CONTENT_LENGTH         = "Content-Length";
+    String CONTENT_TYPE           = "Content-Type";
+    String COOKIE_HANDLER         = "CamelCookieHandler";
+    String CORRELATION_ID         = "CamelCorrelationId";
 
     String DATASET_INDEX             = "CamelDataSetIndex";
     String DEFAULT_CHARSET_PROPERTY  = "org.apache.camel.default.charset";
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultClaimCheckRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultClaimCheckRepository.java
new file mode 100644
index 0000000..adac797
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultClaimCheckRepository.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.ClaimCheckRepository;
+
+/**
+ * Default {@link ClaimCheckRepository} that keeps keyed exchanges in a map and pushed exchanges in a stack.
+ */
+public class DefaultClaimCheckRepository implements ClaimCheckRepository {
+
+    private final Map<String, Exchange> map = new HashMap<>();
+    private final Deque<Exchange> stack = new ArrayDeque<>();
+
+    @Override
+    public boolean add(String key, Exchange exchange) {
+        return map.put(key, exchange) == null;
+    }
+
+    @Override
+    public boolean contains(String key) {
+        return map.containsKey(key);
+    }
+
+    @Override
+    public Exchange get(String key) {
+        return map.get(key);
+    }
+
+    @Override
+    public Exchange getAndRemove(String key) {
+        return map.remove(key);
+    }
+
+    @Override
+    public void push(Exchange exchange) {
+        stack.push(exchange);
+    }
+
+    @Override
+    public Exchange pop() {
+        // poll takes from the same end as push/pop, but yields null instead of throwing when empty
+        return stack.poll();
+    }
+
+    @Override
+    public void clear() {
+        map.clear();
+        stack.clear();
+    }
+
+    @Override
+    public void start() throws Exception {
+        // noop
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // noop
+    }
+}
diff --git a/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
new file mode 100644
index 0000000..8db7fab
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.ClaimCheckProcessor;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ObjectHelper;
+
+public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinition> {
+
+    @XmlAttribute(required = true)
+    private ClaimCheckOperation operation;
+    @XmlAttribute
+    private String key;
+    @XmlAttribute
+    private String data;
+    @XmlAttribute(name = "strategyRef")
+    private String aggregationStrategyRef;
+    @XmlAttribute(name = "strategyMethodName")
+    private String aggregationStrategyMethodName;
+    @XmlTransient
+    private AggregationStrategy aggregationStrategy;
+
+    public ClaimCheckDefinition() {
+    }
+
+    @Override
+    public String toString() {
+        return "ClaimCheck";
+    }
+
+    @Override
+    public String getLabel() {
+        return "claimCheck";
+    }
+
+    @Override
+    public Processor createProcessor(RouteContext routeContext) throws Exception {
+        ObjectHelper.notNull(operation, "operation", this);
+
+        ClaimCheckProcessor claim = new ClaimCheckProcessor();
+        claim.setOperation(operation.name());
+        claim.setKey(getKey());
+        claim.setData(getData());
+
+        AggregationStrategy strategy = createAggregationStrategy(routeContext);
+        if (strategy != null) {
+            claim.setAggregationStrategy(strategy);
+        }
+
+        // only data or aggregation strategy can be configured not both
+        if (getData() != null && strategy != null) {
+            throw new IllegalArgumentException("Cannot use both data and custom aggregation strategy on ClaimCheck EIP");
+        }
+
+        return claim;
+    }
+
+    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
+        AggregationStrategy strategy = getAggregationStrategy();
+        if (strategy == null && aggregationStrategyRef != null) {
+            Object aggStrategy = routeContext.lookup(aggregationStrategyRef, Object.class);
+            if (aggStrategy instanceof AggregationStrategy) {
+                strategy = (AggregationStrategy) aggStrategy;
+            } else if (aggStrategy != null) {
+                strategy = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName());
+            } else {
+                throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + aggregationStrategyRef);
+            }
+        }
+
+        if (strategy instanceof CamelContextAware) {
+            ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
+        }
+
+        return strategy;
+    }
+
+    // Fluent API
+    //-------------------------------------------------------------------------
+
+    /**
+     * The claim check operation.
+     */
+    public ClaimCheckDefinition operation(ClaimCheckOperation operation) {
+        setOperation(operation);
+        return this;
+    }
+
+    /**
+     * To use a specific key for claim check id.
+     */
+    public ClaimCheckDefinition key(String key) {
+        setKey(key);
+        return this;
+    }
+
+    /**
+     * What data to merge when claiming from the repository.
+     * // TODO: add more description here about the syntax
+     */
+    public ClaimCheckDefinition data(String data) {
+        setData(data);
+        return this;
+    }
+
+    public ClaimCheckDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy);
+        return this;
+    }
+
+    public ClaimCheckDefinition aggregationStrategyRef(String aggregationStrategyRef) {
+        setAggregationStrategyRef(aggregationStrategyRef);
+        return this;
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public ClaimCheckOperation getOperation() {
+        return operation;
+    }
+
+    public void setOperation(ClaimCheckOperation operation) {
+        this.operation = operation;
+    }
+
+    public String getData() {
+        return data;
+    }
+
+    public void setData(String data) {
+        this.data = data;
+    }
+
+    public String getAggregationStrategyRef() {
+        return aggregationStrategyRef;
+    }
+
+    public void setAggregationStrategyRef(String aggregationStrategyRef) {
+        this.aggregationStrategyRef = aggregationStrategyRef;
+    }
+
+    public String getAggregationStrategyMethodName() {
+        return aggregationStrategyMethodName;
+    }
+
+    public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
+        this.aggregationStrategyMethodName = aggregationStrategyMethodName;
+    }
+
+    public AggregationStrategy getAggregationStrategy() {
+        return aggregationStrategy;
+    }
+
+    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
+        this.aggregationStrategy = aggregationStrategy;
+    }
+}
diff --git a/camel-core/src/main/java/org/apache/camel/model/ClaimCheckOperation.java b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckOperation.java
new file mode 100644
index 0000000..5339351
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckOperation.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlEnum;
+import javax.xml.bind.annotation.XmlType;
+
+@XmlType
+@XmlEnum
+public enum ClaimCheckOperation {
+
+    get, getAndRemove, set, push, pop
+
+}
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index bb74e2a..9d168b1 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -69,6 +69,7 @@ import org.apache.camel.processor.interceptor.StreamCaching;
 import org.apache.camel.processor.loadbalancer.LoadBalancer;
 import org.apache.camel.spi.AsEndpointUri;
 import org.apache.camel.spi.AsPredicate;
+import org.apache.camel.spi.ClaimCheckRepository;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.IdempotentRepository;
@@ -3444,6 +3445,36 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
         return ExpressionClause.createAndSetExpression(answer);
     }
 
+    public ClaimCheckDefinition claimCheck() {
+        ClaimCheckDefinition answer = new ClaimCheckDefinition();
+        addOutput(answer);
+        return answer;
+    }
+
+    public Type claimCheck(ClaimCheckOperation operation) {
+        ClaimCheckDefinition answer = new ClaimCheckDefinition();
+        answer.setOperation(operation);
+        addOutput(answer);
+        return (Type) this;
+    }
+
+    public Type claimCheck(ClaimCheckOperation operation, String key) {
+        ClaimCheckDefinition answer = new ClaimCheckDefinition();
+        answer.setOperation(operation);
+        answer.setKey(key);
+        addOutput(answer);
+        return (Type) this;
+    }
+
+    public Type claimCheck(ClaimCheckOperation operation, String key, String data) {
+        ClaimCheckDefinition answer = new ClaimCheckDefinition();
+        answer.setOperation(operation);
+        answer.setKey(key);
+        answer.setData(data);
+        addOutput(answer);
+        return (Type) this;
+    }
+
     /**
      * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
      * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java
new file mode 100644
index 0000000..c22cde9
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Merges the body and/or headers of a claimed exchange into the current one, as selected by <tt>data</tt>.
+ */
+public class ClaimCheckAggregationStrategy implements AggregationStrategy {
+
+    private final String data;
+    // TODO: pattern matching for headers, eg headers:foo*, headers, headers:*, header:foo,header:bar
+
+    public ClaimCheckAggregationStrategy(String data) {
+        this.data = data;
+    }
+
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (newExchange == null) {
+            return oldExchange;
+        }
+        if (ObjectHelper.isEmpty(data)) {
+            // grab everything if data is empty
+            return newExchange;
+        }
+        // data is a comma separated list; only the "body" and "headers" parts are acted upon
+        for (Object k : ObjectHelper.createIterable(data, ",")) {
+            String part = k.toString();
+            if ("body".equals(part)) {
+                oldExchange.getMessage().setBody(newExchange.getMessage().getBody());
+            } else if ("headers".equals(part)) {
+                oldExchange.getMessage().getHeaders().putAll(newExchange.getMessage().getHeaders());
+            }
+        }
+        return oldExchange;
+    }
+}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
new file mode 100644
index 0000000..a9cac9a
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultClaimCheckRepository;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.ClaimCheckRepository;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClaimCheckProcessor.class);
+    private CamelContext camelContext;
+    private String id;
+    private String operation;
+    private AggregationStrategy aggregationStrategy;
+    private String key;
+    private String data;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getOperation() {
+        return operation;
+    }
+
+    public void setOperation(String operation) {
+        this.operation = operation;
+    }
+
+    public AggregationStrategy getAggregationStrategy() {
+        return aggregationStrategy;
+    }
+
+    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
+        this.aggregationStrategy = aggregationStrategy;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public String getData() {
+        return data;
+    }
+
+    public void setData(String data) {
+        this.data = data;
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // the repository is scoped per exchange
+        ClaimCheckRepository repo = exchange.getProperty(Exchange.CLAIM_CHECK_REPOSITORY, ClaimCheckRepository.class);
+        if (repo == null) {
+            repo = new DefaultClaimCheckRepository();
+            exchange.setProperty(Exchange.CLAIM_CHECK_REPOSITORY, repo);
+        }
+
+        try {
+            if ("set".equals(operation)) {
+                // copy exchange, and do not share the unit of work
+                Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+                boolean addedNew = repo.add(key, copy);
+                if (addedNew) {
+                    LOG.debug("Add: {} -> {}", key, copy);
+                } else {
+                    LOG.debug("Override: {} -> {}", key, copy);
+                }
+            } else if ("get".equals(operation)) {
+                Exchange copy = repo.get(key);
+                LOG.debug("Get: {} -> {}", key, exchange);
+                if (copy != null) {
+                    Exchange result = aggregationStrategy.aggregate(exchange, copy);
+                    if (result != null) {
+                        ExchangeHelper.copyResultsPreservePattern(exchange, result);
+                    }
+                }
+            } else if ("getAndRemove".equals(operation)) {
+                Exchange copy = repo.getAndRemove(key);
+                LOG.debug("GetAndRemove: {} -> {}", key, exchange);
+                if (copy != null) {
+                    // prepare the exchanges for aggregation
+                    ExchangeHelper.prepareAggregation(exchange, copy);
+                    Exchange result = aggregationStrategy.aggregate(exchange, copy);
+                    if (result != null) {
+                        ExchangeHelper.copyResultsPreservePattern(exchange, result);
+                    }
+                }
+            } else if ("push".equals(operation)) {
+                // copy exchange, and do not share the unit of work
+                Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+                LOG.debug("Push: {} -> {}", key, copy);
+                repo.push(copy);
+            } else if ("pop".equals(operation)) {
+                Exchange copy = repo.pop();
+                LOG.debug("Pop: {} -> {}", key, exchange);
+                if (copy != null) {
+                    // prepare the exchanges for aggregation
+                    ExchangeHelper.prepareAggregation(exchange, copy);
+                    Exchange result = aggregationStrategy.aggregate(exchange, copy);
+                    if (result != null) {
+                        ExchangeHelper.copyResultsPreservePattern(exchange, result);
+                    }
+                }
+            }
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+
+        callback.done(true);
+        return true;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(operation, "operation", this);
+
+        if (aggregationStrategy == null) {
+            aggregationStrategy = createAggregationStrategy(data);
+        }
+        if (aggregationStrategy instanceof CamelContextAware) {
+            ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
+        }
+
+        ServiceHelper.startServices(aggregationStrategy);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(aggregationStrategy);
+    }
+
+    @Override
+    public String toString() {
+        return "ClaimCheck[" + operation + "]";
+    }
+
+    protected AggregationStrategy createAggregationStrategy(String data) {
+        return new ClaimCheckAggregationStrategy(data);
+    }
+}
diff --git a/camel-core/src/main/java/org/apache/camel/spi/ClaimCheckRepository.java b/camel-core/src/main/java/org/apache/camel/spi/ClaimCheckRepository.java
new file mode 100644
index 0000000..7292f4b
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/ClaimCheckRepository.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Service;
+
+/**
+ * Access to a repository of Keys to implement the
+ * <a href="http://camel.apache.org/claim-check.html">Claim Check</a> pattern.
+ * <p/>
+ * The <tt>add</tt> and <tt>contains</tt> methods operate according to the {@link java.util.Set} contract.
+ */
+public interface ClaimCheckRepository extends Service {
+
+    /**
+     * Adds the exchange to the repository.
+     *
+     * @param key the claim check key
+     * @return <tt>true</tt> if this repository did <b>not</b> already contain the specified key
+     */
+    boolean add(String key, Exchange exchange);
+
+    /**
+     * Returns <tt>true</tt> if this repository contains the specified key.
+     *
+     * @param key the claim check key
+     * @return <tt>true</tt> if this repository contains the specified key
+     */
+    boolean contains(String key);
+
+    /**
+     * Gets the exchange from the repository.
+     *
+     * @param key the claim check key
+     */
+    Exchange get(String key);
+
+    /**
+     * Gets and removes the exchange from the repository.
+     *
+     * @param key the claim check key
+     * @return the removed exchange, or <tt>null</tt> if the key did not exist.
+     */
+    Exchange getAndRemove(String key);
+
+    /**
+     * Pushes the exchange on top of the repository.
+     */
+    void push(Exchange exchange);
+
+    /**
+     * Pops the repository and returns the latest.
+     */
+    Exchange pop();
+
+    /**
+     * Clear the repository.
+     */
+    void clear();
+
+}
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 3e6ecb6..672523e 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -384,8 +384,8 @@ public final class ExchangeHelper {
      * Copies the <code>source</code> exchange to <code>target</code> exchange
      * preserving the {@link ExchangePattern} of <code>target</code>.
      *
-     * @param source source exchange.
      * @param result target exchange.
+     * @param source source exchange.
      */
     public static void copyResultsPreservePattern(Exchange result, Exchange source) {
 
diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
index 793f23b..ffe888e 100644
--- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
+++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
@@ -19,6 +19,8 @@ AOPDefinition
 BeanDefinition
 CatchDefinition
 ChoiceDefinition
+ClaimCheckDefinition
+ClaimCheckOperation
 ConvertBodyDefinition
 ContextScanDefinition
 DataFormatDefinition
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipGetAndRemoveSetTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipGetAndRemoveSetTest.java
new file mode 100644
index 0000000..4ba7a37
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipGetAndRemoveSetTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ClaimCheckOperation;
+
+public class ClaimCheckEipGetAndRemoveSetTest extends ContextTestSupport {
+
+    public void testGetAndRemoveSet() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:d").expectedBodiesReceived("Hi World");
+        // it was removed so the data is not changed
+        getMockEndpoint("mock:e").expectedBodiesReceived("Hi World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("mock:a")
+                    .claimCheck(ClaimCheckOperation.set, "foo")
+                    .transform().constant("Bye World")
+                    .to("mock:b")
+                    .claimCheck(ClaimCheckOperation.getAndRemove, "foo")
+                    .to("mock:c")
+                    .transform().constant("Hi World")
+                    .to("mock:d")
+                    .claimCheck(ClaimCheckOperation.getAndRemove, "foo")
+                    .to("mock:e");
+            }
+        };
+    }
+}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipGetSetTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipGetSetTest.java
new file mode 100644
index 0000000..893040e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipGetSetTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ClaimCheckOperation;
+
+public class ClaimCheckEipGetSetTest extends ContextTestSupport {
+    // Checks that get is expected to restore a body stored with set and can be repeated for the same key.
+    public void testGetSet() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); // body as sent
+        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World"); // after the first transform
+        getMockEndpoint("mock:c").expectedBodiesReceived("Hello World"); // restored by the first get
+        getMockEndpoint("mock:d").expectedBodiesReceived("Hi World"); // after the second transform
+        getMockEndpoint("mock:e").expectedBodiesReceived("Hello World"); // restored again by the second get
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("mock:a")
+                    .claimCheck(ClaimCheckOperation.set, "foo") // check in the current message under key foo
+                    .transform().constant("Bye World")
+                    .to("mock:b")
+                    .claimCheck(ClaimCheckOperation.get, "foo") // first get: expected to restore the body
+                    .to("mock:c")
+                    .transform().constant("Hi World")
+                    .to("mock:d")
+                    .claimCheck(ClaimCheckOperation.get, "foo") // second get: same key is expected to still work
+                    .to("mock:e");
+            }
+        };
+    }
+}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopBodyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopBodyTest.java
new file mode 100644
index 0000000..59529bb
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopBodyTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ClaimCheckOperation;
+
+public class ClaimCheckEipPushPopBodyTest extends ContextTestSupport {
+    // Checks that a pop limited to body is expected to restore the pushed body but not the pushed header.
+    public void testPushPopBody() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); // body as sent
+        getMockEndpoint("mock:a").expectedHeaderReceived("foo", 123); // header as sent
+        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World"); // body changed by the route
+        getMockEndpoint("mock:b").expectedHeaderReceived("foo", 456); // header changed by the route
+        getMockEndpoint("mock:c").expectedBodiesReceived("Hello World"); // body restored by pop
+        getMockEndpoint("mock:c").expectedHeaderReceived("foo", 456); // header not restored by pop
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "foo", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("mock:a")
+                    .claimCheck(ClaimCheckOperation.push) // check in the message as sent; no key is given
+                    .transform().constant("Bye World")
+                    .setHeader("foo", constant(456))
+                    .to("mock:b")
+                    // only merge in the message body (pop is called with a null key), header foo stays 456
+                    .claimCheck(ClaimCheckOperation.pop, null, "body")
+                    .to("mock:c");
+            }
+        };
+    }
+}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopHeadersTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopHeadersTest.java
new file mode 100644
index 0000000..3cf1e4f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopHeadersTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ClaimCheckOperation;
+
+public class ClaimCheckEipPushPopHeadersTest extends ContextTestSupport {
+    // Checks that a pop limited to headers is expected to restore the pushed header but not the pushed body.
+    public void testPushPopHeaders() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); // body as sent
+        getMockEndpoint("mock:a").expectedHeaderReceived("foo", 123); // header as sent
+        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World"); // body changed by the route
+        getMockEndpoint("mock:b").expectedHeaderReceived("foo", 456); // header changed by the route
+        getMockEndpoint("mock:c").expectedBodiesReceived("Bye World"); // body not restored by pop
+        getMockEndpoint("mock:c").expectedHeaderReceived("foo", 123); // header restored by pop
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "foo", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("mock:a")
+                    .claimCheck(ClaimCheckOperation.push) // check in the message as sent; no key is given
+                    .transform().constant("Bye World")
+                    .setHeader("foo", constant(456))
+                    .to("mock:b")
+                    // only merge in the message headers (pop is called with a null key), body stays Bye World
+                    .claimCheck(ClaimCheckOperation.pop, null, "headers")
+                    .to("mock:c");
+            }
+        };
+    }
+}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopTest.java
new file mode 100644
index 0000000..9e50e23
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ClaimCheckOperation;
+
+public class ClaimCheckEipPushPopTest extends ContextTestSupport {
+    // Checks that a plain push followed by a plain pop is expected to restore the pushed body.
+    public void testPushPop() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); // body as sent
+        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World"); // body changed by the route
+        getMockEndpoint("mock:c").expectedBodiesReceived("Hello World"); // body restored by pop
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("mock:a")
+                    .claimCheck(ClaimCheckOperation.push) // check in the message as sent; no key is given
+                    .transform().constant("Bye World")
+                    .to("mock:b")
+                    .claimCheck(ClaimCheckOperation.pop) // expected to bring back the pushed body
+                    .to("mock:c");
+            }
+        };
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
davsclaus@apache.org.