You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/23 09:59:03 UTC

[1/2] camel git commit: CAMEL-9740 : Improve camel-infinispan

Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 2363157a8 -> 0de49d45a
  refs/heads/master ab388bf2e -> aaaea389d


CAMEL-9740 : Improve camel-infinispan


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/aaaea389
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/aaaea389
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/aaaea389

Branch: refs/heads/master
Commit: aaaea389d158682fd19b2f09a7ca406bbbcb6e3f
Parents: ab388bf
Author: lburgazzoli <lb...@gmail.com>
Authored: Fri Mar 11 10:48:26 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Mar 23 09:55:07 2016 +0100

----------------------------------------------------------------------
 components/camel-infinispan/pom.xml             |   2 +
 .../infinispan/InfinispanConfiguration.java     |  44 ++-
 .../infinispan/InfinispanConstants.java         |   2 +
 .../infinispan/InfinispanConsumer.java          |  19 +-
 .../component/infinispan/InfinispanManager.java | 116 ++++++
 .../infinispan/InfinispanOperation.java         | 355 ++++++++++---------
 .../infinispan/InfinispanProducer.java          |  34 +-
 .../component/infinispan/InfinispanUtil.java    |  47 ++-
 .../InfinispanAsyncLocalEventListener.java      |   1 -
 .../InfinispanConsumerEmbeddedHandler.java      |   6 +-
 .../InfinispanSyncClusteredEventListener.java   |   9 +-
 .../InfinispanSyncLocalEventListener.java       |   7 +-
 .../InfinispanIdempotentRepository.java         |  17 +-
 .../remote/InfinispanRemoteOperation.java       |   6 +-
 .../InfinispanClusterTestSupport.java           |   6 +-
 .../InfinispanConfigurationTestIT.java          | 100 ++++++
 .../test/resources/infinispan/client.properties |  20 ++
 .../src/test/resources/log4j.xml                |  70 ++++
 18 files changed, 621 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/pom.xml b/components/camel-infinispan/pom.xml
index 7fcdb32..db95c94 100644
--- a/components/camel-infinispan/pom.xml
+++ b/components/camel-infinispan/pom.xml
@@ -254,6 +254,8 @@
                                             <command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=static_filter_factory:add(configuration=default)</command>
                                             <!-- Separate cache for @ClientListener(includeCurrentState=true) -->
                                             <command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=include_current_state:add(configuration=default)</command>
+                                            <!-- Separate cache for protobuf serialized objects. -->
+                                            <command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=misc_cache:add(configuration=default)</command>
                                         </commands>
                                     </executeCommands>
                                 </configuration>

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
index 473a267..0559f84 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.ObjectHelper;
 import org.infinispan.commons.api.BasicCacheContainer;
+import org.infinispan.context.Flag;
 
 @UriParams
 public class InfinispanConfiguration {
@@ -55,6 +56,11 @@ public class InfinispanConfiguration {
     private boolean clusteredListener;
     @UriParam
     private InfinispanQueryBuilder queryBuilder;
+    @UriParam(label = "advanced", javaType = "java.lang.String")
+    private Flag[] flags;
+    @UriParam(label = "advanced")
+    private String configurationUri;
+
 
     public String getCommand() {
         return command;
@@ -149,7 +155,7 @@ public class InfinispanConfiguration {
      * TRANSACTION_REGISTERED, CACHE_ENTRY_INVALIDATED, DATA_REHASHED, TOPOLOGY_CHANGED, PARTITION_STATUS_CHANGED
      */
     public void setEventTypes(String eventTypes) {
-        this.eventTypes = new HashSet<String>(Arrays.asList(eventTypes.split(",")));
+        this.eventTypes = new HashSet<>(Arrays.asList(eventTypes.split(",")));
     }
 
     /**
@@ -181,4 +187,40 @@ public class InfinispanConfiguration {
     public boolean hasQueryBuilder() {
         return queryBuilder != null;
     }
+
+    public Flag[] getFlags() {
+        return flags;
+    }
+
+    /**
+     * A comma separated list of Flag names (surrounding whitespace is ignored) to be
+     * applied by default on each cache invocation, not applicable to remote caches.
+     */
+    public void setFlags(String flagsAsString) {
+        String[] flagsArray = flagsAsString.split(",");
+        this.flags = new Flag[flagsArray.length];
+
+        for (int i = 0; i < flagsArray.length; i++) {
+            this.flags[i] = Flag.valueOf(flagsArray[i].trim());
+        }
+    }
+
+    public void setFlags(Flag... flags) {
+        this.flags = flags;
+    }
+
+    public boolean hasFlags() {
+        return flags != null && flags.length > 0;
+    }
+
+    /**
+     * An implementation specific URI for the CacheManager
+     */
+    public String getConfigurationUri() {
+        return configurationUri;
+    }
+
+    public void setConfigurationUri(String configurationUri) {
+        this.configurationUri = configurationUri;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
index 1a27fd7..d791968 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.infinispan;
 
 public interface InfinispanConstants {
+    int OPERATION_LEN = InfinispanConstants.OPERATION.length();
+
     String EVENT_TYPE = "CamelInfinispanEventType";
     String IS_PRE = "CamelInfinispanIsPre";
     String CACHE_NAME = "CamelInfinispanCacheName";

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
index b4db422..e60156a 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
@@ -25,7 +25,6 @@ import org.apache.camel.impl.DefaultConsumer;
 import org.infinispan.client.hotrod.RemoteCache;
 import org.infinispan.client.hotrod.Search;
 import org.infinispan.commons.api.BasicCache;
-import org.infinispan.commons.api.BasicCacheContainer;
 import org.infinispan.query.api.continuous.ContinuousQuery;
 import org.infinispan.query.api.continuous.ContinuousQueryListener;
 import org.infinispan.query.dsl.Query;
@@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory;
 public class InfinispanConsumer extends DefaultConsumer {
     private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class);
     private final InfinispanConfiguration configuration;
+    private final InfinispanManager manager;
     private InfinispanEventListener listener;
     private InfinispanConsumerHandler consumerHandler;
     private BasicCache<Object, Object> cache;
@@ -43,6 +43,7 @@ public class InfinispanConsumer extends DefaultConsumer {
     public InfinispanConsumer(InfinispanEndpoint endpoint, Processor processor, InfinispanConfiguration configuration) {
         super(endpoint, processor);
         this.configuration = configuration;
+        this.manager = new InfinispanManager(endpoint.getCamelContext(), configuration);
     }
 
     public void processEvent(String eventType, boolean isPre, String cacheName, Object key) {
@@ -69,10 +70,9 @@ public class InfinispanConsumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+        manager.start();
 
-        BasicCacheContainer cacheContainer = configuration.getCacheContainer();
-        cache = InfinispanUtil.getCache(configuration.getCacheContainer(), configuration.getCacheName());
-
+        cache = manager.getCache();
         if (configuration.hasQueryBuilder()) {
             if (InfinispanUtil.isRemote(cache)) {
                 RemoteCache<Object, Object> remoteCache = InfinispanUtil.asRemote(cache);
@@ -81,15 +81,17 @@ public class InfinispanConsumer extends DefaultConsumer {
                 continuousQuery = Search.getContinuousQuery(remoteCache);
                 continuousQuery.addContinuousQueryListener(query, new ContinuousQueryEventListener(cache.getName()));
             } else {
-                throw new IllegalArgumentException("Can't run continuous queries against embedded cache (" + cache.getName() + ")");
+                throw new IllegalArgumentException(
+                    "Can't run continuous queries against embedded cache (" + cache.getName() + ")");
             }
         } else {
-            if (InfinispanUtil.isEmbedded(cacheContainer)) {
+            if (manager.isCacheContainerEmbedded()) {
                 consumerHandler = InfinispanConsumerEmbeddedHandler.INSTANCE;
-            } else if (InfinispanUtil.isRemote(cacheContainer)) {
+            } else if (manager.isCacheContainerRemote()) {
                 consumerHandler = InfinispanConsumerRemoteHandler.INSTANCE;
             } else {
-                throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + cacheContainer);
+                throw new UnsupportedOperationException(
+                    "Unsupported CacheContainer type " + manager.getCacheContainer().getClass().getName());
             }
 
             listener = consumerHandler.start(this);
@@ -106,6 +108,7 @@ public class InfinispanConsumer extends DefaultConsumer {
             consumerHandler.stop(this);
         }
 
+        manager.stop();
         super.doStop();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
new file mode 100644
index 0000000..eab846b
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Service;
+import org.infinispan.cache.impl.DecoratedCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
+import org.infinispan.commons.api.BasicCache;
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InfinispanManager implements Service {
+    private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanManager.class);
+
+    private final InfinispanConfiguration configuration;
+    private final CamelContext camelContext;
+    private BasicCacheContainer cacheContainer;
+    private boolean isManagedCacheContainer;
+
+
+    public InfinispanManager(InfinispanConfiguration configuration) {
+        this(null, configuration);
+    }
+
+    public InfinispanManager(CamelContext camelContext, InfinispanConfiguration configuration) {
+        this.camelContext = camelContext;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void start() throws Exception {
+        cacheContainer = configuration.getCacheContainer();
+        if (cacheContainer == null) {
+            String uri = configuration.getConfigurationUri();
+            if (uri != null && camelContext != null) {
+                uri = camelContext.resolvePropertyPlaceholders(uri);
+            }
+
+            ConfigurationBuilder configurationBuilder = new ConfigurationBuilder()
+                .classLoader(Thread.currentThread().getContextClassLoader());
+
+            if (uri != null) {
+                configurationBuilder.withProperties(InfinispanUtil.loadProperties(camelContext, uri));
+            }
+
+            cacheContainer = new RemoteCacheManager(
+                configurationBuilder
+                    .addServers(configuration.getHost())
+                    .build(),
+                true);
+
+            isManagedCacheContainer = true;
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (isManagedCacheContainer) {
+            cacheContainer.stop();
+        }
+    }
+
+    public BasicCacheContainer getCacheContainer() {
+        return cacheContainer;
+    }
+
+    public boolean isCacheContainerEmbedded() {
+        return InfinispanUtil.isEmbedded(cacheContainer);
+    }
+
+    public boolean isCacheContainerRemote() {
+        return InfinispanUtil.isRemote(cacheContainer);
+    }
+
+    public BasicCache<Object, Object> getCache() {
+        return getCache(configuration.getCacheName());
+    }
+
+    public BasicCache<Object, Object> getCache(Exchange exchange) {
+        return getCache(exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, String.class));
+    }
+
+    public BasicCache<Object, Object> getCache(String cacheName) {
+        if (cacheName == null) {
+            cacheName = configuration.getCacheName();
+        }
+
+        LOGGER.trace("Cache[{}]", cacheName);
+        // Configured flags are applied only to embedded caches; any other cache is returned undecorated.
+        BasicCache<Object, Object> cache = InfinispanUtil.getCache(cacheContainer, cacheName);
+        if (configuration.hasFlags() && InfinispanUtil.isEmbedded(cache)) {
+            cache = new DecoratedCache<>(InfinispanUtil.asAdvanced(cache), configuration.getFlags());
+        }
+
+        return cache;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
index c54fd54..7da8552 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.component.infinispan.remote.InfinispanRemoteOperation;
 import org.apache.camel.util.ObjectHelper;
 import org.infinispan.commons.api.BasicCache;
@@ -28,7 +29,7 @@ import org.infinispan.query.dsl.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.component.infinispan.InfinispanUtil.isInHeaderEmpty;
+import static org.apache.camel.component.infinispan.InfinispanUtil.isHeaderEmpty;
 
 public final class InfinispanOperation {
     private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanOperation.class);
@@ -37,12 +38,20 @@ public final class InfinispanOperation {
     }
 
     public static void process(Exchange exchange, InfinispanConfiguration configuration, BasicCache<Object, Object> cache) {
-        Operation operation = getOperation(exchange, configuration);
-        operation.execute(configuration, cache, exchange);
+        final Message in = exchange.getIn();
+        // As in the removed per-operation checks, wrap the cache only when IGNORE_RETURN_VALUES is set.
+        Operation operation = getOperation(in, configuration);
+        operation.execute(
+            configuration,
+            in.getHeader(InfinispanConstants.IGNORE_RETURN_VALUES) != null
+                ? InfinispanUtil.ignoreReturnValuesCache(cache)
+                : cache,
+            in
+        );
     }
 
-    private static Operation getOperation(Exchange exchange, InfinispanConfiguration configuration) {
-        String operation = exchange.getIn().getHeader(InfinispanConstants.OPERATION, String.class);
+    private static Operation getOperation(Message message, InfinispanConfiguration configuration) {
+        String operation = message.getHeader(InfinispanConstants.OPERATION, String.class);
         if (operation == null) {
             if (configuration.hasCommand()) {
                 operation = InfinispanConstants.OPERATION + configuration.getCommand();
@@ -51,310 +60,306 @@ public final class InfinispanOperation {
             }
         }
         LOGGER.trace("Operation: [{}]", operation);
-        return Operation.valueOf(operation.substring(InfinispanConstants.OPERATION.length()).toUpperCase());
+        return Operation.fromOperation(operation);
     }
 
     private enum Operation {
         PUT {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 Object result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        result = cache.put(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        result = cache.put(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                     } else {
-                        result = cache.put(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        result = cache.put(getKey(message), getValue(message), lifespan, timeUnit);
                     }
                 } else {
-                    result = cache.put(getKey(exchange), getValue(exchange));
+                    result = cache.put(getKey(message), getValue(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, PUTASYNC {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 NotifyingFuture result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        result = cache.putAsync(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                     } else {
-                        result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        result = cache.putAsync(getKey(message), getValue(message), lifespan, timeUnit);
                     }
                 } else {
-                    result = cache.putAsync(getKey(exchange), getValue(exchange));
+                    result = cache.putAsync(getKey(message), getValue(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, PUTALL {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        cache.putAll(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        cache.putAll(getMap(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                     } else {
-                        cache.putAll(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        cache.putAll(getMap(message), lifespan, timeUnit);
                     }
                 } else {
-                    cache.putAll(getMap(exchange));
+                    cache.putAll(getMap(message));
                 }
             }
         }, PUTALLASYNC {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 NotifyingFuture result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        result = cache.putAllAsync(getMap(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                     } else {
-                        result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        result = cache.putAllAsync(getMap(message), lifespan, timeUnit);
                     }
                 } else {
-                    result = cache.putAllAsync(getMap(exchange));
+                    result = cache.putAllAsync(getMap(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, PUTIFABSENT {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 Object result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        result = cache.putIfAbsent(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        result = cache.putIfAbsent(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                     } else {
-                        result = cache.putIfAbsent(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        result = cache.putIfAbsent(getKey(message), getValue(message), lifespan, timeUnit);
                     }
                 } else {
-                    result = cache.putIfAbsent(getKey(exchange), getValue(exchange));
+                    result = cache.putIfAbsent(getKey(message), getValue(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, PUTIFABSENTASYNC {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 NotifyingFuture result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        result = cache.putIfAbsentAsync(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                     } else {
-                        result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        result = cache.putIfAbsentAsync(getKey(message), getValue(message), lifespan, timeUnit);
                     }
                 } else {
-                    result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange));
+                    result = cache.putIfAbsentAsync(getKey(message), getValue(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, GET {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                setResult(cache.get(getKey(exchange)), exchange);
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                setResult(cache.get(getKey(message)), message);
             }
         }, CONTAINSKEY {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                setResult(cache.containsKey(getKey(exchange)), exchange);
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                setResult(cache.containsKey(getKey(message)), message);
             }
         }, CONTAINSVALUE {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                Object result = cache.containsValue(getValue(exchange));
-                setResult(result, exchange);
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                setResult(cache.containsValue(getValue(message)), message);
             }
         }, REMOVE {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 Object result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (ObjectHelper.isEmpty(getValue(exchange))) {
-                    result = cache.remove(getKey(exchange));
+                if (ObjectHelper.isEmpty(getValue(message))) {
+                    result = cache.remove(getKey(message));
                 } else {
-                    result = cache.remove(getKey(exchange), getValue(exchange));
+                    result = cache.remove(getKey(message), getValue(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, REMOVEASYNC {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 NotifyingFuture result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (ObjectHelper.isEmpty(getValue(exchange))) {
-                    result = cache.removeAsync(getKey(exchange));
+                if (ObjectHelper.isEmpty(getValue(message))) {
+                    result = cache.removeAsync(getKey(message));
                 } else {
-                    result = cache.removeAsync(getKey(exchange), getValue(exchange));
+                    result = cache.removeAsync(getKey(message), getValue(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, REPLACE {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 Object result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        if (ObjectHelper.isEmpty(getOldValue(exchange))) {
-                            result = cache.replace(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        if (ObjectHelper.isEmpty(getOldValue(message))) {
+                            result = cache.replace(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                         } else {
-                            result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                            result = cache.replace(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                         }
                     } else {
-                        if (ObjectHelper.isEmpty(getOldValue(exchange))) {
-                            result = cache.replace(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        if (ObjectHelper.isEmpty(getOldValue(message))) {
+                            result = cache.replace(getKey(message), getValue(message), lifespan, timeUnit);
                         } else {
-                            result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                            result = cache.replace(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit);
                         }
                     }
                 } else {
-                    if (ObjectHelper.isEmpty(getOldValue(exchange))) {
-                        result = cache.replace(getKey(exchange), getValue(exchange));
+                    if (ObjectHelper.isEmpty(getOldValue(message))) {
+                        result = cache.replace(getKey(message), getValue(message));
                     } else {
-                        result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange));
+                        result = cache.replace(getKey(message), getOldValue(message), getValue(message));
                     }
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, REPLACEASYNC {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 NotifyingFuture result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        if (ObjectHelper.isEmpty(getOldValue(exchange))) {
-                            result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        if (ObjectHelper.isEmpty(getOldValue(message))) {
+                            result = cache.replaceAsync(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                         } else {
-                            result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                            result = cache.replaceAsync(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                         }
                     } else {
-                        if (ObjectHelper.isEmpty(getOldValue(exchange))) {
-                            result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        if (ObjectHelper.isEmpty(getOldValue(message))) {
+                            result = cache.replaceAsync(getKey(message), getValue(message), lifespan, timeUnit);
                         } else {
-                            result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                            result = cache.replaceAsync(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit);
                         }
                     }
                 } else {
-                    if (ObjectHelper.isEmpty(getOldValue(exchange))) {
-                        result = cache.replaceAsync(getKey(exchange), getValue(exchange));
+                    if (ObjectHelper.isEmpty(getOldValue(message))) {
+                        result = cache.replaceAsync(getKey(message), getValue(message));
                     } else {
-                        result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange));
+                        result = cache.replaceAsync(getKey(message), getOldValue(message), getValue(message));
                     }
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, SIZE {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                setResult(cache.size(), exchange);
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                setResult(cache.size(), message);
             }
         }, CLEAR {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 cache.clear();
             }
         }, CLEARASYNC {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                setResult(cache.clearAsync(), exchange);
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                setResult(cache.clearAsync(), message);
             }
         }, QUERY {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                Query query = getQuery(configuration, cache, exchange);
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                Query query = getQuery(configuration, cache, message);
                 if (query == null) {
                     return;
                 }
-                setResult(query.list(), exchange);
+                setResult(query.list(), message);
             }
         };
 
-        void setResult(Object result, Exchange exchange) {
-            exchange.getIn().setHeader(InfinispanConstants.RESULT, result);
+        private static final Operation[] OPERATIONS = values();
+
+        void setResult(Object result, Message message) {
+            message.setHeader(InfinispanConstants.RESULT, result);
         }
 
-        Object getKey(Exchange exchange) {
-            return exchange.getIn().getHeader(InfinispanConstants.KEY);
+        Object getKey(Message message) {
+            return message.getHeader(InfinispanConstants.KEY);
         }
 
-        Object getValue(Exchange exchange) {
-            return exchange.getIn().getHeader(InfinispanConstants.VALUE);
+        Object getValue(Message message) {
+            return message.getHeader(InfinispanConstants.VALUE);
         }
 
-        Object getOldValue(Exchange exchange) {
-            return exchange.getIn().getHeader(InfinispanConstants.OLD_VALUE);
+        Object getOldValue(Message message) {
+            return message.getHeader(InfinispanConstants.OLD_VALUE);
         }
 
-        Map<? extends Object, ? extends Object>  getMap(Exchange exchange) {
-            return (Map<? extends Object, ? extends Object>) exchange.getIn().getHeader(InfinispanConstants.MAP);
+        Map<? extends Object, ? extends Object>  getMap(Message message) {
+            return (Map<? extends Object, ? extends Object>) message.getHeader(InfinispanConstants.MAP);
         }
 
-        Query getQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+        Query getQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
             if (InfinispanUtil.isRemote(cache)) {
-                return InfinispanRemoteOperation.buildQuery(configuration, cache, exchange);
+                return InfinispanRemoteOperation.buildQuery(configuration, cache, message);
             } else {
                 return null;
             }
 
         }
 
-        abstract void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange);
+        boolean hasLifespan(Message message) {
+            return !isHeaderEmpty(message, InfinispanConstants.LIFESPAN_TIME)
+                && !isHeaderEmpty(message, InfinispanConstants.LIFESPAN_TIME_UNIT);
+        }
+
+        boolean hasMaxIdleTime(Message message) {
+            return !isHeaderEmpty(message, InfinispanConstants.MAX_IDLE_TIME)
+                && !isHeaderEmpty(message, InfinispanConstants.MAX_IDLE_TIME_UNIT);
+        }
+
+        abstract void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message);
+
+        public static Operation fromOperation(String operation) {
+            int len;
+            String name;
+
+            for (int i = OPERATIONS.length - 1; i >= 0; i--) {
+                name = OPERATIONS[i].name();
+                len = name.length();
+                if (len == operation.length() - InfinispanConstants.OPERATION_LEN) {
+                    if (name.regionMatches(true, 0, operation, InfinispanConstants.OPERATION_LEN, len)) {
+                        return OPERATIONS[i];
+                    }
+                }
+            }
+
+            throw new IllegalArgumentException("Unknown Operation for string: " + operation);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
index 31e1e16..6fbd1c1 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
@@ -18,55 +18,31 @@ package org.apache.camel.component.infinispan;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
-import org.infinispan.client.hotrod.RemoteCacheManager;
-import org.infinispan.client.hotrod.configuration.Configuration;
-import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
-import org.infinispan.commons.api.BasicCache;
-import org.infinispan.commons.api.BasicCacheContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class InfinispanProducer extends DefaultProducer {
-    private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class);
     private final InfinispanConfiguration configuration;
-    private BasicCacheContainer cacheContainer;
-    private boolean isManagedCacheContainer;
+    private final InfinispanManager manager;
 
     public InfinispanProducer(InfinispanEndpoint endpoint, InfinispanConfiguration configuration) {
         super(endpoint);
         this.configuration = configuration;
+        this.manager = new InfinispanManager(endpoint.getCamelContext(), configuration);
     }
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        InfinispanOperation.process(exchange, configuration, getCache(exchange));
+        InfinispanOperation.process(exchange, configuration, manager.getCache(exchange));
     }
 
     @Override
     protected void doStart() throws Exception {
-        cacheContainer = configuration.getCacheContainer();
-        if (cacheContainer == null) {
-            Configuration config = new ConfigurationBuilder().classLoader(Thread.currentThread().getContextClassLoader()).addServers(configuration.getHost()).build();
-            cacheContainer = new RemoteCacheManager(config, true);
-            isManagedCacheContainer = true;
-        }
         super.doStart();
+        manager.start();
     }
 
     @Override
     protected void doStop() throws Exception {
-        if (isManagedCacheContainer) {
-            cacheContainer.stop();
-        }
+        manager.stop();
         super.doStop();
     }
-
-    private BasicCache<Object, Object> getCache(Exchange exchange) {
-        String cacheName = exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, String.class);
-        if (cacheName == null) {
-            cacheName = configuration.getCacheName();
-        }
-        LOGGER.trace("Cache[{}]", cacheName);
-        return cacheName != null ? cacheContainer.getCache(cacheName) : cacheContainer.getCache();
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
index 95a5d04..3bc6c2a 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
@@ -16,8 +16,17 @@
  */
 package org.apache.camel.component.infinispan;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ResourceHelper;
+import org.infinispan.AdvancedCache;
 import org.infinispan.Cache;
 import org.infinispan.client.hotrod.RemoteCache;
 import org.infinispan.client.hotrod.RemoteCacheManager;
@@ -68,10 +77,20 @@ public final class InfinispanUtil {
     }
 
     @SuppressWarnings("unchecked")
+    public static <K, V> Cache<K, V> asEmbedded(BasicCache<K, V> cache) {
+        return Cache.class.cast(cache);
+    }
+
+    @SuppressWarnings("unchecked")
     public static <K, V>  RemoteCache<K, V> asRemote(BasicCache<K, V> cache) {
         return RemoteCache.class.cast(cache);
     }
 
+    @SuppressWarnings("unchecked")
+    public static <K, V> AdvancedCache<K, V> asAdvanced(BasicCache<K, V> cache) {
+        return Cache.class.cast(cache).getAdvancedCache();
+    }
+
     public static <K, V> BasicCache<K, V> ignoreReturnValuesCache(BasicCache<K, V> cache) {
         if (isEmbedded(cache)) {
             return ((Cache<K, V>) cache).getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES);
@@ -81,10 +100,36 @@ public final class InfinispanUtil {
     }
 
     public static boolean isInHeaderEmpty(Exchange exchange, String header) {
-        return ObjectHelper.isEmpty(exchange.getIn().getHeader(header));
+        return isHeaderEmpty(exchange.getIn(), header);
+    }
+
+    public static boolean isHeaderEmpty(Message message, String header) {
+        return ObjectHelper.isEmpty(message.getHeader(header));
     }
 
     public static BasicCache<Object, Object> getCache(BasicCacheContainer cacheContainer, String cacheName) {
         return ObjectHelper.isEmpty(cacheName) ? cacheContainer.getCache() : cacheContainer.getCache(cacheName);
     }
+
+    /** Loads the properties at {@code uri} via the CamelContext, else via the thread context class loader. */
+    public static Properties loadProperties(CamelContext camelContext, String uri) throws IOException {
+        if (camelContext != null) {
+            try (InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, uri)) {
+                Properties properties = new Properties();
+                properties.load(is);
+                return properties;
+            } catch (IOException ignored) {
+                // deliberately ignored: fall back to the thread context class loader below
+            }
+        }
+        // getResourceAsStream returns null for a missing resource; Properties.load(null) would throw an NPE
+        try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(uri)) {
+            if (is != null) {
+                Properties properties = new Properties();
+                properties.load(is);
+                return properties; // a read failure here now propagates as the real IOException
+            }
+        }
+        throw new FileNotFoundException("Cannot find resource: " + uri);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
index 4f50950..0d6f630 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
@@ -22,7 +22,6 @@ import org.infinispan.notifications.Listener;
 
 @Listener(clustered = false, sync = false)
 public class InfinispanAsyncLocalEventListener extends InfinispanSyncLocalEventListener {
-
     public InfinispanAsyncLocalEventListener(InfinispanConsumer consumer, Set<String> eventTypes) {
         super(consumer, eventTypes);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
index 113c073..d4a70ca 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
@@ -20,6 +20,7 @@ import org.apache.camel.component.infinispan.InfinispanConfiguration;
 import org.apache.camel.component.infinispan.InfinispanConsumer;
 import org.apache.camel.component.infinispan.InfinispanConsumerHandler;
 import org.apache.camel.component.infinispan.InfinispanEventListener;
+import org.apache.camel.component.infinispan.InfinispanUtil;
 import org.infinispan.Cache;
 
 public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsumerHandler {
@@ -30,7 +31,7 @@ public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsum
 
     @Override
     public InfinispanEventListener start(InfinispanConsumer consumer) {
-        Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache();
+        Cache<?, ?> embeddedCache = InfinispanUtil.asEmbedded(consumer.getCache());
         InfinispanConfiguration configuration = consumer.getConfiguration();
         InfinispanEventListener listener;
         if (configuration.hasCustomListener()) {
@@ -55,8 +56,7 @@ public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsum
 
     @Override
     public void stop(InfinispanConsumer consumer) {
-        Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache();
+        Cache<?, ?> embeddedCache = InfinispanUtil.asEmbedded(consumer.getCache());
         embeddedCache.removeListener(consumer.getListener());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
index bcf6957..14fac43 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.infinispan.embedded;
 
 import java.util.Set;
+
 import org.apache.camel.component.infinispan.InfinispanConsumer;
 import org.apache.camel.component.infinispan.InfinispanEventListener;
 import org.infinispan.notifications.Listener;
@@ -25,16 +26,12 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
 import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Listener(clustered = true, sync = true)
 public class InfinispanSyncClusteredEventListener extends InfinispanEventListener {
-    /*clustered listeners only listen for post events*/
+    // clustered listeners only listen for post events
     private static final boolean IS_PRE = false;
 
-    private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
-
     public InfinispanSyncClusteredEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
         super(infinispanConsumer, eventTypes);
     }
@@ -44,8 +41,6 @@ public class InfinispanSyncClusteredEventListener extends InfinispanEventListene
     @CacheEntryRemoved
     @CacheEntryExpired
     public void processEvent(CacheEntryEvent<Object, Object> event) {
-        logger.trace("Received CacheEntryEvent [{}]", event);
-
         if (isAccepted(event.getType().toString())) {
             infinispanConsumer.processEvent(event.getType().toString(), IS_PRE, event.getCache().getName(), event.getKey());
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
index 1cf9325..e8112bd 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.infinispan.embedded;
 
 import java.util.Set;
+
 import org.apache.camel.component.infinispan.InfinispanConsumer;
 import org.apache.camel.component.infinispan.InfinispanEventListener;
 import org.infinispan.notifications.Listener;
@@ -30,13 +31,9 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryPassivate
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
 import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Listener(clustered = false, sync = true)
 public class InfinispanSyncLocalEventListener extends InfinispanEventListener {
-    private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
-
     public InfinispanSyncLocalEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
         super(infinispanConsumer, eventTypes);
     }
@@ -51,8 +48,6 @@ public class InfinispanSyncLocalEventListener extends InfinispanEventListener {
     @CacheEntryVisited
     @CacheEntryExpired
     public void processEvent(CacheEntryEvent<Object, Object> event) {
-        logger.trace("Received CacheEntryEvent [{}]", event);
-
         dispatch(event.getType().toString(), event.isPre(), event.getCache().getName(), event.getKey());
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
index e7838b3..b28a460 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
@@ -29,17 +29,19 @@ import org.infinispan.manager.DefaultCacheManager;
 public class InfinispanIdempotentRepository extends ServiceSupport implements IdempotentRepository<Object> {
     private final String cacheName;
     private final BasicCacheContainer cacheContainer;
-    private boolean isManagedCacheContainer;
+    private final boolean isManagedCacheContainer;
+    private BasicCache<Object, Boolean> cache;
 
     public InfinispanIdempotentRepository(BasicCacheContainer cacheContainer, String cacheName) {
         this.cacheContainer = cacheContainer;
         this.cacheName = cacheName;
+        this.isManagedCacheContainer = false;
     }
 
     public InfinispanIdempotentRepository(String cacheName) {
-        cacheContainer = new DefaultCacheManager();
+        this.cacheContainer = new DefaultCacheManager();
         this.cacheName = cacheName;
-        isManagedCacheContainer = true;
+        this.isManagedCacheContainer = true;
     }
 
     public InfinispanIdempotentRepository() {
@@ -106,16 +108,21 @@ public class InfinispanIdempotentRepository extends ServiceSupport implements Id
 
     @Override
     protected void doShutdown() throws Exception {
-        super.doShutdown();
         if (isManagedCacheContainer) {
             cacheContainer.stop();
         }
+
+        super.doShutdown();
     }
 
     private BasicCache<Object, Boolean> getCache() {
-        return cacheName != null
+        if (cache == null) {
+            cache = cacheName != null
                 ? cacheContainer.<Object, Boolean>getCache(cacheName)
                 : cacheContainer.<Object, Boolean>getCache();
+        }
+
+        return cache;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
index dd0f590..42b3cec 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
@@ -16,7 +16,7 @@
  */
 package org.apache.camel.component.infinispan.remote;
 
-import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.component.infinispan.InfinispanConfiguration;
 import org.apache.camel.component.infinispan.InfinispanConstants;
 import org.apache.camel.component.infinispan.InfinispanQueryBuilder;
@@ -30,8 +30,8 @@ public final class InfinispanRemoteOperation {
     private InfinispanRemoteOperation() {
     }
 
-    public static Query buildQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-        InfinispanQueryBuilder queryBuilder = exchange.getIn().getHeader(InfinispanConstants.QUERY_BUILDER, InfinispanQueryBuilder.class);
+    public static Query buildQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+        InfinispanQueryBuilder queryBuilder = message.getHeader(InfinispanConstants.QUERY_BUILDER, InfinispanQueryBuilder.class);
         if (queryBuilder == null) {
             queryBuilder = configuration.getQueryBuilder();
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
index b47b1db..747a9e4 100644
--- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.infinispan;
 
 import java.util.List;
+
 import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.infinispan.Cache;
@@ -90,15 +91,18 @@ public class InfinispanClusterTestSupport extends CamelTestSupport {
         } catch (Throwable ex) {
             throw new Exception(ex);
         }
+
         super.setUp();
     }
 
     @Override
     public void tearDown() throws Exception {
+        super.tearDown();
+
+        // Containers have to be stopped after super.tearDown(); maybe CamelTestSupport should handle this
         for (BasicCacheContainer container: clusteredCacheContainers) {
             container.stop();
         }
-        super.tearDown();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java
new file mode 100644
index 0000000..48c7772
--- /dev/null
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import org.infinispan.cache.impl.DecoratedCache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.commons.api.BasicCache;
+import org.infinispan.manager.DefaultCacheManager;
+import org.jgroups.util.UUID;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class InfinispanConfigurationTestIT {
+
+    @Test
+    public void embeddedCacheWithFlagsTest() throws Exception {
+        InfinispanConfiguration configuration = new InfinispanConfiguration();
+        configuration.setHost("localhost");
+        configuration.setCacheName("misc_cache");
+        configuration.setCacheContainer(new DefaultCacheManager(true));
+        configuration.setFlags(
+            org.infinispan.context.Flag.SKIP_CACHE_LOAD,
+            org.infinispan.context.Flag.SKIP_CACHE_STORE
+        );
+
+        InfinispanManager manager = new InfinispanManager(configuration);
+        manager.start();
+
+        BasicCache<Object, Object> cache = manager.getCache();
+        assertNotNull(cache);
+        assertTrue(cache instanceof DecoratedCache);
+
+        DecoratedCache<Object, Object> decoratedCache = (DecoratedCache<Object, Object>)cache;
+        assertTrue(decoratedCache.getFlags().contains(org.infinispan.context.Flag.SKIP_CACHE_LOAD));
+        assertTrue(decoratedCache.getFlags().contains(org.infinispan.context.Flag.SKIP_CACHE_STORE));
+
+        manager.getCacheContainer().stop();
+        manager.stop();
+    }
+
+    @Test
+    public void remoteCacheWithoutProperties() throws Exception {
+        InfinispanConfiguration configuration = new InfinispanConfiguration();
+        configuration.setHost("localhost");
+        configuration.setCacheName("misc_cache");
+
+        InfinispanManager manager = new InfinispanManager(configuration);
+        manager.start();
+
+        BasicCache<Object, Object> cache = manager.getCache();
+        assertNotNull(cache);
+        assertTrue(cache instanceof RemoteCache);
+
+        RemoteCache<Object, Object> remoteCache = InfinispanUtil.asRemote(cache);
+
+        String key = UUID.randomUUID().toString();
+        assertNull(remoteCache.put(key, "val1"));
+        assertNull(remoteCache.put(key, "val2"));
+
+        manager.stop();
+    }
+
+    @Test
+    public void remoteCacheWithPropertiesTest() throws Exception {
+        InfinispanConfiguration configuration = new InfinispanConfiguration();
+        configuration.setHost("localhost");
+        configuration.setCacheName("misc_cache");
+        configuration.setConfigurationUri("infinispan/client.properties");
+
+        InfinispanManager manager = new InfinispanManager(configuration);
+        manager.start();
+
+        BasicCache<Object, Object> cache = manager.getCache();
+        assertNotNull(cache);
+        assertTrue(cache instanceof RemoteCache);
+
+        String key = UUID.randomUUID().toString();
+        assertNull(cache.put(key, "val1"));
+        assertNotNull(cache.put(key, "val2"));
+
+        manager.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/test/resources/infinispan/client.properties
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/resources/infinispan/client.properties b/components/camel-infinispan/src/test/resources/infinispan/client.properties
new file mode 100644
index 0000000..4cacbe2
--- /dev/null
+++ b/components/camel-infinispan/src/test/resources/infinispan/client.properties
@@ -0,0 +1,20 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+# Whether or not to implicitly apply Flag.FORCE_RETURN_VALUE for all calls.
+# The default value for this property is false.
+infinispan.client.hotrod.force_return_values = true

http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/resources/log4j.xml b/components/camel-infinispan/src/test/resources/log4j.xml
new file mode 100644
index 0000000..71a2b38
--- /dev/null
+++ b/components/camel-infinispan/src/test/resources/log4j.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+    <appender name="FILE" class="org.apache.log4j.FileAppender">
+        <param name="File" value="target/camel-infinispan-test.log" />
+        <param name="Append" value="true" />
+        <param name="Threshold" value="INFO" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d [%-15.15t] %-5p %-30.30c{1} - %m%n" />
+        </layout>
+    </appender>
+
+    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+        <param name="Threshold" value="INFO" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%30.30t] %-30.30c{1} %-5p %m%n" />
+        </layout>
+    </appender>
+
+    <!-- ================ -->
+    <!-- Limit categories -->
+    <!-- ================ -->
+    <category name="org.jboss.arquillian">
+        <priority value="INFO" />
+    </category>
+
+    <category name="org.infinispan.arquillian.core.WithRunningServerObserver">
+        <priority value="INFO" />
+    </category>
+
+    <category name="org.infinispan.server.test.util.TestsuiteListener">
+        <priority value="INFO" />
+    </category>
+
+    <category name="org.apache.http">
+        <priority value="INFO" />
+    </category>
+
+    <category name="org.infinispan">
+        <priority value="INFO" />
+    </category>
+
+    <category name="org.jgroups">
+        <priority value="INFO" />
+    </category>
+
+    <category name="org.apache.commons.httpclient.auth">
+        <priority value="INFO"/>
+    </category>
+
+    <category name="org.apache.auth">
+        <priority value="INFO"/>
+    </category>
+    
+    <category name="org.apache.directory">
+        <priority value="WARN"/>
+    </category>
+
+    <!-- ======================= -->
+    <!-- Setup the Root category -->
+    <!-- ======================= -->
+
+    <root>
+        <priority value="INFO" />
+        <appender-ref ref="FILE" />
+    </root>
+
+</log4j:configuration>


[2/2] camel git commit: CAMEL-9740 : Improve camel-infinispan

Posted by da...@apache.org.
CAMEL-9740 : Improve camel-infinispan


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0de49d45
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0de49d45
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0de49d45

Branch: refs/heads/camel-2.17.x
Commit: 0de49d45a4fe11f4d6ec7022208b306f24fc8905
Parents: 2363157
Author: lburgazzoli <lb...@gmail.com>
Authored: Fri Mar 11 10:48:26 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Mar 23 09:58:54 2016 +0100

----------------------------------------------------------------------
 components/camel-infinispan/pom.xml             |   2 +
 .../infinispan/InfinispanConfiguration.java     |  44 ++-
 .../infinispan/InfinispanConstants.java         |   2 +
 .../infinispan/InfinispanConsumer.java          |  19 +-
 .../component/infinispan/InfinispanManager.java | 116 ++++++
 .../infinispan/InfinispanOperation.java         | 355 ++++++++++---------
 .../infinispan/InfinispanProducer.java          |  34 +-
 .../component/infinispan/InfinispanUtil.java    |  47 ++-
 .../InfinispanAsyncLocalEventListener.java      |   1 -
 .../InfinispanConsumerEmbeddedHandler.java      |   6 +-
 .../InfinispanSyncClusteredEventListener.java   |   9 +-
 .../InfinispanSyncLocalEventListener.java       |   7 +-
 .../InfinispanIdempotentRepository.java         |  17 +-
 .../remote/InfinispanRemoteOperation.java       |   6 +-
 .../InfinispanClusterTestSupport.java           |   6 +-
 .../InfinispanConfigurationTestIT.java          | 100 ++++++
 .../test/resources/infinispan/client.properties |  20 ++
 .../src/test/resources/log4j.xml                |  70 ++++
 18 files changed, 621 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/pom.xml b/components/camel-infinispan/pom.xml
index e7eb14a..ff4eb99 100644
--- a/components/camel-infinispan/pom.xml
+++ b/components/camel-infinispan/pom.xml
@@ -254,6 +254,8 @@
                                             <command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=static_filter_factory:add(configuration=default)</command>
                                             <!-- Separate cache for @ClientListener(includeCurrentState=true) -->
                                             <command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=include_current_state:add(configuration=default)</command>
+                                            <!-- Separate cache for miscellaneous tests (e.g. InfinispanConfigurationTestIT). -->
+                                            <command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=misc_cache:add(configuration=default)</command>
                                         </commands>
                                     </executeCommands>
                                 </configuration>

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
index 473a267..0559f84 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.ObjectHelper;
 import org.infinispan.commons.api.BasicCacheContainer;
+import org.infinispan.context.Flag;
 
 @UriParams
 public class InfinispanConfiguration {
@@ -55,6 +56,11 @@ public class InfinispanConfiguration {
     private boolean clusteredListener;
     @UriParam
     private InfinispanQueryBuilder queryBuilder;
+    @UriParam(label = "advanced", javaType = "java.lang.String")
+    private Flag[] flags;
+    @UriParam(label = "advanced")
+    private String configurationUri;
+
 
     public String getCommand() {
         return command;
@@ -149,7 +155,7 @@ public class InfinispanConfiguration {
      * TRANSACTION_REGISTERED, CACHE_ENTRY_INVALIDATED, DATA_REHASHED, TOPOLOGY_CHANGED, PARTITION_STATUS_CHANGED
      */
     public void setEventTypes(String eventTypes) {
-        this.eventTypes = new HashSet<String>(Arrays.asList(eventTypes.split(",")));
+        this.eventTypes = new HashSet<>(Arrays.asList(eventTypes.split(",")));
     }
 
     /**
@@ -181,4 +187,40 @@ public class InfinispanConfiguration {
     public boolean hasQueryBuilder() {
         return queryBuilder != null;
     }
+
+    public Flag[] getFlags() {
+        return flags;
+    }
+
+    /**
+     * A comma-separated list of Flag enum constant names (exact names, no surrounding whitespace)
+     * to be applied by default on each cache invocation; not applicable to remote caches.
+     */
+    public void setFlags(String flagsAsString) {
+        String[] flagsArray = flagsAsString.split(",");
+        this.flags = new Flag[flagsArray.length];
+
+        for (int i = 0; i < flagsArray.length; i++) {
+            this.flags[i] = Flag.valueOf(flagsArray[i]);
+        }
+    }
+
+    public void setFlags(Flag... flags) {
+        this.flags = flags;
+    }
+
+    public boolean hasFlags() {
+        return flags != null && flags.length > 0;
+    }
+
+    /**
+     * An implementation specific URI for the CacheManager
+     */
+    public String getConfigurationUri() {
+        return configurationUri;
+    }
+
+    public void setConfigurationUri(String configurationUri) {
+        this.configurationUri = configurationUri;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
index 1a27fd7..d791968 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.infinispan;
 
 public interface InfinispanConstants {
+    int OPERATION_LEN = InfinispanConstants.OPERATION.length();
+
     String EVENT_TYPE = "CamelInfinispanEventType";
     String IS_PRE = "CamelInfinispanIsPre";
     String CACHE_NAME = "CamelInfinispanCacheName";

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
index b4db422..e60156a 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
@@ -25,7 +25,6 @@ import org.apache.camel.impl.DefaultConsumer;
 import org.infinispan.client.hotrod.RemoteCache;
 import org.infinispan.client.hotrod.Search;
 import org.infinispan.commons.api.BasicCache;
-import org.infinispan.commons.api.BasicCacheContainer;
 import org.infinispan.query.api.continuous.ContinuousQuery;
 import org.infinispan.query.api.continuous.ContinuousQueryListener;
 import org.infinispan.query.dsl.Query;
@@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory;
 public class InfinispanConsumer extends DefaultConsumer {
     private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class);
     private final InfinispanConfiguration configuration;
+    private final InfinispanManager manager;
     private InfinispanEventListener listener;
     private InfinispanConsumerHandler consumerHandler;
     private BasicCache<Object, Object> cache;
@@ -43,6 +43,7 @@ public class InfinispanConsumer extends DefaultConsumer {
     public InfinispanConsumer(InfinispanEndpoint endpoint, Processor processor, InfinispanConfiguration configuration) {
         super(endpoint, processor);
         this.configuration = configuration;
+        this.manager = new InfinispanManager(endpoint.getCamelContext(), configuration);
     }
 
     public void processEvent(String eventType, boolean isPre, String cacheName, Object key) {
@@ -69,10 +70,9 @@ public class InfinispanConsumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+        manager.start();
 
-        BasicCacheContainer cacheContainer = configuration.getCacheContainer();
-        cache = InfinispanUtil.getCache(configuration.getCacheContainer(), configuration.getCacheName());
-
+        cache = manager.getCache();
         if (configuration.hasQueryBuilder()) {
             if (InfinispanUtil.isRemote(cache)) {
                 RemoteCache<Object, Object> remoteCache = InfinispanUtil.asRemote(cache);
@@ -81,15 +81,17 @@ public class InfinispanConsumer extends DefaultConsumer {
                 continuousQuery = Search.getContinuousQuery(remoteCache);
                 continuousQuery.addContinuousQueryListener(query, new ContinuousQueryEventListener(cache.getName()));
             } else {
-                throw new IllegalArgumentException("Can't run continuous queries against embedded cache (" + cache.getName() + ")");
+                throw new IllegalArgumentException(
+                    "Can't run continuous queries against embedded cache (" + cache.getName() + ")");
             }
         } else {
-            if (InfinispanUtil.isEmbedded(cacheContainer)) {
+            if (manager.isCacheContainerEmbedded()) {
                 consumerHandler = InfinispanConsumerEmbeddedHandler.INSTANCE;
-            } else if (InfinispanUtil.isRemote(cacheContainer)) {
+            } else if (manager.isCacheContainerRemote()) {
                 consumerHandler = InfinispanConsumerRemoteHandler.INSTANCE;
             } else {
-                throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + cacheContainer);
+                throw new UnsupportedOperationException(
+                    "Unsupported CacheContainer type " + manager.getCacheContainer().getClass().getName());
             }
 
             listener = consumerHandler.start(this);
@@ -106,6 +108,7 @@ public class InfinispanConsumer extends DefaultConsumer {
             consumerHandler.stop(this);
         }
 
+        manager.stop();
         super.doStop();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
new file mode 100644
index 0000000..eab846b
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Service;
+import org.infinispan.cache.impl.DecoratedCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
+import org.infinispan.commons.api.BasicCache;
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds the {@link BasicCacheContainer} used by the component: a container taken from the
+ * configuration is used as-is, otherwise a {@link RemoteCacheManager} is built on start.
+ */
+public class InfinispanManager implements Service {
+    private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanManager.class);
+
+    private final InfinispanConfiguration configuration;
+    private final CamelContext camelContext;
+    private BasicCacheContainer cacheContainer;
+    // set once start() has created the container itself; stop() only acts when it is true
+    private boolean isManagedCacheContainer;
+
+    public InfinispanManager(InfinispanConfiguration configuration) {
+        this(null, configuration);
+    }
+
+    public InfinispanManager(CamelContext camelContext, InfinispanConfiguration configuration) {
+        this.camelContext = camelContext;
+        this.configuration = configuration;
+    }
+
+    /** Uses the configured container, or creates a remote one for the configured host. */
+    @Override
+    public void start() throws Exception {
+        cacheContainer = configuration.getCacheContainer();
+        if (cacheContainer != null) {
+            return;
+        }
+
+        String uri = configuration.getConfigurationUri();
+        if (camelContext != null && uri != null) {
+            uri = camelContext.resolvePropertyPlaceholders(uri);
+        }
+
+        ConfigurationBuilder builder = new ConfigurationBuilder()
+            .classLoader(Thread.currentThread().getContextClassLoader());
+        if (uri != null) {
+            builder.withProperties(InfinispanUtil.loadProperties(camelContext, uri));
+        }
+        cacheContainer = new RemoteCacheManager(builder.addServers(configuration.getHost()).build(), true);
+        isManagedCacheContainer = true;
+    }
+
+    /** Stops the container only when this manager created it in {@link #start()}. */
+    @Override
+    public void stop() throws Exception {
+        if (!isManagedCacheContainer) {
+            return;
+        }
+        cacheContainer.stop();
+    }
+
+    public BasicCacheContainer getCacheContainer() {
+        return cacheContainer;
+    }
+
+    public boolean isCacheContainerEmbedded() {
+        return InfinispanUtil.isEmbedded(cacheContainer);
+    }
+
+    public boolean isCacheContainerRemote() {
+        return InfinispanUtil.isRemote(cacheContainer);
+    }
+
+    /** Returns the cache named in the configuration. */
+    public BasicCache<Object, Object> getCache() {
+        return getCache(configuration.getCacheName());
+    }
+
+    /** Returns the cache named by the {@code CACHE_NAME} header of the exchange's in message. */
+    public BasicCache<Object, Object> getCache(Exchange exchange) {
+        return getCache(exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, String.class));
+    }
+
+    /** Looks up a cache ({@code null}: configured name); embedded caches get the configured flags. */
+    public BasicCache<Object, Object> getCache(String cacheName) {
+        final String name = cacheName != null ? cacheName : configuration.getCacheName();
+        LOGGER.trace("Cache[{}]", name);
+        final BasicCache<Object, Object> cache = InfinispanUtil.getCache(cacheContainer, name);
+        if (!configuration.hasFlags() || !InfinispanUtil.isEmbedded(cache)) {
+            return cache;
+        }
+        return new DecoratedCache(InfinispanUtil.asAdvanced(cache), configuration.getFlags());
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
index c54fd54..7da8552 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.component.infinispan.remote.InfinispanRemoteOperation;
 import org.apache.camel.util.ObjectHelper;
 import org.infinispan.commons.api.BasicCache;
@@ -28,7 +29,7 @@ import org.infinispan.query.dsl.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.component.infinispan.InfinispanUtil.isInHeaderEmpty;
+import static org.apache.camel.component.infinispan.InfinispanUtil.isHeaderEmpty;
 
 public final class InfinispanOperation {
     private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanOperation.class);
@@ -37,12 +38,20 @@ public final class InfinispanOperation {
     }
 
     public static void process(Exchange exchange, InfinispanConfiguration configuration, BasicCache<Object, Object> cache) {
-        Operation operation = getOperation(exchange, configuration);
-        operation.execute(configuration, cache, exchange);
+        final Message in = exchange.getIn();
+        Operation operation = getOperation(in, configuration);
+        // a set IGNORE_RETURN_VALUES header opts in to the ignore-return-values view of the cache
+        operation.execute(
+            configuration,
+            in.getHeader(InfinispanConstants.IGNORE_RETURN_VALUES) != null
+                ? InfinispanUtil.ignoreReturnValuesCache(cache)
+                : cache,
+            in
+        );
     }
 
-    private static Operation getOperation(Exchange exchange, InfinispanConfiguration configuration) {
-        String operation = exchange.getIn().getHeader(InfinispanConstants.OPERATION, String.class);
+    private static Operation getOperation(Message message, InfinispanConfiguration configuration) {
+        String operation = message.getHeader(InfinispanConstants.OPERATION, String.class);
         if (operation == null) {
             if (configuration.hasCommand()) {
                 operation = InfinispanConstants.OPERATION + configuration.getCommand();
@@ -51,310 +60,306 @@ public final class InfinispanOperation {
             }
         }
         LOGGER.trace("Operation: [{}]", operation);
-        return Operation.valueOf(operation.substring(InfinispanConstants.OPERATION.length()).toUpperCase());
+        return Operation.fromOperation(operation);
     }
 
     private enum Operation {
         PUT {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 Object result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        result = cache.put(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        result = cache.put(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                     } else {
-                        result = cache.put(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        result = cache.put(getKey(message), getValue(message), lifespan, timeUnit);
                     }
                 } else {
-                    result = cache.put(getKey(exchange), getValue(exchange));
+                    result = cache.put(getKey(message), getValue(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, PUTASYNC {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 NotifyingFuture result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        result = cache.putAsync(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                     } else {
-                        result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        result = cache.putAsync(getKey(message), getValue(message), lifespan, timeUnit);
                     }
                 } else {
-                    result = cache.putAsync(getKey(exchange), getValue(exchange));
+                    result = cache.putAsync(getKey(message), getValue(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, PUTALL {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        cache.putAll(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        cache.putAll(getMap(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                     } else {
-                        cache.putAll(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        cache.putAll(getMap(message), lifespan, timeUnit);
                     }
                 } else {
-                    cache.putAll(getMap(exchange));
+                    cache.putAll(getMap(message));
                 }
             }
         }, PUTALLASYNC {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 NotifyingFuture result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        result = cache.putAllAsync(getMap(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                     } else {
-                        result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        result = cache.putAllAsync(getMap(message), lifespan, timeUnit);
                     }
                 } else {
-                    result = cache.putAllAsync(getMap(exchange));
+                    result = cache.putAllAsync(getMap(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, PUTIFABSENT {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 Object result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        result = cache.putIfAbsent(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        result = cache.putIfAbsent(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                     } else {
-                        result = cache.putIfAbsent(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        result = cache.putIfAbsent(getKey(message), getValue(message), lifespan, timeUnit);
                     }
                 } else {
-                    result = cache.putIfAbsent(getKey(exchange), getValue(exchange));
+                    result = cache.putIfAbsent(getKey(message), getValue(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, PUTIFABSENTASYNC {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 NotifyingFuture result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        result = cache.putIfAbsentAsync(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                     } else {
-                        result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        result = cache.putIfAbsentAsync(getKey(message), getValue(message), lifespan, timeUnit);
                     }
                 } else {
-                    result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange));
+                    result = cache.putIfAbsentAsync(getKey(message), getValue(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, GET {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                setResult(cache.get(getKey(exchange)), exchange);
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                setResult(cache.get(getKey(message)), message);
             }
         }, CONTAINSKEY {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                setResult(cache.containsKey(getKey(exchange)), exchange);
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                setResult(cache.containsKey(getKey(message)), message);
             }
         }, CONTAINSVALUE {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                Object result = cache.containsValue(getValue(exchange));
-                setResult(result, exchange);
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                setResult(cache.containsValue(getValue(message)), message);
             }
         }, REMOVE {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 Object result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (ObjectHelper.isEmpty(getValue(exchange))) {
-                    result = cache.remove(getKey(exchange));
+                if (ObjectHelper.isEmpty(getValue(message))) {
+                    result = cache.remove(getKey(message));
                 } else {
-                    result = cache.remove(getKey(exchange), getValue(exchange));
+                    result = cache.remove(getKey(message), getValue(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, REMOVEASYNC {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 NotifyingFuture result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (ObjectHelper.isEmpty(getValue(exchange))) {
-                    result = cache.removeAsync(getKey(exchange));
+                if (ObjectHelper.isEmpty(getValue(message))) {
+                    result = cache.removeAsync(getKey(message));
                 } else {
-                    result = cache.removeAsync(getKey(exchange), getValue(exchange));
+                    result = cache.removeAsync(getKey(message), getValue(message));
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, REPLACE {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 Object result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        if (ObjectHelper.isEmpty(getOldValue(exchange))) {
-                            result = cache.replace(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        if (ObjectHelper.isEmpty(getOldValue(message))) {
+                            result = cache.replace(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                         } else {
-                            result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                            result = cache.replace(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                         }
                     } else {
-                        if (ObjectHelper.isEmpty(getOldValue(exchange))) {
-                            result = cache.replace(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        if (ObjectHelper.isEmpty(getOldValue(message))) {
+                            result = cache.replace(getKey(message), getValue(message), lifespan, timeUnit);
                         } else {
-                            result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                            result = cache.replace(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit);
                         }
                     }
                 } else {
-                    if (ObjectHelper.isEmpty(getOldValue(exchange))) {
-                        result = cache.replace(getKey(exchange), getValue(exchange));
+                    if (ObjectHelper.isEmpty(getOldValue(message))) {
+                        result = cache.replace(getKey(message), getValue(message));
                     } else {
-                        result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange));
+                        result = cache.replace(getKey(message), getOldValue(message), getValue(message));
                     }
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, REPLACEASYNC {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 NotifyingFuture result;
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
-                    cache = InfinispanUtil.ignoreReturnValuesCache(cache);
-                }
-                if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
-                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
-                    String timeUnit =  exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
-                    if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
-                        && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
-                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
-                        String maxIdleTimeUnit =  exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
-                        if (ObjectHelper.isEmpty(getOldValue(exchange))) {
-                            result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                if (hasLifespan(message)) {
+                    long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    TimeUnit timeUnit =  message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+                    if (hasMaxIdleTime(message)) {
+                        long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        TimeUnit maxIdleTimeUnit =  message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+                        if (ObjectHelper.isEmpty(getOldValue(message))) {
+                            result = cache.replaceAsync(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                         } else {
-                            result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+                            result = cache.replaceAsync(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
                         }
                     } else {
-                        if (ObjectHelper.isEmpty(getOldValue(exchange))) {
-                            result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                        if (ObjectHelper.isEmpty(getOldValue(message))) {
+                            result = cache.replaceAsync(getKey(message), getValue(message), lifespan, timeUnit);
                         } else {
-                            result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+                            result = cache.replaceAsync(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit);
                         }
                     }
                 } else {
-                    if (ObjectHelper.isEmpty(getOldValue(exchange))) {
-                        result = cache.replaceAsync(getKey(exchange), getValue(exchange));
+                    if (ObjectHelper.isEmpty(getOldValue(message))) {
+                        result = cache.replaceAsync(getKey(message), getValue(message));
                     } else {
-                        result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange));
+                        result = cache.replaceAsync(getKey(message), getOldValue(message), getValue(message));
                     }
                 }
-                setResult(result, exchange);
+                setResult(result, message);
             }
         }, SIZE {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                setResult(cache.size(), exchange);
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                setResult(cache.size(), message);
             }
         }, CLEAR {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
                 cache.clear();
             }
         }, CLEARASYNC {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                setResult(cache.clearAsync(), exchange);
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                setResult(cache.clearAsync(), message);
             }
         }, QUERY {
             @Override
-            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-                Query query = getQuery(configuration, cache, exchange);
+            void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+                Query query = getQuery(configuration, cache, message);
                 if (query == null) {
                     return;
                 }
-                setResult(query.list(), exchange);
+                setResult(query.list(), message);
             }
         };
 
-        void setResult(Object result, Exchange exchange) {
-            exchange.getIn().setHeader(InfinispanConstants.RESULT, result);
+        private static final Operation[] OPERATIONS = values();
+
+        void setResult(Object result, Message message) {
+            message.setHeader(InfinispanConstants.RESULT, result);
         }
 
-        Object getKey(Exchange exchange) {
-            return exchange.getIn().getHeader(InfinispanConstants.KEY);
+        Object getKey(Message message) {
+            return message.getHeader(InfinispanConstants.KEY);
         }
 
-        Object getValue(Exchange exchange) {
-            return exchange.getIn().getHeader(InfinispanConstants.VALUE);
+        Object getValue(Message message) {
+            return message.getHeader(InfinispanConstants.VALUE);
         }
 
-        Object getOldValue(Exchange exchange) {
-            return exchange.getIn().getHeader(InfinispanConstants.OLD_VALUE);
+        Object getOldValue(Message message) {
+            return message.getHeader(InfinispanConstants.OLD_VALUE);
         }
 
-        Map<? extends Object, ? extends Object>  getMap(Exchange exchange) {
-            return (Map<? extends Object, ? extends Object>) exchange.getIn().getHeader(InfinispanConstants.MAP);
+        Map<? extends Object, ? extends Object>  getMap(Message message) {
+            return (Map<? extends Object, ? extends Object>) message.getHeader(InfinispanConstants.MAP);
         }
 
-        Query getQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+        Query getQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
             if (InfinispanUtil.isRemote(cache)) {
-                return InfinispanRemoteOperation.buildQuery(configuration, cache, exchange);
+                return InfinispanRemoteOperation.buildQuery(configuration, cache, message);
             } else {
                 return null;
             }
 
         }
 
-        abstract void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange);
+        boolean hasLifespan(Message message) {
+            // a lifespan is usable only when both the time header and its unit header are non-empty
+            return !(isHeaderEmpty(message, InfinispanConstants.LIFESPAN_TIME) || isHeaderEmpty(message, InfinispanConstants.LIFESPAN_TIME_UNIT));
+        }
+
+        boolean hasMaxIdleTime(Message message) {
+            // a max-idle setting is usable only when both the time header and its unit header are non-empty
+            return !(isHeaderEmpty(message, InfinispanConstants.MAX_IDLE_TIME) || isHeaderEmpty(message, InfinispanConstants.MAX_IDLE_TIME_UNIT));
+        }
+
+        abstract void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message);
+
+        /** Maps {@code operation} to the constant whose name equals, ignoring case, the text after its first InfinispanConstants.OPERATION_LEN characters. */
+        public static Operation fromOperation(String operation) {
+            final int offset = InfinispanConstants.OPERATION_LEN;
+            final int expected = operation.length() - offset;
+
+            // back-to-front scan over the cached values() array
+            for (int index = OPERATIONS.length - 1; index >= 0; index--) {
+                final Operation candidate = OPERATIONS[index];
+                final String name = candidate.name();
+                if (name.length() == expected && name.regionMatches(true, 0, operation, offset, expected)) {
+                    return candidate;
+                }
+            }
+
+            throw new IllegalArgumentException("Unknown Operation for string: " + operation);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
index 31e1e16..6fbd1c1 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
@@ -18,55 +18,31 @@ package org.apache.camel.component.infinispan;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
-import org.infinispan.client.hotrod.RemoteCacheManager;
-import org.infinispan.client.hotrod.configuration.Configuration;
-import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
-import org.infinispan.commons.api.BasicCache;
-import org.infinispan.commons.api.BasicCacheContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class InfinispanProducer extends DefaultProducer {
-    private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class);
     private final InfinispanConfiguration configuration;
-    private BasicCacheContainer cacheContainer;
-    private boolean isManagedCacheContainer;
+    private final InfinispanManager manager;
 
     public InfinispanProducer(InfinispanEndpoint endpoint, InfinispanConfiguration configuration) {
         super(endpoint);
         this.configuration = configuration;
+        this.manager = new InfinispanManager(endpoint.getCamelContext(), configuration);
     }
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        InfinispanOperation.process(exchange, configuration, getCache(exchange));
+        InfinispanOperation.process(exchange, configuration, manager.getCache(exchange));
     }
 
     @Override
     protected void doStart() throws Exception {
-        cacheContainer = configuration.getCacheContainer();
-        if (cacheContainer == null) {
-            Configuration config = new ConfigurationBuilder().classLoader(Thread.currentThread().getContextClassLoader()).addServers(configuration.getHost()).build();
-            cacheContainer = new RemoteCacheManager(config, true);
-            isManagedCacheContainer = true;
-        }
         super.doStart();
+        manager.start();
     }
 
     @Override
     protected void doStop() throws Exception {
-        if (isManagedCacheContainer) {
-            cacheContainer.stop();
-        }
+        manager.stop();
         super.doStop();
     }
-
-    private BasicCache<Object, Object> getCache(Exchange exchange) {
-        String cacheName = exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, String.class);
-        if (cacheName == null) {
-            cacheName = configuration.getCacheName();
-        }
-        LOGGER.trace("Cache[{}]", cacheName);
-        return cacheName != null ? cacheContainer.getCache(cacheName) : cacheContainer.getCache();
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
index 95a5d04..3bc6c2a 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
@@ -16,8 +16,17 @@
  */
 package org.apache.camel.component.infinispan;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ResourceHelper;
+import org.infinispan.AdvancedCache;
 import org.infinispan.Cache;
 import org.infinispan.client.hotrod.RemoteCache;
 import org.infinispan.client.hotrod.RemoteCacheManager;
@@ -68,10 +77,20 @@ public final class InfinispanUtil {
     }
 
     @SuppressWarnings("unchecked")
+    public static <K, V> Cache<K, V> asEmbedded(BasicCache<K, V> cache) {
+        return Cache.class.cast(cache); // Class.cast: throws ClassCastException when cache is not an org.infinispan.Cache
+    }
+
+    @SuppressWarnings("unchecked")
     public static <K, V>  RemoteCache<K, V> asRemote(BasicCache<K, V> cache) {
         return RemoteCache.class.cast(cache);
     }
 
+    /** Returns the AdvancedCache behind {@code cache}; throws ClassCastException when it is not an org.infinispan.Cache. */
+    public static <K, V> AdvancedCache<K, V> asAdvanced(BasicCache<K, V> cache) {
+        return asEmbedded(cache).getAdvancedCache();
+    }
+
     public static <K, V> BasicCache<K, V> ignoreReturnValuesCache(BasicCache<K, V> cache) {
         if (isEmbedded(cache)) {
             return ((Cache<K, V>) cache).getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES);
@@ -81,10 +100,36 @@ public final class InfinispanUtil {
     }
 
     public static boolean isInHeaderEmpty(Exchange exchange, String header) {
-        return ObjectHelper.isEmpty(exchange.getIn().getHeader(header));
+        return isHeaderEmpty(exchange.getIn(), header);
+    }
+
+    public static boolean isHeaderEmpty(Message message, String header) {
+        return ObjectHelper.isEmpty(message.getHeader(header));
     }
 
     public static BasicCache<Object, Object> getCache(BasicCacheContainer cacheContainer, String cacheName) {
         return ObjectHelper.isEmpty(cacheName) ? cacheContainer.getCache() : cacheContainer.getCache(cacheName);
     }
+
+    /** Loads {@code uri} as Properties, resolving it through the CamelContext first and the thread context class loader second. */
+    public static Properties loadProperties(CamelContext camelContext, String uri) throws IOException {
+        if (camelContext != null) {
+            try (InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, uri)) {
+                Properties properties = new Properties();
+                properties.load(is);
+                return properties;
+            } catch (IOException ignored) {
+                // best effort: the resource could not be read through the CamelContext, try the class loader next
+            }
+        }
+
+        try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(uri)) {
+            if (is != null) { // getResourceAsStream() returns null for a missing resource; load(null) would throw NullPointerException
+                Properties properties = new Properties();
+                properties.load(is);
+                return properties;
+            }
+        }
+        throw new FileNotFoundException("Cannot find resource: " + uri);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
index 4f50950..0d6f630 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
@@ -22,7 +22,6 @@ import org.infinispan.notifications.Listener;
 
 @Listener(clustered = false, sync = false)
 public class InfinispanAsyncLocalEventListener extends InfinispanSyncLocalEventListener {
-
     public InfinispanAsyncLocalEventListener(InfinispanConsumer consumer, Set<String> eventTypes) {
         super(consumer, eventTypes);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
index 113c073..d4a70ca 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
@@ -20,6 +20,7 @@ import org.apache.camel.component.infinispan.InfinispanConfiguration;
 import org.apache.camel.component.infinispan.InfinispanConsumer;
 import org.apache.camel.component.infinispan.InfinispanConsumerHandler;
 import org.apache.camel.component.infinispan.InfinispanEventListener;
+import org.apache.camel.component.infinispan.InfinispanUtil;
 import org.infinispan.Cache;
 
 public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsumerHandler {
@@ -30,7 +31,7 @@ public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsum
 
     @Override
     public InfinispanEventListener start(InfinispanConsumer consumer) {
-        Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache();
+        Cache<?, ?> embeddedCache = InfinispanUtil.asEmbedded(consumer.getCache());
         InfinispanConfiguration configuration = consumer.getConfiguration();
         InfinispanEventListener listener;
         if (configuration.hasCustomListener()) {
@@ -55,8 +56,7 @@ public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsum
 
     @Override
     public void stop(InfinispanConsumer consumer) {
-        Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache();
+        Cache<?, ?> embeddedCache = InfinispanUtil.asEmbedded(consumer.getCache());
         embeddedCache.removeListener(consumer.getListener());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
index bcf6957..14fac43 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.infinispan.embedded;
 
 import java.util.Set;
+
 import org.apache.camel.component.infinispan.InfinispanConsumer;
 import org.apache.camel.component.infinispan.InfinispanEventListener;
 import org.infinispan.notifications.Listener;
@@ -25,16 +26,12 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
 import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Listener(clustered = true, sync = true)
 public class InfinispanSyncClusteredEventListener extends InfinispanEventListener {
-    /*clustered listeners only listen for post events*/
+    // clustered listeners only listen for post events
     private static final boolean IS_PRE = false;
 
-    private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
-
     public InfinispanSyncClusteredEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
         super(infinispanConsumer, eventTypes);
     }
@@ -44,8 +41,6 @@ public class InfinispanSyncClusteredEventListener extends InfinispanEventListene
     @CacheEntryRemoved
     @CacheEntryExpired
     public void processEvent(CacheEntryEvent<Object, Object> event) {
-        logger.trace("Received CacheEntryEvent [{}]", event);
-
         if (isAccepted(event.getType().toString())) {
             infinispanConsumer.processEvent(event.getType().toString(), IS_PRE, event.getCache().getName(), event.getKey());
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
index 1cf9325..e8112bd 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.infinispan.embedded;
 
 import java.util.Set;
+
 import org.apache.camel.component.infinispan.InfinispanConsumer;
 import org.apache.camel.component.infinispan.InfinispanEventListener;
 import org.infinispan.notifications.Listener;
@@ -30,13 +31,9 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryPassivate
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
 import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Listener(clustered = false, sync = true)
 public class InfinispanSyncLocalEventListener extends InfinispanEventListener {
-    private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
-
     public InfinispanSyncLocalEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
         super(infinispanConsumer, eventTypes);
     }
@@ -51,8 +48,6 @@ public class InfinispanSyncLocalEventListener extends InfinispanEventListener {
     @CacheEntryVisited
     @CacheEntryExpired
     public void processEvent(CacheEntryEvent<Object, Object> event) {
-        logger.trace("Received CacheEntryEvent [{}]", event);
-
         dispatch(event.getType().toString(), event.isPre(), event.getCache().getName(), event.getKey());
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
index e7838b3..b28a460 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
@@ -29,17 +29,19 @@ import org.infinispan.manager.DefaultCacheManager;
 public class InfinispanIdempotentRepository extends ServiceSupport implements IdempotentRepository<Object> {
     private final String cacheName;
     private final BasicCacheContainer cacheContainer;
-    private boolean isManagedCacheContainer;
+    private final boolean isManagedCacheContainer;
+    private BasicCache<Object, Boolean> cache;
 
     public InfinispanIdempotentRepository(BasicCacheContainer cacheContainer, String cacheName) {
         this.cacheContainer = cacheContainer;
         this.cacheName = cacheName;
+        this.isManagedCacheContainer = false;
     }
 
     public InfinispanIdempotentRepository(String cacheName) {
-        cacheContainer = new DefaultCacheManager();
+        this.cacheContainer = new DefaultCacheManager();
         this.cacheName = cacheName;
-        isManagedCacheContainer = true;
+        this.isManagedCacheContainer = true;
     }
 
     public InfinispanIdempotentRepository() {
@@ -106,16 +108,21 @@ public class InfinispanIdempotentRepository extends ServiceSupport implements Id
 
     @Override
     protected void doShutdown() throws Exception {
-        super.doShutdown();
         if (isManagedCacheContainer) {
             cacheContainer.stop();
         }
+
+        super.doShutdown();
     }
 
     private BasicCache<Object, Boolean> getCache() {
-        return cacheName != null
+        if (cache == null) {
+            cache = cacheName != null
                 ? cacheContainer.<Object, Boolean>getCache(cacheName)
                 : cacheContainer.<Object, Boolean>getCache();
+        }
+
+        return cache;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
index dd0f590..42b3cec 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
@@ -16,7 +16,7 @@
  */
 package org.apache.camel.component.infinispan.remote;
 
-import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.component.infinispan.InfinispanConfiguration;
 import org.apache.camel.component.infinispan.InfinispanConstants;
 import org.apache.camel.component.infinispan.InfinispanQueryBuilder;
@@ -30,8 +30,8 @@ public final class InfinispanRemoteOperation {
     private InfinispanRemoteOperation() {
     }
 
-    public static Query buildQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
-        InfinispanQueryBuilder queryBuilder = exchange.getIn().getHeader(InfinispanConstants.QUERY_BUILDER, InfinispanQueryBuilder.class);
+    public static Query buildQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+        InfinispanQueryBuilder queryBuilder = message.getHeader(InfinispanConstants.QUERY_BUILDER, InfinispanQueryBuilder.class);
         if (queryBuilder == null) {
             queryBuilder = configuration.getQueryBuilder();
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
index b47b1db..747a9e4 100644
--- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.infinispan;
 
 import java.util.List;
+
 import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.infinispan.Cache;
@@ -90,15 +91,18 @@ public class InfinispanClusterTestSupport extends CamelTestSupport {
         } catch (Throwable ex) {
             throw new Exception(ex);
         }
+
         super.setUp();
     }
 
     @Override
     public void tearDown() throws Exception {
+        super.tearDown();
+
+        // Has to be done after super.tearDown(); maybe CamelTestSupport should take care of this
         for (BasicCacheContainer container: clusteredCacheContainers) {
             container.stop();
         }
-        super.tearDown();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java
new file mode 100644
index 0000000..48c7772
--- /dev/null
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import org.infinispan.cache.impl.DecoratedCache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.commons.api.BasicCache;
+import org.infinispan.manager.DefaultCacheManager;
+import org.jgroups.util.UUID;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests checking which kind of cache {@link InfinispanManager}
+ * hands out for a given {@link InfinispanConfiguration}.
+ * <p>
+ * The remote tests connect to "localhost"; presumably they need a running
+ * Hot Rod server that exposes the "misc_cache" cache.
+ */
+public class InfinispanConfigurationTestIT {
+
+    /**
+     * An embedded cache configured with flags is expected to be handed out
+     * as a {@link DecoratedCache} that carries those flags.
+     */
+    @Test
+    public void embeddedCacheWithFlagsTest() throws Exception {
+        InfinispanConfiguration configuration = new InfinispanConfiguration();
+        configuration.setHost("localhost");
+        configuration.setCacheName("misc_cache");
+        configuration.setCacheContainer(new DefaultCacheManager(true));
+        configuration.setFlags(
+            org.infinispan.context.Flag.SKIP_CACHE_LOAD,
+            org.infinispan.context.Flag.SKIP_CACHE_STORE
+        );
+
+        InfinispanManager manager = new InfinispanManager(configuration);
+        manager.start();
+
+        try {
+            BasicCache<Object, Object> cache = manager.getCache();
+            assertNotNull(cache);
+            assertTrue(cache instanceof DecoratedCache);
+
+            DecoratedCache<Object, Object> decoratedCache = (DecoratedCache<Object, Object>)cache;
+            assertTrue(decoratedCache.getFlags().contains(org.infinispan.context.Flag.SKIP_CACHE_LOAD));
+            assertTrue(decoratedCache.getFlags().contains(org.infinispan.context.Flag.SKIP_CACHE_STORE));
+        } finally {
+            // Stop the container and the manager even when an assertion
+            // fails, so a failing test does not leave them running.
+            manager.getCacheContainer().stop();
+            manager.stop();
+        }
+    }
+
+    /**
+     * Without a client configuration the cache is expected to be a
+     * {@link RemoteCache} whose {@code put} returns {@code null} even when
+     * the key already holds a value.
+     */
+    @Test
+    public void remoteCacheWithoutProperties() throws Exception {
+        InfinispanConfiguration configuration = new InfinispanConfiguration();
+        configuration.setHost("localhost");
+        configuration.setCacheName("misc_cache");
+
+        InfinispanManager manager = new InfinispanManager(configuration);
+        manager.start();
+
+        try {
+            BasicCache<Object, Object> cache = manager.getCache();
+            assertNotNull(cache);
+            assertTrue(cache instanceof RemoteCache);
+
+            RemoteCache<Object, Object> remoteCache = InfinispanUtil.asRemote(cache);
+
+            String key = UUID.randomUUID().toString();
+            assertNull(remoteCache.put(key, "val1"));
+            assertNull(remoteCache.put(key, "val2"));
+        } finally {
+            // Stop the manager even when an assertion fails.
+            manager.stop();
+        }
+    }
+
+    /**
+     * With "infinispan/client.properties", which sets
+     * {@code infinispan.client.hotrod.force_return_values} to true, the
+     * second {@code put} on a key is expected to return a non-null value.
+     */
+    @Test
+    public void remoteCacheWithPropertiesTest() throws Exception {
+        InfinispanConfiguration configuration = new InfinispanConfiguration();
+        configuration.setHost("localhost");
+        configuration.setCacheName("misc_cache");
+        configuration.setConfigurationUri("infinispan/client.properties");
+
+        InfinispanManager manager = new InfinispanManager(configuration);
+        manager.start();
+
+        try {
+            BasicCache<Object, Object> cache = manager.getCache();
+            assertNotNull(cache);
+            assertTrue(cache instanceof RemoteCache);
+
+            String key = UUID.randomUUID().toString();
+            assertNull(cache.put(key, "val1"));
+            assertNotNull(cache.put(key, "val2"));
+        } finally {
+            // Stop the manager even when an assertion fails.
+            manager.stop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/test/resources/infinispan/client.properties
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/resources/infinispan/client.properties b/components/camel-infinispan/src/test/resources/infinispan/client.properties
new file mode 100644
index 0000000..4cacbe2
--- /dev/null
+++ b/components/camel-infinispan/src/test/resources/infinispan/client.properties
@@ -0,0 +1,20 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+# Whether or not to implicitly apply Flag.FORCE_RETURN_VALUE to all calls.
+# The default value for this property is false.
+infinispan.client.hotrod.force_return_values = true

http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/resources/log4j.xml b/components/camel-infinispan/src/test/resources/log4j.xml
new file mode 100644
index 0000000..71a2b38
--- /dev/null
+++ b/components/camel-infinispan/src/test/resources/log4j.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+    <appender name="FILE" class="org.apache.log4j.FileAppender">
+        <param name="File" value="target/camel-infinispan-test.log" />
+        <param name="Append" value="true" />
+        <param name="Threshold" value="INFO" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d [%-15.15t] %-5p %-30.30c{1} - %m%n" />
+        </layout>
+    </appender>
+
+    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+        <param name="Threshold" value="INFO" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%30.30t] %-30.30c{1} %-5p %m%n" />
+        </layout>
+    </appender>
+
+    <!-- ================ -->
+    <!-- Limit categories -->
+    <!-- ================ -->
+    <category name="org.jboss.arquillian">
+        <priority value="INFO" />
+    </category>
+
+    <category name="org.infinispan.arquillian.core.WithRunningServerObserver">
+        <priority value="INFO" />
+    </category>
+
+    <category name="org.infinispan.server.test.util.TestsuiteListener">
+        <priority value="INFO" />
+    </category>
+
+    <category name="org.apache.http">
+        <priority value="INFO" />
+    </category>
+
+    <category name="org.infinispan">
+        <priority value="INFO" />
+    </category>
+
+    <category name="org.jgroups">
+        <priority value="INFO" />
+    </category>
+
+    <category name="org.apache.commons.httpclient.auth">
+        <priority value="INFO"/>
+    </category>
+
+    <category name="org.apache.auth">
+        <priority value="INFO"/>
+    </category>
+    
+    <category name="org.apache.directory">
+        <priority value="WARN"/>
+    </category>
+
+    <!-- ======================= -->
+    <!-- Setup the Root category -->
+    <!-- ======================= -->
+
+    <root>
+        <priority value="INFO" />
+        <appender-ref ref="FILE" />
+    </root>
+
+</log4j:configuration>