You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/04/27 21:57:19 UTC

camel git commit: CAMEL-8709 Camel-Infinispan: Add Async operations to available set

Repository: camel
Updated Branches:
  refs/heads/master fe823e345 -> 1451499b8


CAMEL-8709 Camel-Infinispan: Add Async operations to available set


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1451499b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1451499b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1451499b

Branch: refs/heads/master
Commit: 1451499b83a69842658f0bfd95a8cd1fa2632e80
Parents: fe823e3
Author: Andrea Cosentino <an...@gmail.com>
Authored: Sun Apr 26 22:07:12 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Apr 27 21:55:20 2015 +0200

----------------------------------------------------------------------
 .../infinispan/InfinispanConstants.java         |   5 +
 .../infinispan/InfinispanOperation.java         |  87 +++++
 .../infinispan/InfinispanProducerTest.java      | 371 +++++++++++++++++++
 3 files changed, 463 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1451499b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
index 2c0ba42..481446c 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
@@ -25,13 +25,18 @@ public interface InfinispanConstants {
     String MAP = "CamelInfinispanMap";    
     String OPERATION = "CamelInfinispanOperation";
     String PUT = "CamelInfinispanOperationPut";
+    String PUT_ASYNC = "CamelInfinispanOperationPutAsync";
     String PUT_IF_ABSENT = "CamelInfinispanOperationPutIfAbsent";
+    String PUT_IF_ABSENT_ASYNC = "CamelInfinispanOperationPutIfAbsentAsync";
     String GET = "CamelInfinispanOperationGet";
     String CONTAINS_KEY = "CamelInfinispanOperationContainsKey";
     String CONTAINS_VALUE = "CamelInfinispanOperationContainsValue";
     String PUT_ALL = "CamelInfinispanOperationPutAll";
+    String PUT_ALL_ASYNC = "CamelInfinispanOperationPutAllAsync";
     String REMOVE = "CamelInfinispanOperationRemove";
+    String REMOVE_ASYNC = "CamelInfinispanOperationRemoveAsync";
     String REPLACE = "CamelInfinispanOperationReplace";
+    String REPLACE_ASYNC = "CamelInfinispanOperationReplaceAsync";
     String CLEAR = "CamelInfinispanOperationClear";
     String SIZE = "CamelInfinispanOperationSize";
     String RESULT = "CamelInfinispanOperationResult";

http://git-wip-us.apache.org/repos/asf/camel/blob/1451499b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
index 3450666..38b3e74 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.Exchange;
 import org.apache.camel.util.ObjectHelper;
 import org.infinispan.commons.api.BasicCache;
+import org.infinispan.commons.util.concurrent.NotifyingFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,6 +75,26 @@ public class InfinispanOperation {
                 }
                 setResult(result, exchange);
             }
+        }, PUTASYNC {
+            @Override
+            void execute(BasicCache<Object, Object> cache, Exchange exchange) {
+                NotifyingFuture<?> result; // pending async put; handed to setResult below without being waited on
+                if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) {
+                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
+                    if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME))
+                        && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) {
+                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
+                        result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); // lifespan and max-idle headers both set
+                    } else {
+                        result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); // lifespan headers only
+                    }
+                } else {
+                    result = cache.putAsync(getKey(exchange), getValue(exchange)); // no complete lifespan pair: plain async put
+                }
+                setResult(result, exchange);
+            }
         }, PUTALL {
             @Override
             void execute(BasicCache<Object, Object> cache, Exchange exchange) {
@@ -92,6 +113,26 @@ public class InfinispanOperation {
                     cache.putAll(getMap(exchange));
                 }
             }
+        }, PUTALLASYNC {
+            @Override
+            void execute(BasicCache<Object, Object> cache, Exchange exchange) {
+                NotifyingFuture<?> result; // pending async putAll; handed to setResult below without being waited on
+                if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) {
+                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
+                    if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME))
+                        && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) {
+                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
+                        result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); // lifespan and max-idle headers both set
+                    } else {
+                        result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit)); // lifespan headers only
+                    }
+                } else {
+                    result = cache.putAllAsync(getMap(exchange)); // no complete lifespan pair: plain async putAll
+                }
+                setResult(result, exchange);
+            }
         }, PUTIFABSENT {
             @Override
             void execute(BasicCache<Object, Object> cache, Exchange exchange) {
@@ -112,6 +153,26 @@ public class InfinispanOperation {
                 }
                 setResult(result, exchange);
             }
+        }, PUTIFABSENTASYNC {
+            @Override
+            void execute(BasicCache<Object, Object> cache, Exchange exchange) {
+                NotifyingFuture<?> result; // pending async putIfAbsent; handed to setResult below without being waited on
+                if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) {
+                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
+                    if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME))
+                        && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) {
+                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
+                        result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); // lifespan and max-idle headers both set
+                    } else {
+                        result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); // lifespan headers only
+                    }
+                } else {
+                    result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange)); // no complete lifespan pair: plain async putIfAbsent
+                }
+                setResult(result, exchange);
+            }
         }, GET {
             @Override
             void execute(BasicCache<Object, Object> cache, Exchange exchange) {
@@ -136,6 +197,12 @@ public class InfinispanOperation {
                 Object result = cache.remove(getKey(exchange));
                 setResult(result, exchange);
             }
+        }, REMOVEASYNC {
+            @Override
+            void execute(BasicCache<Object, Object> cache, Exchange exchange) {
+                NotifyingFuture<?> result = cache.removeAsync(getKey(exchange)); // not waited on here
+                setResult(result, exchange); // callers receive the pending future, not the removed value
+            }
         }, REPLACE {
             @Override
             void execute(BasicCache<Object, Object> cache, Exchange exchange) {
@@ -156,6 +223,26 @@ public class InfinispanOperation {
                 }
                 setResult(result, exchange);
             }
+        }, REPLACEASYNC {
+            @Override
+            void execute(BasicCache<Object, Object> cache, Exchange exchange) {
+                NotifyingFuture<?> result; // pending async replace; handed to setResult below without being waited on
+                if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) {
+                    long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+                    String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
+                    if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME))
+                        && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) {
+                        long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+                        String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
+                        result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); // lifespan and max-idle headers both set
+                    } else {
+                        result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); // lifespan headers only
+                    }
+                } else {
+                    result = cache.replaceAsync(getKey(exchange), getValue(exchange)); // no complete lifespan pair: plain async replace
+                }
+                setResult(result, exchange);
+            }
         }, SIZE {
             @Override
             void execute(BasicCache<Object, Object> cache, Exchange exchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/1451499b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
index 84df8e4..c806847 100644
--- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.infinispan.commons.util.concurrent.NotifyingFuture;
 import org.junit.Test;
 
 public class InfinispanProducerTest extends InfinispanTestSupport {
@@ -79,6 +80,86 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
     }
     
     @Test
+    public void publishKeyAndValueAsync() throws Exception {
+        Exchange exchange = template.send("direct:putasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE);
+            }
+        });
+
+        // Wait on the returned future itself (bounded) instead of sleeping a fixed ten seconds.
+        NotifyingFuture<?> resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+        resultPutAsync.get(10, TimeUnit.SECONDS);
+        assertEquals(Boolean.TRUE, resultPutAsync.isDone());
+
+        assertEquals(VALUE_ONE, currentCache().get(KEY_ONE).toString());
+    }
+
+    @Test
+    public void publishKeyAndValueAsyncWithLifespan() throws Exception {
+        Exchange exchange = template.send("direct:putasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, Long.valueOf(LIFESPAN_TIME));
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+            }
+        });
+
+        Thread.sleep(1000);
+        NotifyingFuture<?> resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+        assertEquals(Boolean.TRUE, resultPutAsync.isDone());
+
+        Object value = currentCache().get(KEY_ONE);
+        assertEquals(VALUE_ONE, value.toString());
+        // Sleep past the lifespan (presumably LIFESPAN_TIME is under six seconds - confirm where it is declared).
+        Thread.sleep(6000);
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+    }
+    
+    @Test
+    public void publishKeyAndValueAsyncWithLifespanAndMaxIdle() throws Exception {
+        Exchange exchange = template.send("direct:putasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, Long.valueOf(LIFESPAN_FOR_MAX_IDLE));
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+                exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, Long.valueOf(MAX_IDLE_TIME));
+                exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString());
+            }
+        });
+
+        Thread.sleep(1000);
+        NotifyingFuture<?> resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+        assertEquals(Boolean.TRUE, resultPutAsync.isDone());
+
+        Object value = currentCache().get(KEY_ONE);
+        assertEquals(VALUE_ONE, value.toString());
+        // Leave the entry untouched for ten seconds (presumably longer than MAX_IDLE_TIME - confirm) before reading it back.
+        Thread.sleep(10000);
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+    }
+    
+    @Test
     public void publishMapNormal() throws Exception {
         template.send("direct:start", new Processor() {
             @Override
@@ -181,6 +262,110 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
     }
     
     @Test
+    public void publishMapNormalAsync() throws Exception {
+        template.send("direct:putallasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                Map<String, String> map = new HashMap<String, String>();
+                map.put(KEY_ONE, VALUE_ONE);
+                map.put(KEY_TWO, VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.MAP, map);
+            }
+        });
+        // The sent exchange is not captured here, so pause briefly to let the async putAll land.
+        Thread.sleep(100);
+        assertEquals(2, currentCache().size());
+        Object value = currentCache().get(KEY_ONE);
+        assertEquals(VALUE_ONE, value.toString());
+        value = currentCache().get(KEY_TWO);
+        assertEquals(VALUE_TWO, value.toString());
+    }
+    
+    @Test
+    public void publishMapWithLifespanAsync() throws Exception {
+        template.send("direct:putallasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                Map<String, String> map = new HashMap<String, String>();
+                map.put(KEY_ONE, VALUE_ONE);
+                map.put(KEY_TWO, VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.MAP, map);
+                exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.PUT_ALL_ASYNC); // name the async operation this test is about
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, Long.valueOf(LIFESPAN_TIME));
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+            }
+        });
+
+        Thread.sleep(100);
+        assertEquals(2, currentCache().size());
+        Object value = currentCache().get(KEY_ONE);
+        assertEquals(VALUE_ONE, value.toString());
+        value = currentCache().get(KEY_TWO);
+        assertEquals(VALUE_TWO, value.toString());
+
+        Thread.sleep(LIFESPAN_TIME * 1000);
+
+        Exchange exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_TWO);
+            }
+        });
+        resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+    }
+    
+    @Test
+    public void publishMapWithLifespanAndMaxIdleTimeAsync() throws Exception {
+        template.send("direct:putallasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                Map<String, String> map = new HashMap<String, String>();
+                map.put(KEY_ONE, VALUE_ONE);
+                map.put(KEY_TWO, VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.MAP, map);
+                exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.PUT_ALL_ASYNC); // name the async operation this test is about
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, Long.valueOf(LIFESPAN_FOR_MAX_IDLE));
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+                exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, Long.valueOf(MAX_IDLE_TIME));
+                exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString());
+            }
+        });
+
+        Thread.sleep(100);
+        assertEquals(2, currentCache().size());
+
+        Thread.sleep(10000);
+
+        Exchange exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_TWO);
+            }
+        });
+        resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+    }
+    
+    @Test
     public void putIfAbsentAlreadyExists() throws Exception {
         currentCache().put(KEY_ONE, VALUE_ONE);
         
@@ -217,6 +402,86 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
     }
     
     @Test
+    public void putIfAbsentKeyAndValueAsync() throws Exception {
+        Exchange exchange = template.send("direct:putifabsentasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE);
+            }
+        });
+
+        // Wait on the returned future itself (bounded) instead of sleeping a fixed ten seconds.
+        NotifyingFuture<?> resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+        resultPutAsync.get(10, TimeUnit.SECONDS);
+        assertEquals(Boolean.TRUE, resultPutAsync.isDone());
+
+        assertEquals(VALUE_ONE, currentCache().get(KEY_ONE).toString());
+    }
+
+    @Test
+    public void putIfAbsentKeyAndValueAsyncWithLifespan() throws Exception {
+        Exchange exchange = template.send("direct:putifabsentasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, Long.valueOf(LIFESPAN_TIME));
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+            }
+        });
+
+        Thread.sleep(1000);
+        NotifyingFuture<?> resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+        assertEquals(Boolean.TRUE, resultPutAsync.isDone());
+
+        Object value = currentCache().get(KEY_ONE);
+        assertEquals(VALUE_ONE, value.toString());
+        // Sleep past the lifespan (presumably LIFESPAN_TIME is under six seconds - confirm where it is declared).
+        Thread.sleep(6000);
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+    }
+    
+    @Test
+    public void putIfAbsentKeyAndValueAsyncWithLifespanAndMaxIdle() throws Exception {
+        Exchange exchange = template.send("direct:putifabsentasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, Long.valueOf(LIFESPAN_FOR_MAX_IDLE));
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+                exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, Long.valueOf(MAX_IDLE_TIME));
+                exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString());
+            }
+        });
+
+        Thread.sleep(1000);
+        NotifyingFuture<?> resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+        assertEquals(Boolean.TRUE, resultPutAsync.isDone());
+
+        Object value = currentCache().get(KEY_ONE);
+        assertEquals(VALUE_ONE, value.toString());
+        // Leave the entry untouched for ten seconds (presumably longer than MAX_IDLE_TIME - confirm) before reading it back.
+        Thread.sleep(10000);
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+    }
+    
+    @Test
     public void notContainsKeyTest() throws Exception {
         currentCache().put(KEY_ONE, VALUE_ONE);
         
@@ -423,6 +688,82 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
         String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
         assertEquals(null, resultGet);
     }
+    
+    @Test
+    public void replaceAValueByKeyAsync() throws Exception {
+        currentCache().put(KEY_ONE, VALUE_ONE);
+
+        Exchange exchange = template.request("direct:replaceasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO);
+            }
+        });
+        // NOTE(review): RESULT is read as a String here; presumably the conversion resolves the future to the replaced value - confirm.
+        assertEquals(VALUE_ONE, exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class));
+        assertEquals(VALUE_TWO, currentCache().get(KEY_ONE));
+    }
+    
+    @Test
+    public void replaceAValueByKeyWithLifespanAsync() throws Exception {
+        currentCache().put(KEY_ONE, VALUE_ONE);
+
+        Exchange exchange = template.request("direct:replaceasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, Long.valueOf(LIFESPAN_TIME));
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+            }
+        });
+        // NOTE(review): RESULT is read as a String here; presumably the conversion resolves the future to the replaced value - confirm.
+        assertEquals(VALUE_ONE, exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class));
+        assertEquals(VALUE_TWO, currentCache().get(KEY_ONE));
+
+        Thread.sleep(LIFESPAN_TIME * 1000);
+
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+    }
+    
+    @Test
+    public void replaceAValueByKeyWithLifespanAndMaxIdleTimeAsync() throws Exception {
+        currentCache().put(KEY_ONE, VALUE_ONE);
+
+        Exchange exchange = template.request("direct:replaceasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO);
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, Long.valueOf(LIFESPAN_FOR_MAX_IDLE));
+                exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+                exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, Long.valueOf(MAX_IDLE_TIME));
+                exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString());
+            }
+        });
+        // NOTE(review): RESULT is read as a String here; presumably the conversion resolves the future to the replaced value - confirm.
+        assertEquals(VALUE_ONE, exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class));
+        assertEquals(VALUE_TWO, currentCache().get(KEY_ONE));
+
+        Thread.sleep(10000);
+
+        exchange = template.send("direct:get", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+            }
+        });
+        String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+        assertEquals(null, resultGet);
+    }
 
     @Test
     public void deletesExistingValueByKey() throws Exception {
@@ -441,6 +782,26 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
         Object value = currentCache().get(KEY_ONE);
         assertEquals(value, null);
     }
+    
+    @Test
+    public void deletesExistingValueByKeyAsync() throws Exception {
+        currentCache().put(KEY_ONE, VALUE_ONE);
+
+        Exchange exchange = template.request("direct:removeasync", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+                exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.REMOVE_ASYNC);
+            }
+        });
+
+        NotifyingFuture<?> fut = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+        fut.get(10, TimeUnit.SECONDS); // bounded wait on the future instead of a fixed 100 ms sleep
+        assertEquals(Boolean.TRUE, fut.isDone());
+
+        Object value = currentCache().get(KEY_ONE);
+        assertEquals(null, value);
+    }
 
     @Test
     public void clearsAllValues() throws Exception {
@@ -527,6 +888,16 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
                         .to("infinispan://localhost?cacheContainer=#cacheContainer&command=CONTAINSVALUE");
                 from("direct:size")
                         .to("infinispan://localhost?cacheContainer=#cacheContainer&command=SIZE");
+                from("direct:putasync")
+                        .to("infinispan://localhost?cacheContainer=#cacheContainer&command=PUTASYNC");
+                from("direct:putallasync")
+                        .to("infinispan://localhost?cacheContainer=#cacheContainer&command=PUTALLASYNC");
+                from("direct:putifabsentasync")
+                        .to("infinispan://localhost?cacheContainer=#cacheContainer&command=PUTIFABSENTASYNC");
+                from("direct:replaceasync")
+                        .to("infinispan://localhost?cacheContainer=#cacheContainer&command=REPLACEASYNC");
+                from("direct:removeasync")
+                        .to("infinispan://localhost?cacheContainer=#cacheContainer&command=REMOVEASYNC");
             }
         };
     }