You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/02/05 15:28:22 UTC
[camel] 01/07: CAMEL-8958: Claim Check EIP with push/pop. Work in
progress.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch 8958
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8ab6a6f46e2687a5528f3843e0138a4c9f4abf9b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 4 14:47:25 2018 +0100
CAMEL-8958: Claim Check EIP with push/pop. Work in progress.
---
.../src/main/java/org/apache/camel/Exchange.java | 17 +-
.../camel/impl/DefaultClaimCheckRepository.java | 81 +++++++++
.../apache/camel/model/ClaimCheckDefinition.java | 188 ++++++++++++++++++++
.../apache/camel/model/ClaimCheckOperation.java | 28 +++
.../apache/camel/model/ProcessorDefinition.java | 31 ++++
.../processor/ClaimCheckAggregationStrategy.java | 55 ++++++
.../camel/processor/ClaimCheckProcessor.java | 193 +++++++++++++++++++++
.../org/apache/camel/spi/ClaimCheckRepository.java | 76 ++++++++
.../java/org/apache/camel/util/ExchangeHelper.java | 2 +-
.../resources/org/apache/camel/model/jaxb.index | 2 +
.../ClaimCheckEipGetAndRemoveSetTest.java | 57 ++++++
.../camel/processor/ClaimCheckEipGetSetTest.java | 56 ++++++
.../processor/ClaimCheckEipPushPopBodyTest.java | 55 ++++++
.../processor/ClaimCheckEipPushPopHeadersTest.java | 55 ++++++
.../camel/processor/ClaimCheckEipPushPopTest.java | 50 ++++++
15 files changed, 937 insertions(+), 9 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java
index 541ea16..6ca2ac6 100644
--- a/camel-core/src/main/java/org/apache/camel/Exchange.java
+++ b/camel-core/src/main/java/org/apache/camel/Exchange.java
@@ -97,14 +97,15 @@ public interface Exchange {
// used across other Apache products such as AMQ, SMX etc.
String BREADCRUMB_ID = "breadcrumbId";
- String CHARSET_NAME = "CamelCharsetName";
- String CIRCUIT_BREAKER_STATE = "CamelCircuitBreakerState";
- String CREATED_TIMESTAMP = "CamelCreatedTimestamp";
- String CONTENT_ENCODING = "Content-Encoding";
- String CONTENT_LENGTH = "Content-Length";
- String CONTENT_TYPE = "Content-Type";
- String COOKIE_HANDLER = "CamelCookieHandler";
- String CORRELATION_ID = "CamelCorrelationId";
+ String CHARSET_NAME = "CamelCharsetName";
+ String CIRCUIT_BREAKER_STATE = "CamelCircuitBreakerState";
+ String CREATED_TIMESTAMP = "CamelCreatedTimestamp";
+ String CLAIM_CHECK_REPOSITORY = "CamelClaimCheckRepository";
+ String CONTENT_ENCODING = "Content-Encoding";
+ String CONTENT_LENGTH = "Content-Length";
+ String CONTENT_TYPE = "Content-Type";
+ String COOKIE_HANDLER = "CamelCookieHandler";
+ String CORRELATION_ID = "CamelCorrelationId";
String DATASET_INDEX = "CamelDataSetIndex";
String DEFAULT_CHARSET_PROPERTY = "org.apache.camel.default.charset";
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultClaimCheckRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultClaimCheckRepository.java
new file mode 100644
index 0000000..adac797
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultClaimCheckRepository.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.ClaimCheckRepository;
+
+public class DefaultClaimCheckRepository implements ClaimCheckRepository {
+
+ private final Map<String, Exchange> map = new HashMap();
+ private final Deque<Exchange> stack = new ArrayDeque<>();
+
+ @Override
+ public boolean add(String key, Exchange exchange) {
+ return map.put(key, exchange) == null;
+ }
+
+ @Override
+ public boolean contains(String key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public Exchange get(String key) {
+ return map.get(key);
+ }
+
+ @Override
+ public Exchange getAndRemove(String key) {
+ return map.remove(key);
+ }
+
+ @Override
+ public void push(Exchange exchange) {
+ stack.push(exchange);
+ }
+
+ @Override
+ public Exchange pop() {
+ if (!stack.isEmpty()) {
+ return stack.pop();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void clear() {
+ map.clear();
+ stack.clear();
+ }
+
+ @Override
+ public void start() throws Exception {
+ // noop
+ }
+
+ @Override
+ public void stop() throws Exception {
+ // noop
+ }
+}
diff --git a/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
new file mode 100644
index 0000000..8db7fab
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.ClaimCheckProcessor;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ObjectHelper;
+
+public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinition> {
+
+ @XmlAttribute(required = true)
+ private ClaimCheckOperation operation;
+ @XmlAttribute
+ private String key;
+ @XmlAttribute
+ private String data;
+ @XmlAttribute(name = "strategyRef")
+ private String aggregationStrategyRef;
+ @XmlAttribute(name = "strategyMethodName")
+ private String aggregationStrategyMethodName;
+ @XmlTransient
+ private AggregationStrategy aggregationStrategy;
+
+ public ClaimCheckDefinition() {
+ }
+
+ @Override
+ public String toString() {
+ return "ClaimCheck";
+ }
+
+ @Override
+ public String getLabel() {
+ return "claimCheck";
+ }
+
+ @Override
+ public Processor createProcessor(RouteContext routeContext) throws Exception {
+ ObjectHelper.notNull(operation, "operation", this);
+
+ ClaimCheckProcessor claim = new ClaimCheckProcessor();
+ claim.setOperation(operation.name());
+ claim.setKey(getKey());
+ claim.setData(getData());
+
+ AggregationStrategy strategy = createAggregationStrategy(routeContext);
+ if (strategy != null) {
+ claim.setAggregationStrategy(strategy);
+ }
+
+ // only data or aggregation strategy can be configured not both
+ if (getData() != null && strategy != null) {
+ throw new IllegalArgumentException("Cannot use both data and custom aggregation strategy on ClaimCheck EIP");
+ }
+
+ return claim;
+ }
+
+ private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
+ AggregationStrategy strategy = getAggregationStrategy();
+ if (strategy == null && aggregationStrategyRef != null) {
+ Object aggStrategy = routeContext.lookup(aggregationStrategyRef, Object.class);
+ if (aggStrategy instanceof AggregationStrategy) {
+ strategy = (AggregationStrategy) aggStrategy;
+ } else if (aggStrategy != null) {
+ strategy = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName());
+ } else {
+ throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + aggregationStrategyRef);
+ }
+ }
+
+ if (strategy instanceof CamelContextAware) {
+ ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
+ }
+
+ return strategy;
+ }
+
+ // Fluent API
+ //-------------------------------------------------------------------------
+
+ /**
+ * The claim check operation.
+ */
+ public ClaimCheckDefinition operation(ClaimCheckOperation operation) {
+ setOperation(operation);
+ return this;
+ }
+
+ /**
+ * To use a specific key for claim check id.
+ */
+ public ClaimCheckDefinition key(String key) {
+ setKey(key);
+ return this;
+ }
+
+ /**
+ * What data to merge when claiming from the repository.
+ * // TODO: add more description here about the syntax
+ */
+ public ClaimCheckDefinition data(String data) {
+ setData(data);
+ return this;
+ }
+
+ public ClaimCheckDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
+ setAggregationStrategy(aggregationStrategy);
+ return this;
+ }
+
+ public ClaimCheckDefinition aggregationStrategyRef(String aggregationStrategyRef) {
+ setAggregationStrategyRef(aggregationStrategyRef);
+ return this;
+ }
+
+ // Properties
+ //-------------------------------------------------------------------------
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public ClaimCheckOperation getOperation() {
+ return operation;
+ }
+
+ public void setOperation(ClaimCheckOperation operation) {
+ this.operation = operation;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+ public void setData(String data) {
+ this.data = data;
+ }
+
+ public String getAggregationStrategyRef() {
+ return aggregationStrategyRef;
+ }
+
+ public void setAggregationStrategyRef(String aggregationStrategyRef) {
+ this.aggregationStrategyRef = aggregationStrategyRef;
+ }
+
+ public String getAggregationStrategyMethodName() {
+ return aggregationStrategyMethodName;
+ }
+
+ public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
+ this.aggregationStrategyMethodName = aggregationStrategyMethodName;
+ }
+
+ public AggregationStrategy getAggregationStrategy() {
+ return aggregationStrategy;
+ }
+
+ public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
+ this.aggregationStrategy = aggregationStrategy;
+ }
+}
diff --git a/camel-core/src/main/java/org/apache/camel/model/ClaimCheckOperation.java b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckOperation.java
new file mode 100644
index 0000000..5339351
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckOperation.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlEnum;
+import javax.xml.bind.annotation.XmlType;
+
+@XmlType
+@XmlEnum
+public enum ClaimCheckOperation {
+
+ get, getAndRemove, set, push, pop
+
+}
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index bb74e2a..9d168b1 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -69,6 +69,7 @@ import org.apache.camel.processor.interceptor.StreamCaching;
import org.apache.camel.processor.loadbalancer.LoadBalancer;
import org.apache.camel.spi.AsEndpointUri;
import org.apache.camel.spi.AsPredicate;
+import org.apache.camel.spi.ClaimCheckRepository;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.IdempotentRepository;
@@ -3444,6 +3445,36 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
return ExpressionClause.createAndSetExpression(answer);
}
+ public ClaimCheckDefinition claimCheck() {
+ ClaimCheckDefinition answer = new ClaimCheckDefinition();
+ addOutput(answer);
+ return answer;
+ }
+
+ public Type claimCheck(ClaimCheckOperation operation) {
+ ClaimCheckDefinition answer = new ClaimCheckDefinition();
+ answer.setOperation(operation);
+ addOutput(answer);
+ return (Type) this;
+ }
+
+ public Type claimCheck(ClaimCheckOperation operation, String key) {
+ ClaimCheckDefinition answer = new ClaimCheckDefinition();
+ answer.setOperation(operation);
+ answer.setKey(key);
+ addOutput(answer);
+ return (Type) this;
+ }
+
+ public Type claimCheck(ClaimCheckOperation operation, String key, String data) {
+ ClaimCheckDefinition answer = new ClaimCheckDefinition();
+ answer.setOperation(operation);
+ answer.setKey(key);
+ answer.setData(data);
+ addOutput(answer);
+ return (Type) this;
+ }
+
/**
* The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
* enriches an exchange with additional data obtained from a <code>resourceUri</code>.
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java
new file mode 100644
index 0000000..c22cde9
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.util.ObjectHelper;
+
+public class ClaimCheckAggregationStrategy implements AggregationStrategy {
+
+ private final String data;
+ // TODO: pattern matching for headers, eg headers:foo*, headers, headers:*, header:foo,header:bar
+
+ public ClaimCheckAggregationStrategy(String data) {
+ this.data = data;
+ }
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (newExchange == null) {
+ return oldExchange;
+ }
+
+ if (ObjectHelper.isEmpty(data)) {
+ // grab everything if data is empty
+ return newExchange;
+ }
+
+ Iterable it = ObjectHelper.createIterable(data, ",");
+ for (Object k : it) {
+ String part = k.toString();
+ if ("body".equals(part)) {
+ oldExchange.getMessage().setBody(newExchange.getMessage().getBody());
+ } else if ("headers".equals(part)) {
+ oldExchange.getMessage().getHeaders().putAll(newExchange.getMessage().getHeaders());
+ }
+ }
+
+ return oldExchange;
+ }
+}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
new file mode 100644
index 0000000..a9cac9a
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultClaimCheckRepository;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.ClaimCheckRepository;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClaimCheckProcessor.class);
+ private CamelContext camelContext;
+ private String id;
+ private String operation;
+ private AggregationStrategy aggregationStrategy;
+ private String key;
+ private String data;
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getOperation() {
+ return operation;
+ }
+
+ public void setOperation(String operation) {
+ this.operation = operation;
+ }
+
+ public AggregationStrategy getAggregationStrategy() {
+ return aggregationStrategy;
+ }
+
+ public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
+ this.aggregationStrategy = aggregationStrategy;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+ public void setData(String data) {
+ this.data = data;
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ // the repository is scoped per exchange
+ ClaimCheckRepository repo = exchange.getProperty(Exchange.CLAIM_CHECK_REPOSITORY, ClaimCheckRepository.class);
+ if (repo == null) {
+ repo = new DefaultClaimCheckRepository();
+ exchange.setProperty(Exchange.CLAIM_CHECK_REPOSITORY, repo);
+ }
+
+ try {
+ if ("set".equals(operation)) {
+ // copy exchange, and do not share the unit of work
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+ boolean addedNew = repo.add(key, copy);
+ if (addedNew) {
+ LOG.debug("Add: {} -> {}", key, copy);
+ } else {
+ LOG.debug("Override: {} -> {}", key, copy);
+ }
+ } else if ("get".equals(operation)) {
+ Exchange copy = repo.get(key);
+ LOG.debug("Get: {} -> {}", key, exchange);
+ if (copy != null) {
+ Exchange result = aggregationStrategy.aggregate(exchange, copy);
+ if (result != null) {
+ ExchangeHelper.copyResultsPreservePattern(exchange, result);
+ }
+ }
+ } else if ("getAndRemove".equals(operation)) {
+ Exchange copy = repo.getAndRemove(key);
+ LOG.debug("GetAndRemove: {} -> {}", key, exchange);
+ if (copy != null) {
+ // prepare the exchanges for aggregation
+ ExchangeHelper.prepareAggregation(exchange, copy);
+ Exchange result = aggregationStrategy.aggregate(exchange, copy);
+ if (result != null) {
+ ExchangeHelper.copyResultsPreservePattern(exchange, result);
+ }
+ }
+ } else if ("push".equals(operation)) {
+ // copy exchange, and do not share the unit of work
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+ LOG.debug("Push: {} -> {}", key, copy);
+ repo.push(copy);
+ } else if ("pop".equals(operation)) {
+ Exchange copy = repo.pop();
+ LOG.debug("Pop: {} -> {}", key, exchange);
+ if (copy != null) {
+ // prepare the exchanges for aggregation
+ ExchangeHelper.prepareAggregation(exchange, copy);
+ Exchange result = aggregationStrategy.aggregate(exchange, copy);
+ if (result != null) {
+ ExchangeHelper.copyResultsPreservePattern(exchange, result);
+ }
+ }
+ }
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+
+ callback.done(true);
+ return true;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ ObjectHelper.notNull(operation, "operation", this);
+
+ if (aggregationStrategy == null) {
+ aggregationStrategy = createAggregationStrategy(data);
+ }
+ if (aggregationStrategy instanceof CamelContextAware) {
+ ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
+ }
+
+ ServiceHelper.startServices(aggregationStrategy);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(aggregationStrategy);
+ }
+
+ @Override
+ public String toString() {
+ return "ClaimCheck[" + operation + "]";
+ }
+
+ protected AggregationStrategy createAggregationStrategy(String data) {
+ return new ClaimCheckAggregationStrategy(data);
+ }
+}
diff --git a/camel-core/src/main/java/org/apache/camel/spi/ClaimCheckRepository.java b/camel-core/src/main/java/org/apache/camel/spi/ClaimCheckRepository.java
new file mode 100644
index 0000000..7292f4b
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/ClaimCheckRepository.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Service;
+
+/**
+ * Access to a repository of Keys to implement the
+ * <a href="http://camel.apache.org/claim-check.html">Claim Check</a> pattern.
+ * <p/>
+ * The <tt>add</tt> and <tt>contains</tt> methods is operating according to the {@link java.util.Set} contract.
+ */
+public interface ClaimCheckRepository extends Service {
+
+ /**
+ * Adds the exchange to the repository.
+ *
+ * @param key the claim check key
+ * @return <tt>true</tt> if this repository did <b>not</b> already contain the specified key
+ */
+ boolean add(String key, Exchange exchange);
+
+ /**
+ * Returns <tt>true</tt> if this repository contains the specified key.
+ *
+ * @param key the claim check key
+ * @return <tt>true</tt> if this repository contains the specified key
+ */
+ boolean contains(String key);
+
+ /**
+ * Gets the exchange from the repository.
+ *
+ * @param key the claim check key
+ */
+ Exchange get(String key);
+
+ /**
+ * Gets and removes the exchange from the repository.
+ *
+ * @param key the claim check key
+ * @return the removed exchange, or <tt>null</tt> if the key did not exists.
+ */
+ Exchange getAndRemove(String key);
+
+ /**
+ * Pushes the exchange on top of the repository.
+ */
+ void push(Exchange exchange);
+
+ /**
+ * Pops the repository and returns the latest.
+ */
+ Exchange pop();
+
+ /**
+ * Clear the repository.
+ */
+ void clear();
+
+}
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 3e6ecb6..672523e 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -384,8 +384,8 @@ public final class ExchangeHelper {
* Copies the <code>source</code> exchange to <code>target</code> exchange
* preserving the {@link ExchangePattern} of <code>target</code>.
*
- * @param source source exchange.
* @param result target exchange.
+ * @param source source exchange.
*/
public static void copyResultsPreservePattern(Exchange result, Exchange source) {
diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
index 793f23b..ffe888e 100644
--- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
+++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
@@ -19,6 +19,8 @@ AOPDefinition
BeanDefinition
CatchDefinition
ChoiceDefinition
+ClaimCheckDefinition
+ClaimCheckOperation
ConvertBodyDefinition
ContextScanDefinition
DataFormatDefinition
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipGetAndRemoveSetTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipGetAndRemoveSetTest.java
new file mode 100644
index 0000000..4ba7a37
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipGetAndRemoveSetTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ClaimCheckOperation;
+
+public class ClaimCheckEipGetAndRemoveSetTest extends ContextTestSupport {
+
+ public void testGetAndRemoveSet() throws Exception {
+ getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:b").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:d").expectedBodiesReceived("Hi World");
+ // it was removed so the data is not changed
+ getMockEndpoint("mock:e").expectedBodiesReceived("Hi World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .to("mock:a")
+ .claimCheck(ClaimCheckOperation.set, "foo")
+ .transform().constant("Bye World")
+ .to("mock:b")
+ .claimCheck(ClaimCheckOperation.getAndRemove, "foo")
+ .to("mock:c")
+ .transform().constant("Hi World")
+ .to("mock:d")
+ .claimCheck(ClaimCheckOperation.getAndRemove, "foo")
+ .to("mock:e");
+ }
+ };
+ }
+}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipGetSetTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipGetSetTest.java
new file mode 100644
index 0000000..893040e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipGetSetTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ClaimCheckOperation;
+
+public class ClaimCheckEipGetSetTest extends ContextTestSupport {
+
+ public void testGetSet() throws Exception {
+ getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:b").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:d").expectedBodiesReceived("Hi World");
+ getMockEndpoint("mock:e").expectedBodiesReceived("Hello World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .to("mock:a")
+ .claimCheck(ClaimCheckOperation.set, "foo")
+ .transform().constant("Bye World")
+ .to("mock:b")
+ .claimCheck(ClaimCheckOperation.get, "foo")
+ .to("mock:c")
+ .transform().constant("Hi World")
+ .to("mock:d")
+ .claimCheck(ClaimCheckOperation.get, "foo")
+ .to("mock:e");
+ }
+ };
+ }
+}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopBodyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopBodyTest.java
new file mode 100644
index 0000000..59529bb
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopBodyTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ClaimCheckOperation;
+
+public class ClaimCheckEipPushPopBodyTest extends ContextTestSupport {
+
+ public void testPushPopBody() throws Exception {
+ getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:a").expectedHeaderReceived("foo", 123);
+ getMockEndpoint("mock:b").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:b").expectedHeaderReceived("foo", 456);
+ getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:c").expectedHeaderReceived("foo", 456);
+
+ template.sendBodyAndHeader("direct:start", "Hello World", "foo", 123);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .to("mock:a")
+ .claimCheck(ClaimCheckOperation.push)
+ .transform().constant("Bye World")
+ .setHeader("foo", constant(456))
+ .to("mock:b")
+ // only merge in the message body
+ .claimCheck(ClaimCheckOperation.pop, null, "body")
+ .to("mock:c");
+ }
+ };
+ }
+}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopHeadersTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopHeadersTest.java
new file mode 100644
index 0000000..3cf1e4f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopHeadersTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ClaimCheckOperation;
+
+public class ClaimCheckEipPushPopHeadersTest extends ContextTestSupport {
+
+ public void testPushPopHeaders() throws Exception {
+ getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:a").expectedHeaderReceived("foo", 123);
+ getMockEndpoint("mock:b").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:b").expectedHeaderReceived("foo", 456);
+ getMockEndpoint("mock:c").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:c").expectedHeaderReceived("foo", 123);
+
+ template.sendBodyAndHeader("direct:start", "Hello World", "foo", 123);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .to("mock:a")
+ .claimCheck(ClaimCheckOperation.push)
+ .transform().constant("Bye World")
+ .setHeader("foo", constant(456))
+ .to("mock:b")
+ // only merge in the message headers
+ .claimCheck(ClaimCheckOperation.pop, null, "headers")
+ .to("mock:c");
+ }
+ };
+ }
+}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopTest.java
new file mode 100644
index 0000000..9e50e23
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ClaimCheckOperation;
+
+public class ClaimCheckEipPushPopTest extends ContextTestSupport {
+
+ public void testPushPop() throws Exception {
+ getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:b").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .to("mock:a")
+ .claimCheck(ClaimCheckOperation.push)
+ .transform().constant("Bye World")
+ .to("mock:b")
+ .claimCheck(ClaimCheckOperation.pop)
+ .to("mock:c");
+ }
+ };
+ }
+}
--
To stop receiving notification emails like this one, please contact
davsclaus@apache.org.