Posted to commits@camel.apache.org by ac...@apache.org on 2019/02/14 12:43:22 UTC

[camel] branch master updated: CAMEL-13119 - Create Cache Policy for routes

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e343fa  CAMEL-13119 - Create Cache Policy for routes
8e343fa is described below

commit 8e343fa854449ecde67a15a8996af88b2b4f874e
Author: bszeti <bs...@redhat.com>
AuthorDate: Sun Feb 3 17:43:05 2019 -0500

    CAMEL-13119 - Create Cache Policy for routes
---
 .../camel-jcache/src/main/docs/jcache-policy.adoc  | 134 ++++++++++
 .../component/jcache/policy/JCachePolicy.java      | 169 ++++++++++++
 .../jcache/policy/JCachePolicyProcessor.java       | 130 +++++++++
 .../policy/CacheManagerFromRegistryTest.java       |  90 +++++++
 .../jcache/policy/JCachePolicyProcessorTest.java   | 294 +++++++++++++++++++++
 .../component/jcache/policy/JCachePolicyTest.java  | 231 ++++++++++++++++
 .../jcache/policy/JCachePolicyTestBase.java        |  63 +++++
 7 files changed, 1111 insertions(+)

diff --git a/components/camel-jcache/src/main/docs/jcache-policy.adoc b/components/camel-jcache/src/main/docs/jcache-policy.adoc
new file mode 100644
index 0000000..a7846e8
--- /dev/null
+++ b/components/camel-jcache/src/main/docs/jcache-policy.adoc
@@ -0,0 +1,134 @@
+[[jcache-policy]]
+== JCache Policy
+
+The JCachePolicy is an interceptor around a route that caches the "result of the route" - the message body - after the route is completed.
+  The next time the route is called with a "similar" Exchange (one that yields the same cache key), the cached value is set on the Exchange instead of executing the route.
+  The policy uses the JSR107/JCache API of a cache implementation.
+
+The policy takes a _key_ value from the received Exchange to get or store values in the cache. By default the _key_ is the message body.
+  For example, if a route with a JCachePolicy receives an Exchange with a String body "fruit" and the body at the
+  end of the route is "apple", it stores the _key/value_ pair "fruit=apple" in the cache. If another Exchange later arrives
+  with the body "fruit", the value "apple" is taken from the cache instead of letting the route process the Exchange.
+
+So by default the message body at the beginning of the route is the cache _key_ and the body at the end is the stored _value_. It's possible to use something else as _key_ by setting a Camel Expression via _.setKeyExpression()_
+that will be used to determine the key.
+
+The policy needs a JCache Cache. It can be set directly by _.setCache()_ or the policy will try to get or create the Cache
+  based on the other parameters set.
+
+A similar caching solution is available, for example, in Spring via the @Cacheable annotation.
+
+=== JCachePolicy Fields
+
+
+[width="100%",cols="2,5,3,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *cache* | The Cache to use to store the cached values. If this value is set, _cacheManager_, _cacheName_ and _cacheConfiguration_ are ignored. |  | Cache
+| *cacheManager* | The CacheManager to use to look up or create the Cache. Used only if _cache_ is not set. | Tries to find a CacheManager in the CamelContext registry, otherwise calls the standard JCache _Caching.getCachingProvider().getCacheManager()_. | CacheManager
+| *cacheName* | Name of the cache. Get the Cache from cacheManager or create a new one if it doesn't exist. | RouteId of the route. | String
+| *cacheConfiguration* | JCache cache configuration to use if a new Cache is created. | Default new _MutableConfiguration_ object. | Configuration
+| *keyExpression* | An Expression to evaluate to determine the cache key. | Exchange body | Expression
+| *enabled* | If policy is not enabled, no wrapper processor is added to the route. It has impact only during startup, not during runtime. For example it can be used to disable caching from properties. | true | boolean
+|===
+
+== How to determine the cache to use
+
+
+=== Set cache
+
+The cache used by the policy can be set directly. This means you have to configure the cache yourself and get a JCache Cache object,
+ but this gives the most flexibility. For example, it can be set up in the XML configuration of the cache provider (Hazelcast, EhCache, ...)
+ and used here. Alternatively, the standard Caching API can be used as below:
+
+
+[source,java]
+----------------------------
+MutableConfiguration<String, Object> configuration = new MutableConfiguration<>();
+configuration.setTypes(String.class, Object.class);
+configuration.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MINUTES, 60)));
+CacheManager cacheManager = Caching.getCachingProvider().getCacheManager();
+Cache<String, Object> cache = cacheManager.createCache("orders", configuration);
+
+JCachePolicy jcachePolicy = new JCachePolicy();
+jcachePolicy.setCache(cache);
+
+from("direct:get-orders")
+    .policy(jcachePolicy)
+    .log("Getting order with id: ${body}")
+    .bean(OrderService.class,"findOrderById(${body})");
+----------------------------
+
+=== Set cacheManager
+
+If the _cache_ is not set, the policy will try to look up or create the cache automatically.
+ If the _cacheManager_ is set on the policy, it will try to get the cache with the configured _cacheName_ (routeId by default) from the CacheManager.
+ If the cache does not exist, it will create a new one using the _cacheConfiguration_ (a new MutableConfiguration by default).
+
+[source,java]
+----------------------------
+//In a Spring environment, for example, the CacheManager may already exist as a bean
+@Autowired
+CacheManager cacheManager;
+...
+
+//Cache "items" is used, or created if it does not exist
+JCachePolicy jcachePolicy = new JCachePolicy();
+jcachePolicy.setCacheManager(cacheManager);
+jcachePolicy.setCacheName("items");
+----------------------------
+
+=== Find cacheManager
+
+If _cacheManager_ (and the _cache_) is not set, the policy will try to find a JCache CacheManager object:
+
+* Look up a CacheManager in the Camel registry, which falls back on JNDI or the Spring context depending on the environment
+* Use the standard API _Caching.getCachingProvider().getCacheManager()_
+
+[source,java]
+----------------------------
+//A Cache "getorders" will be used (or created) from the found CacheManager
+from("direct:get-orders").routeId("getorders")
+    .policy(new JCachePolicy())
+    .log("Getting order with id: ${body}")
+    .bean(OrderService.class,"findOrderById(${body})");
+----------------------------
+
+== KeyExpression
+
+By default the policy uses the received Exchange body as _key_, so the default expression is like _simple("${body}")_.
+ We can set a different Camel Expression as _keyExpression_ which will be evaluated to determine the key.
+ For example, if we try to find an _order_ by an _orderId_ which is in the message headers,
+ set _header("orderId")_ (or _simple("${header.orderId}")_) as _keyExpression_.
+
+The expression is evaluated only once at the beginning of the route to determine the _key_. If nothing was found in cache,
+ this _key_ is used to store the _value_ in cache at the end of the route.
+
+[source,java]
+----------------------------
+MutableConfiguration<String, Order> configuration = new MutableConfiguration<>();
+configuration.setTypes(String.class, Order.class);
+configuration.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MINUTES, 10)));
+
+JCachePolicy jcachePolicy = new JCachePolicy();
+jcachePolicy.setCacheConfiguration(configuration);
+jcachePolicy.setCacheName("orders");
+jcachePolicy.setKeyExpression(simple("${header.orderId}"));
+
+//The cache key is taken from "orderId" header.
+from("direct:get-orders")
+    .policy(jcachePolicy)
+    .log("Getting order with id: ${header.orderId}")
+    .bean(OrderService.class,"findOrderById(${header.orderId})");
+----------------------------
+
+== Special scenarios and error handling
+
+If the Cache used by the policy is closed (which can be done dynamically), the whole caching functionality is skipped
+ and the route is executed every time.
+
+If the determined _key_ is _null_, nothing is looked up or stored in cache.
+
+In case of an exception during the route, the error handler is called as usual. If the exception gets _handled()_,
+ the policy stores the Exchange body, otherwise nothing is added to the cache.
+ If an exception happens while evaluating the keyExpression, the routing fails and the error handler is called as usual.
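The caching behavior documented in jcache-policy.adoc above can be sketched without Camel or a JCache provider. The following is a minimal standalone illustration, not the committed implementation: a plain HashMap stands in for the JCache Cache, and RouteCacheSketch, process, routeCalls and isCached are hypothetical names invented for this example. It shows the key being evaluated once, a null key bypassing the cache, a hit skipping the route, and a non-null result being stored after a miss.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the policy; none of these names come from Camel or JCache.
final class RouteCacheSketch<I, K, V> {
    private final Map<K, V> cache = new HashMap<>();   // stands in for javax.cache.Cache
    private final Function<I, K> keyExpression;        // plays the role of keyExpression
    private final Function<I, V> route;                // plays the role of the wrapped route
    private int routeCalls;

    RouteCacheSketch(Function<I, K> keyExpression, Function<I, V> route) {
        this.keyExpression = keyExpression;
        this.route = route;
    }

    V process(I input) {
        // The key is evaluated once, before the route runs.
        K key = keyExpression.apply(input);
        if (key == null) {
            // Null key: run the route, look nothing up, store nothing.
            return runRoute(input);
        }
        V cached = cache.get(key);
        if (cached != null) {
            // Cache hit: the route is skipped entirely.
            return cached;
        }
        // Cache miss: run the route and store a non-null result under the same key.
        V result = runRoute(input);
        if (result != null) {
            cache.put(key, result);
        }
        return result;
    }

    private V runRoute(I input) {
        routeCalls++;
        return route.apply(input);
    }

    int routeCalls() {
        return routeCalls;
    }

    boolean isCached(K key) {
        return cache.containsKey(key);
    }

    public static void main(String[] args) {
        RouteCacheSketch<String, String, String> sketch =
                new RouteCacheSketch<>(body -> body, body -> "value-" + body);
        System.out.println(sketch.process("fruit")); // prints "value-fruit" (route executed)
        System.out.println(sketch.process("fruit")); // prints "value-fruit" (served from the map)
        System.out.println(sketch.routeCalls());     // prints "1"
    }
}
```

The sketch leaves out the closed-cache check and the exception cases; in the patch those are handled by JCachePolicyProcessor.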
diff --git a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/policy/JCachePolicy.java b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/policy/JCachePolicy.java
new file mode 100644
index 0000000..cd04a47
--- /dev/null
+++ b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/policy/JCachePolicy.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcache.policy;
+
+import java.util.Set;
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.Configuration;
+import javax.cache.configuration.MutableConfiguration;
+
+import org.apache.camel.Expression;
+import org.apache.camel.NamedNode;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Policy;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Policy for routes. It caches the final body of a route and next time takes it from the cache instead of executing the route.
+ * The cache key is determined by the keyExpression (message body by default).
+ * If there is an object in the cache under that key the rest of the route is not executed, but the cached object is added to the Exchange.
+ *
+ * Fields:
+ * cache: JCache to use
+ * cacheManager: If cache is not set, a cache is looked up or created using this cacheManager. If cacheManager is not set, we try to look one up from CamelContext.
+ * cacheName: Name of the cache to use or create. RouteId is used by default.
+ * cacheConfiguration: CacheConfiguration used if a new cache is created. Using default MutableConfiguration if not set.
+ * keyExpression: The Expression to generate the key for the cache. E.g simple("${header.username}")
+ * enabled: If JCachePolicy is not enabled, no policy is added to the route. Has an impact only during startup.
+ */
+public class JCachePolicy implements Policy {
+    private static final Logger LOG = LoggerFactory.getLogger(JCachePolicy.class);
+
+    private Cache cache;
+    private CacheManager cacheManager;
+    private String cacheName;
+    private Configuration cacheConfiguration;
+    private Expression keyExpression;
+    private boolean enabled = true;
+
+    @Override
+    public void beforeWrap(RouteContext routeContext, NamedNode namedNode) {
+
+    }
+
+    @Override
+    public Processor wrap(RouteContext routeContext, Processor processor) {
+        //Don't add JCachePolicyProcessor if JCachePolicy is disabled. This means enable/disable has impact only during startup
+        if (!isEnabled()) {
+            return processor;
+        }
+
+        Cache cache = this.cache;
+        if (cache == null) {
+            //Create cache based on given configuration
+
+            //Find CacheManager
+            CacheManager cacheManager = this.cacheManager;
+
+            //Lookup CacheManager from CamelContext if it's not set
+            if (cacheManager == null) {
+                Set<CacheManager> lookupResult = routeContext.getCamelContext().getRegistry().findByType(CacheManager.class);
+                if (ObjectHelper.isNotEmpty(lookupResult)) {
+
+                    //Use the first cache manager found
+                    cacheManager = lookupResult.iterator().next();
+                    LOG.debug("CacheManager from CamelContext registry: {}", cacheManager);
+                }
+            }
+
+            //Lookup CacheManager the standard way
+            if (cacheManager == null) {
+                cacheManager = Caching.getCachingProvider().getCacheManager();
+                LOG.debug("CacheManager from CachingProvider: {}", cacheManager);
+            }
+
+            //Use routeId as cacheName if it's not set
+            String cacheName = ObjectHelper.isNotEmpty(this.cacheName) ? this.cacheName : routeContext.getRoute().getId();
+            LOG.debug("Getting cache:{}", cacheName);
+
+            //Get cache or create a new one using the cacheConfiguration
+            cache = cacheManager.getCache(cacheName);
+            if (cache == null) {
+                LOG.debug("Create cache:{}", cacheName);
+                cache = cacheManager.createCache(cacheName,
+                        cacheConfiguration != null ? this.cacheConfiguration : (Configuration) new MutableConfiguration());
+            }
+
+        }
+
+        //Create processor
+        return new JCachePolicyProcessor(cache, keyExpression, processor);
+
+
+    }
+
+    public Cache getCache() {
+        return cache;
+    }
+
+    public void setCache(Cache cache) {
+        this.cache = cache;
+    }
+
+    public CacheManager getCacheManager() {
+        return cacheManager;
+    }
+
+    public void setCacheManager(CacheManager cacheManager) {
+        this.cacheManager = cacheManager;
+    }
+
+    public Configuration getCacheConfiguration() {
+        return cacheConfiguration;
+    }
+
+    public void setCacheConfiguration(Configuration cacheConfiguration) {
+        this.cacheConfiguration = cacheConfiguration;
+    }
+
+    public String getCacheName() {
+        return cacheName;
+    }
+
+    public void setCacheName(String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    public Expression getKeyExpression() {
+        return keyExpression;
+    }
+
+    public void setKeyExpression(Expression keyExpression) {
+        this.keyExpression = keyExpression;
+    }
+
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    public void setEnabled(boolean enabled) {
+        this.enabled = enabled;
+    }
+
+    @Override
+    public String toString() {
+        return "JCachePolicy{"
+                + "keyExpression=" + keyExpression
+                + ", enabled=" + enabled
+                + '}';
+    }
+}
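The lookup order in wrap() above (the configured cacheManager, then a CacheManager found in the registry, then the default provider, with the routeId as the fallback cache name) can be illustrated in isolation. This is a hedged sketch using only the JDK: CacheResolutionSketch, firstNonNull and nameOrRouteId are hypothetical helpers, and Strings stand in for CacheManager instances. Treating a blank name as unset is an assumption about how ObjectHelper.isNotEmpty behaves.

```java
import java.util.function.Supplier;

// Hypothetical helper; it only mirrors the fallback order used in JCachePolicy.wrap().
final class CacheResolutionSketch {

    // Returns the first non-null candidate. Suppliers keep the lookups lazy,
    // so later fallbacks are only evaluated when earlier ones yield nothing.
    @SafeVarargs
    static <T> T firstNonNull(Supplier<? extends T>... candidates) {
        for (Supplier<? extends T> candidate : candidates) {
            T value = candidate.get();
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    // Uses the explicit cache name if one is set, otherwise the route id.
    static String nameOrRouteId(String cacheName, String routeId) {
        return (cacheName != null && !cacheName.trim().isEmpty()) ? cacheName : routeId;
    }

    public static void main(String[] args) {
        String configured = null;    // nothing set on the policy
        String fromRegistry = null;  // nothing found in the registry
        String manager = CacheResolutionSketch.<String>firstNonNull(
                () -> configured,
                () -> fromRegistry,
                () -> "default-provider-manager");
        System.out.println(manager);                              // prints "default-provider-manager"
        System.out.println(nameOrRouteId(null, "getorders"));     // prints "getorders"
        System.out.println(nameOrRouteId("orders", "getorders")); // prints "orders"
    }
}
```

Keeping the candidates lazy matters in the real policy because the last fallback, Caching.getCachingProvider().getCacheManager(), is only called when the earlier lookups yield nothing.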
diff --git a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/policy/JCachePolicyProcessor.java b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/policy/JCachePolicyProcessor.java
new file mode 100644
index 0000000..78ea40a
--- /dev/null
+++ b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/policy/JCachePolicyProcessor.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcache.policy;
+
+import javax.cache.Cache;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Processor;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JCachePolicyProcessor extends DelegateAsyncProcessor {
+    private static final Logger LOG = LoggerFactory.getLogger(JCachePolicyProcessor.class);
+
+
+    private Cache cache;
+    private Expression keyExpression;
+
+    public JCachePolicyProcessor(Cache cache, Expression keyExpression, Processor processor) {
+        super(processor);
+        this.cache = cache;
+        this.keyExpression = keyExpression;
+    }
+
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        LOG.debug("JCachePolicy process started - cache:{}, exchange:{}", cache.getName(), exchange.getExchangeId());
+
+        //If cache is closed, just continue
+        if (cache.isClosed()) {
+            return super.process(exchange, callback);
+        }
+
+
+        try {
+            //Get key by the expression or use message body
+            Object key = keyExpression != null ? keyExpression.evaluate(exchange, Object.class) : exchange.getMessage().getBody();
+
+            if (key == null) {
+                return super.process(exchange, callback);
+            }
+
+            //Check if cache contains the key
+            Object value = cache.get(key);
+            if (value != null) {
+                // use the cached object in the Exchange without calling the rest of the route
+                LOG.debug("Cached object is found, skipping the route - key:{}, exchange:{}", key, exchange.getExchangeId());
+
+                exchange.getMessage().setBody(value);
+
+                callback.done(true);
+                return true;
+            }
+
+
+            //Not found in cache. Continue route.
+            LOG.debug("No cached object is found, continue route - key:{}, exchange:{}", key, exchange.getExchangeId());
+
+            return super.process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    try {
+                        if (!exchange.isFailed()) {
+                            //Save body in cache after successfully executing the route
+                            Object value = exchange.getMessage().getBody();
+
+                            if (value != null) {
+                                LOG.debug("Saving in cache - key:{}, value:{}, exchange:{}", key, value, exchange.getExchangeId());
+                                cache.put(key, value);
+                            }
+                        }
+                    } catch (Exception ex) {
+                        //Log exception, but a problem with caching should not fail the exchange
+                        LOG.error("Error storing in cache. - key:{}, exchange:{}", key, exchange.getExchangeId(), ex);
+                    } finally {
+                        callback.done(doneSync);
+                    }
+                }
+            });
+
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        //Clear cache if stopping.
+        if (!cache.isClosed()) {
+            cache.clear();
+        }
+        super.doStop();
+    }
+
+    public Cache getCache() {
+        return cache;
+    }
+
+    public void setCache(Cache cache) {
+        this.cache = cache;
+    }
+
+    public Expression getKeyExpression() {
+        return keyExpression;
+    }
+
+    public void setKeyExpression(Expression keyExpression) {
+        this.keyExpression = keyExpression;
+    }
+}
diff --git a/components/camel-jcache/src/test/java/org/apache/camel/component/jcache/policy/CacheManagerFromRegistryTest.java b/components/camel-jcache/src/test/java/org/apache/camel/component/jcache/policy/CacheManagerFromRegistryTest.java
new file mode 100644
index 0000000..0183941
--- /dev/null
+++ b/components/camel-jcache/src/test/java/org/apache/camel/component/jcache/policy/CacheManagerFromRegistryTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcache.policy;
+
+import java.net.URI;
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+
+import com.hazelcast.instance.HazelcastInstanceFactory;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.After;
+import org.junit.Test;
+
+//This test requires a registered CacheManager, but the others do not.
+public class CacheManagerFromRegistryTest extends JCachePolicyTestBase {
+
+    //Register cacheManager in CamelContext. Set cacheName
+    @Test
+    public void testCacheManagerFromContext() throws Exception {
+        final String key = randomString();
+
+        //Send exchange
+        Object responseBody = this.template().requestBody("direct:policy-context-manager", key);
+
+        //Verify the cacheManager "hzsecond" registered in the CamelContext was used
+        assertNull(lookupCache("contextCacheManager"));
+        CacheManager cacheManager = Caching.getCachingProvider().getCacheManager(URI.create("hzsecond"), null);
+        Cache cache = cacheManager.getCache("contextCacheManager");
+
+        assertEquals(generateValue(key), cache.get(key));
+        assertEquals(generateValue(key), responseBody);
+        assertEquals(1, getMockEndpoint("mock:value").getExchanges().size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                //Use the cacheManager registered in CamelContext. See createRegistry(). Set cacheName
+                //During the test JndiRegistry is used, so we add the cacheManager to JNDI. In Spring context a bean works.
+                JCachePolicy jcachePolicy = new JCachePolicy();
+                jcachePolicy.setCacheName("contextCacheManager");
+
+                from("direct:policy-context-manager")
+                        .policy(jcachePolicy)
+                        .to("mock:value");
+            }
+        };
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+
+        //Register another CacheManager in registry
+        registry.bind("cachemanager-hzsecond", Caching.getCachingProvider().getCacheManager(URI.create("hzsecond"), null));
+
+        return registry;
+    }
+
+    @After
+    public void after() {
+        super.after();
+        CacheManager cacheManager = Caching.getCachingProvider().getCacheManager(URI.create("hzsecond"), null);
+        cacheManager.getCacheNames().forEach((s) -> cacheManager.destroyCache(s));
+        Caching.getCachingProvider().close(URI.create("hzsecond"), null);
+
+        //We need to shutdown the second instance using the Hazelcast api. close(URI,ClassLoader) doesn't do that.
+        HazelcastInstanceFactory.getHazelcastInstance("hzsecond").shutdown();
+    }
+
+}
diff --git a/components/camel-jcache/src/test/java/org/apache/camel/component/jcache/policy/JCachePolicyProcessorTest.java b/components/camel-jcache/src/test/java/org/apache/camel/component/jcache/policy/JCachePolicyProcessorTest.java
new file mode 100644
index 0000000..fbcf74d
--- /dev/null
+++ b/components/camel-jcache/src/test/java/org/apache/camel/component/jcache/policy/JCachePolicyProcessorTest.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcache.policy;
+
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.MutableConfiguration;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.language.simple.types.SimpleIllegalSyntaxException;
+import org.junit.Test;
+
+public class JCachePolicyProcessorTest extends JCachePolicyTestBase {
+
+    //Basic test to verify value gets cached and route is not executed for the second time
+    @Test
+    public void testValueGetsCached() throws Exception {
+        final String key = randomString();
+        MockEndpoint mock = getMockEndpoint("mock:value");
+        Cache cache = lookupCache("simple");
+
+        //Send first, key is not in cache
+        Object responseBody = this.template().requestBody("direct:cached-simple", key);
+
+        //We got back the value, mock was called once, value got cached.
+        assertEquals(generateValue(key), cache.get(key));
+        assertEquals(generateValue(key), responseBody);
+        assertEquals(1, mock.getExchanges().size());
+
+        //Send again, key is already in cache
+        responseBody = this.template().requestBody("direct:cached-simple", key);
+
+        //We got back the stored value, but the mock was not called again
+        assertEquals(generateValue(key), cache.get(key));
+        assertEquals(generateValue(key), responseBody);
+        assertEquals(1, mock.getExchanges().size());
+
+    }
+
+    //Cache is closed
+    @Test
+    public void testClosedCache() throws Exception {
+        final String key = randomString();
+        MockEndpoint mock = getMockEndpoint("mock:value");
+
+        //Send first, key is not in cache
+        Object responseBody = this.template().requestBody("direct:cached-closed", key);
+
+        //We got back the value, mock was called once
+        assertEquals(generateValue(key), responseBody);
+        assertEquals(1, mock.getExchanges().size());
+
+        //Send again, cache is closed
+        responseBody = this.template().requestBody("direct:cached-closed", key);
+
+        //We got back the value, mock was called again because the closed cache is skipped
+        assertEquals(generateValue(key), responseBody);
+        assertEquals(2, mock.getExchanges().size());
+
+    }
+
+    //Key is already stored
+    @Test
+    public void testValueWasCached() throws Exception {
+        final String key = randomString();
+        final String value = "test";
+        MockEndpoint mock = getMockEndpoint("mock:value");
+
+        //Prestore value in cache
+        Cache cache = lookupCache("simple");
+        cache.put(key, value);
+
+        //Send first, key is already in cache
+        Object responseBody = this.template().requestBody("direct:cached-simple", key);
+
+        //We got back the value, mock was not called, cache was not modified
+        assertEquals(value, cache.get(key));
+        assertEquals(value, responseBody);
+        assertEquals(0, mock.getExchanges().size());
+    }
+
+    //Null final body
+    @Test
+    public void testNullResult() throws Exception {
+        final String key = randomString();
+        MockEndpoint mock = getMockEndpoint("mock:value");
+        mock.whenAnyExchangeReceived((e) -> e.getMessage().setBody(null));
+
+        //Send first
+        this.template().requestBody("direct:cached-simple", key);
+
+        assertEquals(1, mock.getExchanges().size());
+
+        //Send again, nothing was cached
+        this.template().requestBody("direct:cached-simple", key);
+        assertEquals(2, mock.getExchanges().size());
+
+    }
+
+    //Use a key expression ${header.mykey}
+    @Test
+    public void testKeyExpression() throws Exception {
+        final String key = randomString();
+        final String body = randomString();
+        MockEndpoint mock = getMockEndpoint("mock:value");
+        Cache cache = lookupCache("simple");
+
+        //Send first, key is not in cache
+        Object responseBody = this.template().requestBodyAndHeader("direct:cached-byheader", body, "mykey", key);
+
+        //We got back the value, mock was called once, value got cached.
+        assertEquals(generateValue(body), cache.get(key));
+        assertEquals(generateValue(body), responseBody);
+        assertEquals(1, mock.getExchanges().size());
+
+        //Send again, use another body, but the same key
+        responseBody = this.template().requestBodyAndHeader("direct:cached-byheader", randomString(), "mykey", key);
+
+        //We got back the stored value, and the mock was not called again
+        assertEquals(generateValue(body), cache.get(key));
+        assertEquals(generateValue(body), responseBody);
+        assertEquals(1, mock.getExchanges().size());
+
+    }
+
+    //Key is null, ${header.mykey} is not set
+    @Test
+    public void testKeyNull() throws Exception {
+        final String key = randomString();
+        String body = randomString();
+        MockEndpoint mock = getMockEndpoint("mock:value");
+        Cache cache = lookupCache("simple");
+
+        //Send first, expected header is not set
+        Object responseBody = this.template().requestBody("direct:cached-byheader", body);
+
+        //We got back the value, mock was called once, nothing is cached.
+        assertFalse(cache.containsKey("null"));
+        assertFalse(cache.containsKey(""));
+        assertFalse(cache.containsKey(key));
+        assertEquals(generateValue(body), responseBody);
+        assertEquals(1, mock.getExchanges().size());
+
+        //Send again with another body, the expected header is still not set
+        body = randomString();
+        responseBody = this.template().requestBody("direct:cached-byheader", body);
+
+        //We got back the value, mock was called again, nothing is cached
+        assertFalse(cache.containsKey("null"));
+        assertFalse(cache.containsKey(""));
+        assertFalse(cache.containsKey(key));
+        assertEquals(generateValue(body), responseBody);
+        assertEquals(2, mock.getExchanges().size());
+
+    }
+
+    //Use an invalid key expression causing an exception
+    @Test
+    public void testInvalidKeyExpression() throws Exception {
+        final String body = randomString();
+        MockEndpoint mock = getMockEndpoint("mock:value");
+        Cache cache = lookupCache("simple");
+
+        //Send
+        Exchange response = this.template().request("direct:cached-invalidkey",
+            (e) -> e.getMessage().setBody(body));
+
+        //Exception is on the exchange, cache is empty, onException was called.
+        assertIsInstanceOf(SimpleIllegalSyntaxException.class, response.getException().getCause());
+        assertEquals("exception-" + body, response.getMessage().getBody());
+        assertEquals(0, mock.getExchanges().size());
+        assertFalse(cache.iterator().hasNext());
+
+    }
+
+    //Value is cached after handled exception
+    @Test
+    public void testHandledException() throws Exception {
+        final String key = randomString();
+        MockEndpoint mock = getMockEndpoint("mock:value");
+        Cache cache = lookupCache("simple");
+
+        //Send first, key is not in cache
+        Object responseBody = this.template().requestBody("direct:cached-exception", key);
+
+        //We got back the value after exception handler, mock was called once, value got cached.
+        assertEquals("handled-" + generateValue(key), cache.get(key));
+        assertEquals("handled-" + generateValue(key), responseBody);
+        assertEquals(1, mock.getExchanges().size());
+
+    }
+
+    //Nothing is cached after an unhandled exception
+    @Test
+    public void testException() throws Exception {
+        final String key = randomString();
+        MockEndpoint mock = getMockEndpoint("mock:value");
+        mock.whenAnyExchangeReceived((e) -> {
+            throw new RuntimeException("unexpected");
+        });
+        Cache cache = lookupCache("simple");
+
+        //Send
+        Exchange response = this.template().request("direct:cached-exception",
+            (e) -> e.getMessage().setBody(key));
+
+        //Exception is on the exchange, cache is empty
+        assertEquals("unexpected", response.getException().getMessage());
+        assertEquals(1, mock.getExchanges().size());
+        assertFalse(cache.iterator().hasNext());
+
+    }
+
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                CacheManager cacheManager = Caching.getCachingProvider().getCacheManager();
+
+                //Simple cache - with default config
+                Cache cache = cacheManager.createCache("simple", new MutableConfiguration<>());
+                JCachePolicy jcachePolicy = new JCachePolicy();
+                jcachePolicy.setCache(cache);
+
+                from("direct:cached-simple")
+                    .policy(jcachePolicy)
+                    .to("mock:value");
+
+                //Cache after exception handling
+                from("direct:cached-exception")
+                    .onException(Exception.class)
+                    .onWhen(exceptionMessage().isEqualTo("test"))
+                    .handled(true)
+                    .setBody(simple("handled-${body}"))
+                    .end()
+
+                    .policy(jcachePolicy)
+                    .to("mock:value")
+                    .throwException(new Exception("test"));
+
+                //Closed cache
+                cache = cacheManager.createCache("closed", new MutableConfiguration<>());
+                cache.close();
+                jcachePolicy = new JCachePolicy();
+                jcachePolicy.setCache(cache);
+
+                from("direct:cached-closed")
+                    .policy(jcachePolicy)
+                    .to("mock:value");
+
+
+                //Use ${header.mykey} as the key
+                jcachePolicy = new JCachePolicy();
+                jcachePolicy.setCache(cacheManager.getCache("simple"));
+                jcachePolicy.setKeyExpression(simple("${header.mykey}"));
+
+                from("direct:cached-byheader")
+                    .policy(jcachePolicy)
+                    .to("mock:value");
+
+                //Use an invalid keyExpression
+                jcachePolicy = new JCachePolicy();
+                jcachePolicy.setCache(cacheManager.getCache("simple"));
+                jcachePolicy.setKeyExpression(simple("${unexpected}"));
+
+                from("direct:cached-invalidkey")
+                    .onException(Exception.class)
+                    .setBody(simple("exception-${body}"))
+                    .end()
+
+                    .policy(jcachePolicy)
+                    .to("mock:value");
+            }
+        };
+    }
+}
diff --git a/components/camel-jcache/src/test/java/org/apache/camel/component/jcache/policy/JCachePolicyTest.java b/components/camel-jcache/src/test/java/org/apache/camel/component/jcache/policy/JCachePolicyTest.java
new file mode 100644
index 0000000..edc8d2e
--- /dev/null
+++ b/components/camel-jcache/src/test/java/org/apache/camel/component/jcache/policy/JCachePolicyTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcache.policy;
+
+import java.util.concurrent.TimeUnit;
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.CompleteConfiguration;
+import javax.cache.configuration.MutableConfiguration;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class JCachePolicyTest extends JCachePolicyTestBase {
+
+    //Set cache - this use case is also covered by tests in JCachePolicyProcessorTest
+    @Test
+    public void testSetCache() throws Exception {
+        final String key = randomString();
+
+        //Send exchange
+        Object responseBody = this.template().requestBody("direct:policy-cache", key);
+
+        //Verify the set cache was used
+        assertEquals(generateValue(key), lookupCache("setCache").get(key));
+        assertEquals(generateValue(key), responseBody);
+        assertEquals(1, getMockEndpoint("mock:value").getExchanges().size());
+    }
+
+    //Set cacheManager, cacheName, cacheConfiguration
+    @Test
+    public void testSetManagerNameConfiguration() throws Exception {
+        final String key = randomString();
+
+        //Send exchange
+        Object responseBody = this.template().requestBody("direct:policy-manager-name-configuration", key);
+
+        //Verify cache was created with the set configuration and used in route
+        Cache cache = lookupCache("setManagerNameConfiguration");
+        CompleteConfiguration completeConfiguration = (CompleteConfiguration) cache.getConfiguration(CompleteConfiguration.class);
+        assertEquals(String.class, completeConfiguration.getKeyType());
+        assertEquals(String.class, completeConfiguration.getValueType());
+        assertEquals(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MINUTES, 60)), completeConfiguration.getExpiryPolicyFactory());
+        assertEquals(generateValue(key), cache.get(key));
+        assertEquals(generateValue(key), responseBody);
+        assertEquals(1, getMockEndpoint("mock:value").getExchanges().size());
+    }
+
+    //Set cacheManager, cacheName
+    @Test
+    public void testSetManagerName() throws Exception {
+        final String key = randomString();
+
+        //Send exchange
+        Object responseBody = this.template().requestBody("direct:policy-manager-name", key);
+
+        //Verify the cache was created with the name and used
+        assertEquals(generateValue(key), lookupCache("setManagerName").get(key));
+        assertEquals(generateValue(key), responseBody);
+        assertEquals(1, getMockEndpoint("mock:value").getExchanges().size());
+    }
+
+    //Set cacheManager, cacheName - cache already exists
+    @Test
+    public void testSetManagerNameExists() throws Exception {
+        final String key = randomString();
+
+        //Send exchange
+        Object responseBody = this.template().requestBody("direct:policy-manager-name-exists", key);
+
+        //Verify the existing cache with name was used
+        assertEquals(generateValue(key), lookupCache("setManagerNameExists").get(key));
+        assertEquals("dummy", lookupCache("setManagerNameExists").get("test"));
+        assertEquals(generateValue(key), responseBody);
+        assertEquals(1, getMockEndpoint("mock:value").getExchanges().size());
+    }
+
+    //Set cacheManager, cacheConfiguration
+    @Test
+    public void testSetManagerConfiguration() throws Exception {
+        final String key = randomString();
+
+        //Send exchange
+        Object responseBody = this.template().requestBody("direct:policy-manager-configuration", key);
+
+        //Verify the cache was created with the routeId as its name and with the set configuration
+        Cache cache = lookupCache("direct-policy-manager-configuration");
+        CompleteConfiguration completeConfiguration = (CompleteConfiguration) cache.getConfiguration(CompleteConfiguration.class);
+        assertEquals(String.class, completeConfiguration.getKeyType());
+        assertEquals(String.class, completeConfiguration.getValueType());
+        assertEquals(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MINUTES, 61)), completeConfiguration.getExpiryPolicyFactory());
+
+        assertEquals(generateValue(key), cache.get(key));
+        assertEquals(generateValue(key), responseBody);
+        assertEquals(1, getMockEndpoint("mock:value").getExchanges().size());
+    }
+
+    //Set cacheName - use CachingProvider to look up the CacheManager
+    @Test
+    public void testDefaultCacheManager() throws Exception {
+        final String key = randomString();
+
+        //Send exchange
+        Object responseBody = this.template().requestBody("direct:policy-default-manager", key);
+
+        //Verify the default cacheManager was used, even though it was not set
+        Cache cache = lookupCache("contextCacheManager");
+
+        assertEquals(generateValue(key), cache.get(key));
+        assertEquals(generateValue(key), responseBody);
+        assertEquals(1, getMockEndpoint("mock:value").getExchanges().size());
+    }
+
+    //Not enabled
+    @Test
+    public void testNotEnabled() throws Exception {
+        final String key = randomString();
+
+        //Send exchange
+        Object responseBody = this.template().requestBody("direct:policy-not-enabled", key);
+
+        //Verify nothing was cached because the policy is not enabled, the route was still executed
+        Cache cache = lookupCache("notEnabled");
+
+        assertNull(cache.get(key));
+        assertEquals(generateValue(key), responseBody);
+        assertEquals(1, getMockEndpoint("mock:value").getExchanges().size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                CacheManager cacheManager = Caching.getCachingProvider().getCacheManager();
+                MutableConfiguration configuration;
+
+                //Set cache
+                Cache cache = cacheManager.createCache("setCache", new MutableConfiguration<>());
+                JCachePolicy jcachePolicy = new JCachePolicy();
+                jcachePolicy.setCache(cache);
+
+                from("direct:policy-cache")
+                        .policy(jcachePolicy)
+                        .to("mock:value");
+
+                //Set cacheManager, cacheName, cacheConfiguration
+                configuration = new MutableConfiguration<>();
+                configuration.setTypes(String.class, String.class);
+                configuration.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MINUTES, 60)));
+                jcachePolicy = new JCachePolicy();
+                jcachePolicy.setCacheManager(cacheManager);
+                jcachePolicy.setCacheName("setManagerNameConfiguration");
+                jcachePolicy.setCacheConfiguration(configuration);
+
+                from("direct:policy-manager-name-configuration")
+                        .policy(jcachePolicy)
+                        .to("mock:value");
+
+                //Set cacheManager, cacheName
+                jcachePolicy = new JCachePolicy();
+                jcachePolicy.setCacheManager(cacheManager);
+                jcachePolicy.setCacheName("setManagerName");
+
+                from("direct:policy-manager-name")
+                        .policy(jcachePolicy)
+                        .to("mock:value");
+
+                //Set cacheManager, cacheName - cache already exists
+                cache = cacheManager.createCache("setManagerNameExists", new MutableConfiguration<>());
+                cache.put("test", "dummy");
+                jcachePolicy = new JCachePolicy();
+                jcachePolicy.setCacheManager(cacheManager);
+                jcachePolicy.setCacheName("setManagerNameExists");
+
+                from("direct:policy-manager-name-exists")
+                        .policy(jcachePolicy)
+                        .to("mock:value");
+
+                //Set cacheManager, cacheConfiguration
+                configuration = new MutableConfiguration<>();
+                configuration.setTypes(String.class, String.class);
+                configuration.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MINUTES, 61)));
+                jcachePolicy = new JCachePolicy();
+                jcachePolicy.setCacheManager(cacheManager);
+                jcachePolicy.setCacheConfiguration(configuration);
+
+                from("direct:policy-manager-configuration").routeId("direct-policy-manager-configuration")
+                        .policy(jcachePolicy)
+                        .to("mock:value");
+
+                //Set cacheName - use CachingProvider to look up the CacheManager
+                jcachePolicy = new JCachePolicy();
+                jcachePolicy.setCacheName("contextCacheManager");
+
+                from("direct:policy-default-manager")
+                        .policy(jcachePolicy)
+                        .to("mock:value");
+
+                //Not enabled
+                jcachePolicy = new JCachePolicy();
+                cache = cacheManager.createCache("notEnabled", new MutableConfiguration<>());
+                jcachePolicy.setCache(cache);
+                jcachePolicy.setEnabled(false);
+
+                from("direct:policy-not-enabled")
+                        .policy(jcachePolicy)
+                        .to("mock:value");
+            }
+        };
+    }
+
+
+}
diff --git a/components/camel-jcache/src/test/java/org/apache/camel/component/jcache/policy/JCachePolicyTestBase.java b/components/camel-jcache/src/test/java/org/apache/camel/component/jcache/policy/JCachePolicyTestBase.java
new file mode 100644
index 0000000..194239c
--- /dev/null
+++ b/components/camel-jcache/src/test/java/org/apache/camel/component/jcache/policy/JCachePolicyTestBase.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jcache.policy;
+
+import java.util.UUID;
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+
+
+
+
+public class JCachePolicyTestBase extends CamelTestSupport {
+
+    @Before
+    public void before() {
+        //reset mock
+        MockEndpoint mock = getMockEndpoint("mock:value");
+        mock.reset();
+        mock.whenAnyExchangeReceived((e) ->
+                e.getMessage().setBody(generateValue(e.getMessage().getBody(String.class))));
+    }
+
+    protected String randomString() {
+        return UUID.randomUUID().toString();
+    }
+
+    protected Cache lookupCache(String cacheName) {
+        //This will also open a closed cache
+        return Caching.getCachingProvider().getCacheManager().getCache(cacheName);
+    }
+
+    public static String generateValue(String key) {
+        return "value-" + key;
+    }
+
+    @After
+    public void after() {
+        //The RouteBuilder code is called for every test, so we destroy all caches after each test
+        CacheManager cacheManager = Caching.getCachingProvider().getCacheManager();
+        cacheManager.getCacheNames().forEach((s) -> cacheManager.destroyCache(s));
+        Caching.getCachingProvider().close();
+    }
+}