You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/02/06 09:14:19 UTC

[camel] 03/03: CAMEL-8958: Claim Check EIP with push/pop. Work in progress.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch 8958
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a1b419bacca8fccb3f4408e32270d8830d678010
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Feb 6 09:55:41 2018 +0100

    CAMEL-8958: Claim Check EIP with push/pop. Work in progress.
---
 camel-core/src/main/docs/eips/claimCheck-eip.adoc  |   5 +-
 .../apache/camel/model/ClaimCheckDefinition.java   |  33 ++++++-
 .../apache/camel/model/ProcessorDefinition.java    |  25 +++--
 .../processor/ClaimCheckAggregationStrategy.java   | 106 +++++++++++++++++----
 .../camel/processor/ClaimCheckProcessor.java       |  12 ++-
 .../ClaimCheckEipPushPopExcludeBodyTest.java       |  60 ++++++++++++
 6 files changed, 213 insertions(+), 28 deletions(-)

diff --git a/camel-core/src/main/docs/eips/claimCheck-eip.adoc b/camel-core/src/main/docs/eips/claimCheck-eip.adoc
index e4dc44a..a0ff103 100644
--- a/camel-core/src/main/docs/eips/claimCheck-eip.adoc
+++ b/camel-core/src/main/docs/eips/claimCheck-eip.adoc
@@ -12,7 +12,7 @@ NOTE: The Camel implementation of this EIP pattern stores the message content te
 
 
 // eip options: START
-The Claim Check EIP supports 5 options which are listed below:
+The Claim Check EIP supports 6 options which are listed below:
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -20,7 +20,8 @@ The Claim Check EIP supports 5 options which are listed below:
 | Name | Description | Default | Type
 | *operation* | *Required* The claim check operation to use. The following operations is supported: Get - Gets (does not remove) the claim check by the given key. GetAndRemove - Gets and remove the claim check by the given key. Set - Sets a new (will override if key already exists) claim check with the given key. Push - Sets a new claim check on the stack (does not use key). Pop - Gets the latest claim check from the stack (does not use key). |  | ClaimCheckOperation
 | *key* | To use a specific key for claim check id. |  | String
-| *include* | What data to include when merging data back from claim check repository. The following syntax is supported: body - to aggregate the message body headers - to aggregate all the message headers header:pattern - to aggregate all the message headers that matches the pattern. The pattern syntax is documented by: link EndpointHelpermatchPattern(String String). You can specify multiple rules separated by comma. For example to include the message body and all headers starting with  [...]
+| *include* | What data to include when merging data back from claim check repository. The following syntax is supported: body - to aggregate the message body headers - to aggregate all the message headers header:pattern - to aggregate all the message headers that matches the pattern. The pattern syntax is documented by: link EndpointHelpermatchPattern(String String). You can specify multiple rules separated by comma. For example to include the message body and all headers starting with  [...]
+| *exclude* | What data to exclude when merging data back from claim check repository. The following syntax is supported: body - to aggregate the message body headers - to aggregate all the message headers header:pattern - to aggregate all the message headers that matches the pattern. The pattern syntax is documented by: link EndpointHelpermatchPattern(String String). You can specify multiple rules separated by comma. For example to exclude the message body and all headers starting with  [...]
 | *strategyRef* | To use a custom AggregationStrategy instead of the default implementation. Notice you cannot use both custom aggregation strategy and configure data at the same time. |  | String
 | *strategyMethodName* | This option can be used to explicit declare the method name to use when using POJOs as the AggregationStrategy. |  | String
 |===
diff --git a/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
index a78fe60..4dc1609 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ClaimCheckDefinition.java
@@ -47,6 +47,8 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
     private String key;
     @XmlAttribute
     private String include;
+    @XmlAttribute
+    private String exclude;
     @XmlAttribute(name = "strategyRef") @Metadata(label = "advanced")
     private String aggregationStrategyRef;
     @XmlAttribute(name = "strategyMethodName") @Metadata(label = "advanced")
@@ -79,6 +81,7 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
         claim.setOperation(operation.name());
         claim.setKey(getKey());
         claim.setInclude(getInclude());
+        claim.setExclude(getExclude());
 
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
         if (strategy != null) {
@@ -86,7 +89,7 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
         }
 
         // only data or aggregation strategy can be configured not both
-        if (getInclude() != null && strategy != null) {
+        if ((getInclude() != null || getExclude() != null) && strategy != null) {
             throw new IllegalArgumentException("Cannot use both include/exclude and custom aggregation strategy on ClaimCheck EIP");
         }
 
@@ -153,6 +156,7 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
      * You can specify multiple rules separated by comma. For example to include the message body and all headers starting with foo
      * <tt>body,header:foo*</tt>.
      * If the include rule is specified as empty or as wildcard then everything is included.
+     * If you have configured both include and exclude then exclude take precedence over include.
      */
     public ClaimCheckDefinition include(String include) {
         setInclude(include);
@@ -160,6 +164,25 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
     }
 
     /**
+     * What data to exclude when merging data back from claim check repository.
+     *
+     * The following syntax is supported:
+     * <ul>
+     *     <li>body</li> - to aggregate the message body
+     *     <li>headers</li> - to aggregate all the message headers
+     *     <li>header:pattern</li> - to aggregate all the message headers that matches the pattern.
+     *     The pattern syntax is documented by: {@link EndpointHelper#matchPattern(String, String)}.
+     * </ul>
+     * You can specify multiple rules separated by comma. For example to exclude the message body and all headers starting with bar
+     * <tt>body,header:bar*</tt>.
+     * If you have configured both include and exclude then exclude take precedence over include.
+     */
+    public ClaimCheckDefinition exclude(String exclude) {
+        setExclude(exclude);
+        return this;
+    }
+
+    /**
      * To use a custom {@link AggregationStrategy} instead of the default implementation.
      * Notice you cannot use both custom aggregation strategy and configure data at the same time.
      */
@@ -212,6 +235,14 @@ public class ClaimCheckDefinition extends NoOutputDefinition<ClaimCheckDefinitio
         this.include = include;
     }
 
+    public String getExclude() {
+        return exclude;
+    }
+
+    public void setExclude(String exclude) {
+        this.exclude = exclude;
+    }
+
     public String getAggregationStrategyRef() {
         return aggregationStrategyRef;
     }
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index ac0e635..39a0c6d 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3478,11 +3478,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * @param key       the unique key to use for the get and set operations, can be <tt>null</tt> for push/pop operations
      */
     public Type claimCheck(ClaimCheckOperation operation, String key) {
-        ClaimCheckDefinition answer = new ClaimCheckDefinition();
-        answer.setOperation(operation);
-        answer.setKey(key);
-        addOutput(answer);
-        return (Type) this;
+        return claimCheck(operation, key, null, null);
     }
 
     /**
@@ -3492,13 +3488,30 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      *
      * @param operation the claim check operation to use.
      * @param key       the unique key to use for the get and set operations, can be <tt>null</tt> for push/pop operations
-     * @param include   describes what data to include and retrieve and merge back when using get or pop operations.
+     * @param include   describes what data to include when merging data back when using get or pop operations.
      */
     public Type claimCheck(ClaimCheckOperation operation, String key, String include) {
+        return claimCheck(operation, key, include, null);
+    }
+
+    /**
+     * The <a href="http://camel.apache.org/claim-check.html">Claim Check EIP</a>
+     * allows you to replace message content with a claim check (a unique key),
+     * which can be used to retrieve the message content at a later time.
+     *
+     * @param operation the claim check operation to use.
+     * @param key       the unique key to use for the get and set operations, can be <tt>null</tt> for push/pop operations
+     * @param include   describes what data to include when merging data back when using get or pop operations.
+     *                  If you have configured both include and exclude then exclude take precedence over include.
+     * @param exclude   describes what data to exclude when merging data back when using get or pop operations.
+     *                  If you have configured both include and exclude then exclude take precedence over include.
+     */
+    public Type claimCheck(ClaimCheckOperation operation, String key, String include, String exclude) {
         ClaimCheckDefinition answer = new ClaimCheckDefinition();
         answer.setOperation(operation);
         answer.setKey(key);
         answer.setInclude(include);
+        answer.setExclude(exclude);
         addOutput(answer);
         return (Type) this;
     }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java
index 02e0d7d..fb58346 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckAggregationStrategy.java
@@ -23,6 +23,8 @@ import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StringHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default {@link AggregationStrategy} used by the {@link ClaimCheckProcessor} EIP.
@@ -37,10 +39,13 @@ import org.apache.camel.util.StringHelper;
  * You can specify multiple rules separated by comma. For example to include the message body and all headers starting with foo
  * <tt>body,header:foo*</tt>.
  * If the include rule is specified as empty or as wildcard then everything is merged.
+ * If you have configured both include and exclude then exclude take precedence over include.
  */
 public class ClaimCheckAggregationStrategy implements AggregationStrategy {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ClaimCheckAggregationStrategy.class);
     private String include;
+    private String exclude;
 
     public ClaimCheckAggregationStrategy() {
     }
@@ -53,34 +58,82 @@ public class ClaimCheckAggregationStrategy implements AggregationStrategy {
         this.include = include;
     }
 
+    public String getExclude() {
+        return exclude;
+    }
+
+    public void setExclude(String exclude) {
+        this.exclude = exclude;
+    }
+
     @Override
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         if (newExchange == null) {
             return oldExchange;
         }
 
-        if (ObjectHelper.isEmpty(include) || "*".equals(include)) {
-            // grab everything if data is empty or wildcard
+        if (ObjectHelper.isEmpty(exclude) && (ObjectHelper.isEmpty(include) || "*".equals(include))) {
+            // grab everything if include is empty or wildcard (and exclude is not in use)
             return newExchange;
         }
 
-        Iterable it = ObjectHelper.createIterable(include, ",");
-        for (Object k : it) {
-            String part = k.toString();
-            if ("body".equals(part)) {
+        // if we have include
+        if (ObjectHelper.isNotEmpty(include)) {
+            Iterable it = ObjectHelper.createIterable(include, ",");
+            for (Object k : it) {
+                String part = k.toString();
+                if ("body".equals(part) && !isExcluded("body")) {
+                    oldExchange.getMessage().setBody(newExchange.getMessage().getBody());
+                    LOG.trace("Including: body");
+                } else if ("headers".equals(part) && !isExcluded("headers")) {
+                    oldExchange.getMessage().getHeaders().putAll(newExchange.getMessage().getHeaders());
+                    LOG.trace("Including: headers");
+                } else if (part.startsWith("header:")) {
+                    // pattern matching for headers, eg header:foo, header:foo*, header:(foo|bar)
+                    String after = StringHelper.after(part, "header:");
+                    Iterable i = ObjectHelper.createIterable(after, ",");
+                    for (Object o : i) {
+                        String pattern = o.toString();
+                        for (Map.Entry<String, Object> header : newExchange.getMessage().getHeaders().entrySet()) {
+                            String key = header.getKey();
+                            boolean matched = EndpointHelper.matchPattern(key, pattern);
+                            if (matched && !isExcluded(key)) {
+                                LOG.trace("Including: header:{}", key);
+                                oldExchange.getMessage().getHeaders().put(key, header.getValue());
+                            }
+                        }
+                    }
+                }
+            }
+        } else if (ObjectHelper.isNotEmpty(exclude)) {
+            // grab body unless its excluded
+            if (!isExcluded("body")) {
                 oldExchange.getMessage().setBody(newExchange.getMessage().getBody());
-            } else if ("headers".equals(part)) {
-                oldExchange.getMessage().getHeaders().putAll(newExchange.getMessage().getHeaders());
-            } else if (part.startsWith("header:")) {
-                // pattern matching for headers, eg header:foo, header:foo*, header:(foo|bar)
-                String after = StringHelper.after(part, "header:");
-                Iterable i = ObjectHelper.createIterable(after, ",");
-                for (Object o : i) {
-                    String pattern = o.toString();
-                    for (Map.Entry<String, Object> header : newExchange.getMessage().getHeaders().entrySet()) {
-                        String key = header.getKey();
-                        if (EndpointHelper.matchPattern(key, pattern)) {
-                            oldExchange.getMessage().getHeaders().put(key, header.getValue());
+                LOG.trace("Including: body");
+            }
+
+            // if not all headers is excluded, then check each header one-by-one
+            if (!isExcluded("headers")) {
+                // check if we exclude a specific headers
+                Iterable it = ObjectHelper.createIterable(exclude, ",");
+                for (Object k : it) {
+                    String part = k.toString();
+                    if (part.startsWith("header:")) {
+                        // pattern matching for headers, eg header:foo, header:foo*, header:(foo|bar)
+                        String after = StringHelper.after(part, "header:");
+                        Iterable i = ObjectHelper.createIterable(after, ",");
+                        for (Object o : i) {
+                            String pattern = o.toString();
+                            for (Map.Entry<String, Object> header : newExchange.getMessage().getHeaders().entrySet()) {
+                                String key = header.getKey();
+                                boolean excluded = EndpointHelper.matchPattern(key, pattern);
+                                if (!excluded) {
+                                    LOG.trace("Including: header:{}", key);
+                                    oldExchange.getMessage().getHeaders().put(key, header.getValue());
+                                } else {
+                                    LOG.trace("Excluding: header:{}", key);
+                                }
+                            }
                         }
                     }
                 }
@@ -89,4 +142,21 @@ public class ClaimCheckAggregationStrategy implements AggregationStrategy {
 
         return oldExchange;
     }
+
+    private boolean isExcluded(String key) {
+        if (ObjectHelper.isEmpty(exclude)) {
+            return false;
+        }
+        String[] excludes = exclude.split(",");
+        for (String pattern : excludes) {
+            if (pattern.startsWith("header:")) {
+                pattern = StringHelper.after(pattern, "header:");
+            }
+            if (EndpointHelper.matchPattern(key, pattern)) {
+                LOG.trace("Excluding: {}", key);
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
index 137ecbf..2421878 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * The current Claim Check EIP implementation in Camel is only intended for temporary memory repository. Likewise
  * the repository is not shared among {@link Exchange}s, but a private instance is created per {@link Exchange}.
- * This guards against concurrent and thread-safe issues. For off-memeory persistent storage of data, then use
+ * This guards against concurrent and thread-safe issues. For off-memory persistent storage of data, then use
  * any of the many Camel components that support persistent storage, and do not use this Claim Check EIP implementation.
  */
 public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
@@ -50,6 +50,7 @@ public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcesso
     private AggregationStrategy aggregationStrategy;
     private String key;
     private String include;
+    private String exclude;
 
     @Override
     public CamelContext getCamelContext() {
@@ -103,6 +104,14 @@ public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcesso
         this.include = include;
     }
 
+    public String getExclude() {
+        return exclude;
+    }
+
+    public void setExclude(String exclude) {
+        this.exclude = exclude;
+    }
+
     public void process(Exchange exchange) throws Exception {
         AsyncProcessorHelper.process(this, exchange);
     }
@@ -198,6 +207,7 @@ public class ClaimCheckProcessor extends ServiceSupport implements AsyncProcesso
     protected AggregationStrategy createAggregationStrategy() {
         ClaimCheckAggregationStrategy answer = new ClaimCheckAggregationStrategy();
         answer.setInclude(include);
+        answer.setExclude(exclude);
         return answer;
     }
 }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopExcludeBodyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopExcludeBodyTest.java
new file mode 100644
index 0000000..6ddbb29
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ClaimCheckEipPushPopExcludeBodyTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ClaimCheckOperation;
+
+public class ClaimCheckEipPushPopExcludeBodyTest extends ContextTestSupport {
+
+    public void testPushPopBodyExclude() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:a").expectedHeaderReceived("foo", 123);
+        getMockEndpoint("mock:a").expectedHeaderReceived("bar", "Moes");
+        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:b").expectedHeaderReceived("foo", 456);
+        getMockEndpoint("mock:b").expectedHeaderReceived("bar", "Jacks");
+        getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:c").expectedHeaderReceived("foo", 123);
+        getMockEndpoint("mock:c").expectedHeaderReceived("bar", "Jacks");
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "foo", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .setHeader("bar", constant("Moes"))
+                    .to("mock:a")
+                    .claimCheck(ClaimCheckOperation.Push)
+                    .transform().constant("Bye World")
+                    .setHeader("foo", constant(456))
+                    .setHeader("bar", constant("Jacks"))
+                    .to("mock:b")
+                    // skip the foo header
+                    .claimCheck(ClaimCheckOperation.Pop, null, null, "header:bar")
+                    .to("mock:c");
+            }
+        };
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
davsclaus@apache.org.