You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/02/06 09:14:19 UTC
[camel] 03/03: CAMEL-8958: Claim Check EIP with push/pop. Work in
progress.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch 8958
in repository https://gitbox.apache.org/repos/asf/camel.git
commit a1b419bacca8fccb3f4408e32270d8830d678010
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 6 09:55:41 2018 +0100
CAMEL-8958: Claim Check EIP with push/pop. Work in progress.
---
camel-core/src/main/docs/eips/claimCheck-eip.adoc | 5 +-
.../apache/camel/model/ClaimCheckDefinition.java | 33 ++++++-
.../apache/camel/model/ProcessorDefinition.java | 25 +++--
.../processor/ClaimCheckAggregationStrategy.java | 106 +++++++++++++++++----
.../camel/processor/ClaimCheckProcessor.java | 12 ++-
.../ClaimCheckEipPushPopExcludeBodyTest.java | 60 ++++++++++++
6 files changed, 213 insertions(+), 28 deletions(-)
diff --git a/camel-core/src/main/docs/eips/claimCheck-eip.adoc b/camel-core/src/main/docs/eips/claimCheck-eip.adoc
index e4dc44a..a0ff103 100644
--- a/camel-core/src/main/docs/eips/claimCheck-eip.adoc
+++ b/camel-core/src/main/docs/eips/claimCheck-eip.adoc
@@ -12,7 +12,7 @@ NOTE: The Camel implementation of this EIP pattern stores the message content te
// eip options: START
-The Claim Check EIP supports 5 options which are listed below:
+The Claim Check EIP supports 6 options which are listed below:
[width="100%",cols="2,5,^1,2",options="header"]
@@ -20,7 +20,8 @@ The Claim Check EIP supports 5 options which are listed below:
| Name | Description | Default | Type
| *operation* | *Required* The claim check operation to use. The following operations are supported: Get - Gets (does not remove) the claim check by the given key. GetAndRemove - Gets and removes the claim check by the given key. Set - Sets a new (will override if key already exists) claim check with the given key. Push - Sets a new claim check on the stack (does not use key). Pop - Gets the latest claim check from the stack (does not use key). | | ClaimCheckOperation
| *key* | To use a specific key for claim check id. | | String
-| *include* | What data to include when merging data back from claim check repository. The following syntax is supported: body - to aggregate the message body headers - to aggregate all the message headers header:pattern - to aggregate all the message headers that matches the pattern. The pattern syntax is documented by: link EndpointHelpermatchPattern(String String). You can specify multiple rules separated by comma. For example to include the message body and all headers starting with [...]
+| *include* | What data to include when merging data back from claim check repository. The following syntax is supported: body - to aggregate the message body headers - to aggregate all the message headers header:pattern - to aggregate all the message headers that matches the pattern. The pattern syntax is documented by: link EndpointHelpermatchPattern(String String). You can specify multiple rules separated by comma. For example to include the message body and all headers starting with [...]
+| *exclude* | What data to exclude when merging data back from claim check repository. The following syntax is supported: body - to exclude the message body headers - to exclude all the message headers header:pattern - to exclude all the message headers that match the pattern. The pattern syntax is documented by: EndpointHelper#matchPattern(String, String). You can specify multiple rules separated by comma. For example to exclude the message body and all headers starting with [...]
| *strategyRef* | To use a custom AggregationStrategy instead of the default implementation. Notice you cannot use both custom aggregation strategy and configure data at the same time. | | String
| *strategyMethodName* | This option can be used to explicit declare the method name to use when using POJOs as the AggregationStrategy. | | String
|===
diff --git a/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
index a78fe60..4dc1609 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
@@ -47,6 +47,8 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
private String key;
@XmlAttribute
private String include;
+ @XmlAttribute
+ private String exclude;
@XmlAttribute(name = "strategyRef") @Metadata(label = "advanced")
private String aggregationStrategyRef;
@XmlAttribute(name = "strategyMethodName") @Metadata(label = "advanced")
@@ -79,6 +81,7 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
claim.setOperation(operation.name());
claim.setKey(getKey());
claim.setInclude(getInclude());
+ claim.setExclude(getExclude());
AggregationStrategy strategy = createAggregationStrategy(routeContext);
if (strategy != null) {
@@ -86,7 +89,7 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
}
// only data or aggregation strategy can be configured not both
- if (getInclude() != null && strategy != null) {
+ if ((getInclude() != null || getExclude() != null) && strategy != null) {
throw new IllegalArgumentException("Cannot use both include/exclude and custom aggregation strategy on ClaimCheck EIP");
}
@@ -153,6 +156,7 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
* You can specify multiple rules separated by comma. For example to include the message body and all headers starting with foo
* <tt>body,header:foo*</tt>.
* If the include rule is specified as empty or as wildcard then everything is included.
+ * If you have configured both include and exclude then exclude takes precedence over include.
*/
public ClaimCheckDefinition include(String include) {
setInclude(include);
@@ -160,6 +164,25 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
}
/**
+ * What data to exclude when merging data back from claim check repository.
+ *
+ * The following syntax is supported:
+ * <ul>
+ * <li>body</li> - to exclude the message body
+ * <li>headers</li> - to exclude all the message headers
+ * <li>header:pattern</li> - to exclude all the message headers that match the pattern.
+ * The pattern syntax is documented by: {@link EndpointHelper#matchPattern(String, String)}.
+ * </ul>
+ * You can specify multiple rules separated by comma. For example to exclude the message body and all headers starting with bar
+ * <tt>body,header:bar*</tt>.
+ * If you have configured both include and exclude then exclude takes precedence over include.
+ */
+ public ClaimCheckDefinition exclude(String exclude) {
+ setExclude(exclude);
+ return this;
+ }
+
+ /**
* To use a custom {@link AggregationStrategy} instead of the default implementation.
* Notice you cannot use both custom aggregation strategy and configure data at the same time.
*/
@@ -212,6 +235,14 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
this.include = include;
}
+ public String getExclude() {
+ return exclude;
+ }
+
+ public void setExclude(String exclude) {
+ this.exclude = exclude;
+ }
+
public String getAggregationStrategyRef() {
return aggregationStrategyRef;
}
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index ac0e635..39a0c6d 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3478,11 +3478,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* @param key the unique key to use for the get and set operations, can be <tt>null</tt> for push/pop operations
*/
public Type claimCheck(ClaimCheckOperation operation, String key) {
- ClaimCheckDefinition answer = new ClaimCheckDefinition();
- answer.setOperation(operation);
- answer.setKey(key);
- addOutput(answer);
- return (Type) this;
+ return claimCheck(operation, key, null, null);
}
/**
@@ -3492,13 +3488,30 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*
* @param operation the claim check operation to use.
* @param key the unique key to use for the get and set operations, can be <tt>null</tt> for push/pop operations
- * @param include describes what data to include and retrieve and merge back when using get or pop operations.
+ * @param include describes what data to include when merging data back when using get or pop operations.
*/
public Type claimCheck(ClaimCheckOperation operation, String key, String include) {
+ return claimCheck(operation, key, include, null);
+ }
+
+ /**
+ * The <a href="http://camel.apache.org/claim-check.html">Claim Check EIP</a>
+ * allows you to replace message content with a claim check (a unique key),
+ * which can be used to retrieve the message content at a later time.
+ *
+ * @param operation the claim check operation to use.
+ * @param key the unique key to use for the get and set operations, can be <tt>null</tt> for push/pop operations
+ * @param include describes what data to include when merging data back when using get or pop operations.
+ * If you have configured both include and exclude then exclude takes precedence over include.
+ * @param exclude describes what data to exclude when merging data back when using get or pop operations.
+ * If you have configured both include and exclude then exclude takes precedence over include.
+ */
+ public Type claimCheck(ClaimCheckOperation operation, String key, String include, String exclude) {
ClaimCheckDefinition answer = new ClaimCheckDefinition();
answer.setOperation(operation);
answer.setKey(key);
answer.setInclude(include);
+ answer.setExclude(exclude);
addOutput(answer);
return (Type) this;
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java
index 02e0d7d..fb58346 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java
@@ -23,6 +23,8 @@ import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.EndpointHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Default {@link AggregationStrategy} used by the {@link ClaimCheckProcessor} EIP.
@@ -37,10 +39,13 @@ import org.apache.camel.util.StringHelper;
* You can specify multiple rules separated by comma. For example to include the message body and all headers starting with foo
* <tt>body,header:foo*</tt>.
* If the include rule is specified as empty or as wildcard then everything is merged.
+ * If you have configured both include and exclude then exclude takes precedence over include.
*/
public class ClaimCheckAggregationStrategy implements AggregationStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(ClaimCheckAggregationStrategy.class);
private String include;
+ private String exclude;
public ClaimCheckAggregationStrategy() {
}
@@ -53,34 +58,82 @@ public class ClaimCheckAggregationStrategy implements AggregationStrategy {
this.include = include;
}
+ public String getExclude() {
+ return exclude;
+ }
+
+ public void setExclude(String exclude) {
+ this.exclude = exclude;
+ }
+
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (newExchange == null) {
return oldExchange;
}
- if (ObjectHelper.isEmpty(include) || "*".equals(include)) {
- // grab everything if data is empty or wildcard
+ if (ObjectHelper.isEmpty(exclude) && (ObjectHelper.isEmpty(include) || "*".equals(include))) {
+ // grab everything if include is empty or wildcard (and exclude is not in use)
return newExchange;
}
- Iterable it = ObjectHelper.createIterable(include, ",");
- for (Object k : it) {
- String part = k.toString();
- if ("body".equals(part)) {
+ // if we have include
+ if (ObjectHelper.isNotEmpty(include)) {
+ Iterable it = ObjectHelper.createIterable(include, ",");
+ for (Object k : it) {
+ String part = k.toString();
+ if ("body".equals(part) && !isExcluded("body")) {
+ oldExchange.getMessage().setBody(newExchange.getMessage().getBody());
+ LOG.trace("Including: body");
+ } else if ("headers".equals(part) && !isExcluded("headers")) {
+ oldExchange.getMessage().getHeaders().putAll(newExchange.getMessage().getHeaders());
+ LOG.trace("Including: headers");
+ } else if (part.startsWith("header:")) {
+ // pattern matching for headers, eg header:foo, header:foo*, header:(foo|bar)
+ String after = StringHelper.after(part, "header:");
+ Iterable i = ObjectHelper.createIterable(after, ",");
+ for (Object o : i) {
+ String pattern = o.toString();
+ for (Map.Entry<String, Object> header : newExchange.getMessage().getHeaders().entrySet()) {
+ String key = header.getKey();
+ boolean matched = EndpointHelper.matchPattern(key, pattern);
+ if (matched && !isExcluded(key)) {
+ LOG.trace("Including: header:{}", key);
+ oldExchange.getMessage().getHeaders().put(key, header.getValue());
+ }
+ }
+ }
+ }
+ }
+ } else if (ObjectHelper.isNotEmpty(exclude)) {
+ // grab body unless it's excluded
+ if (!isExcluded("body")) {
oldExchange.getMessage().setBody(newExchange.getMessage().getBody());
- } else if ("headers".equals(part)) {
- oldExchange.getMessage().getHeaders().putAll(newExchange.getMessage().getHeaders());
- } else if (part.startsWith("header:")) {
- // pattern matching for headers, eg header:foo, header:foo*, header:(foo|bar)
- String after = StringHelper.after(part, "header:");
- Iterable i = ObjectHelper.createIterable(after, ",");
- for (Object o : i) {
- String pattern = o.toString();
- for (Map.Entry<String, Object> header : newExchange.getMessage().getHeaders().entrySet()) {
- String key = header.getKey();
- if (EndpointHelper.matchPattern(key, pattern)) {
- oldExchange.getMessage().getHeaders().put(key, header.getValue());
+ LOG.trace("Including: body");
+ }
+
+ // if not all headers are excluded, then check each header one-by-one
+ if (!isExcluded("headers")) {
+ // check if we exclude specific headers
+ Iterable it = ObjectHelper.createIterable(exclude, ",");
+ for (Object k : it) {
+ String part = k.toString();
+ if (part.startsWith("header:")) {
+ // pattern matching for headers, eg header:foo, header:foo*, header:(foo|bar)
+ String after = StringHelper.after(part, "header:");
+ Iterable i = ObjectHelper.createIterable(after, ",");
+ for (Object o : i) {
+ String pattern = o.toString();
+ for (Map.Entry<String, Object> header : newExchange.getMessage().getHeaders().entrySet()) {
+ String key = header.getKey();
+ boolean excluded = EndpointHelper.matchPattern(key, pattern);
+ if (!excluded) {
+ LOG.trace("Including: header:{}", key);
+ oldExchange.getMessage().getHeaders().put(key, header.getValue());
+ } else {
+ LOG.trace("Excluding: header:{}", key);
+ }
+ }
}
}
}
@@ -89,4 +142,21 @@ public class ClaimCheckAggregationStrategy implements AggregationStrategy {
return oldExchange;
}
+
+ private boolean isExcluded(String key) {
+ if (ObjectHelper.isEmpty(exclude)) {
+ return false;
+ }
+ String[] excludes = exclude.split(",");
+ for (String pattern : excludes) {
+ if (pattern.startsWith("header:")) {
+ pattern = StringHelper.after(pattern, "header:");
+ }
+ if (EndpointHelper.matchPattern(key, pattern)) {
+ LOG.trace("Excluding: {}", key);
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
index 137ecbf..2421878 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
* <p/>
* The current Claim Check EIP implementation in Camel is only intended for temporary memory repository. Likewise
* the repository is not shared among {@link Exchange}s, but a private instance is created per {@link Exchange}.
- * This guards against concurrent and thread-safe issues. For off-memeory persistent storage of data, then use
+ * This guards against concurrent and thread-safe issues. For off-memory persistent storage of data, then use
* any of the many Camel components that support persistent storage, and do not use this Claim Check EIP implementation.
*/
public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
@@ -50,6 +50,7 @@ public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcesso
private AggregationStrategy aggregationStrategy;
private String key;
private String include;
+ private String exclude;
@Override
public CamelContext getCamelContext() {
@@ -103,6 +104,14 @@ public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcesso
this.include = include;
}
+ public String getExclude() {
+ return exclude;
+ }
+
+ public void setExclude(String exclude) {
+ this.exclude = exclude;
+ }
+
public void process(Exchange exchange) throws Exception {
AsyncProcessorHelper.process(this, exchange);
}
@@ -198,6 +207,7 @@ public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcesso
protected AggregationStrategy createAggregationStrategy() {
ClaimCheckAggregationStrategy answer = new ClaimCheckAggregationStrategy();
answer.setInclude(include);
+ answer.setExclude(exclude);
return answer;
}
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopExcludeBodyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopExcludeBodyTest.java
new file mode 100644
index 0000000..6ddbb29
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopExcludeBodyTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ClaimCheckOperation;
+
+public class ClaimCheckEipPushPopExcludeBodyTest extends ContextTestSupport {
+
+ public void testPushPopBodyExclude() throws Exception {
+ getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:a").expectedHeaderReceived("foo", 123);
+ getMockEndpoint("mock:a").expectedHeaderReceived("bar", "Moes");
+ getMockEndpoint("mock:b").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:b").expectedHeaderReceived("foo", 456);
+ getMockEndpoint("mock:b").expectedHeaderReceived("bar", "Jacks");
+ getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:c").expectedHeaderReceived("foo", 123);
+ getMockEndpoint("mock:c").expectedHeaderReceived("bar", "Jacks");
+
+ template.sendBodyAndHeader("direct:start", "Hello World", "foo", 123);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .setHeader("bar", constant("Moes"))
+ .to("mock:a")
+ .claimCheck(ClaimCheckOperation.Push)
+ .transform().constant("Bye World")
+ .setHeader("foo", constant(456))
+ .setHeader("bar", constant("Jacks"))
+ .to("mock:b")
+ // skip the bar header
+ .claimCheck(ClaimCheckOperation.Pop, null, null, "header:bar")
+ .to("mock:c");
+ }
+ };
+ }
+}
--
To stop receiving notification emails like this one, please contact
davsclaus@apache.org.