You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/04/27 21:57:19 UTC
camel git commit: CAMEL-8709 Camel-Infinispan: Add Async operations
to available set
Repository: camel
Updated Branches:
refs/heads/master fe823e345 -> 1451499b8
CAMEL-8709 Camel-Infinispan: Add Async operations to available set
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1451499b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1451499b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1451499b
Branch: refs/heads/master
Commit: 1451499b83a69842658f0bfd95a8cd1fa2632e80
Parents: fe823e3
Author: Andrea Cosentino <an...@gmail.com>
Authored: Sun Apr 26 22:07:12 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Apr 27 21:55:20 2015 +0200
----------------------------------------------------------------------
.../infinispan/InfinispanConstants.java | 5 +
.../infinispan/InfinispanOperation.java | 87 +++++
.../infinispan/InfinispanProducerTest.java | 371 +++++++++++++++++++
3 files changed, 463 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1451499b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
index 2c0ba42..481446c 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
@@ -25,13 +25,18 @@ public interface InfinispanConstants {
String MAP = "CamelInfinispanMap";
String OPERATION = "CamelInfinispanOperation";
String PUT = "CamelInfinispanOperationPut";
+ String PUT_ASYNC = "CamelInfinispanOperationPutAsync";
String PUT_IF_ABSENT = "CamelInfinispanOperationPutIfAbsent";
+ String PUT_IF_ABSENT_ASYNC = "CamelInfinispanOperationPutIfAbsentAsync";
String GET = "CamelInfinispanOperationGet";
String CONTAINS_KEY = "CamelInfinispanOperationContainsKey";
String CONTAINS_VALUE = "CamelInfinispanOperationContainsValue";
String PUT_ALL = "CamelInfinispanOperationPutAll";
+ String PUT_ALL_ASYNC = "CamelInfinispanOperationPutAllAsync";
String REMOVE = "CamelInfinispanOperationRemove";
+ String REMOVE_ASYNC = "CamelInfinispanOperationRemoveAsync";
String REPLACE = "CamelInfinispanOperationReplace";
+ String REPLACE_ASYNC = "CamelInfinispanOperationReplaceAsync";
String CLEAR = "CamelInfinispanOperationClear";
String SIZE = "CamelInfinispanOperationSize";
String RESULT = "CamelInfinispanOperationResult";
http://git-wip-us.apache.org/repos/asf/camel/blob/1451499b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
index 3450666..38b3e74 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.util.ObjectHelper;
import org.infinispan.commons.api.BasicCache;
+import org.infinispan.commons.util.concurrent.NotifyingFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +75,26 @@ public class InfinispanOperation {
}
setResult(result, exchange);
}
+ }, PUTASYNC {
+ @Override
+ void execute(BasicCache<Object, Object> cache, Exchange exchange) {
+ NotifyingFuture result;
+ if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) {
+ long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
+ if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME))
+ && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) {
+ long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ String maxIdleTimeUnit = (String) exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
+ result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ } else {
+ result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ }
+ } else {
+ result = cache.putAsync(getKey(exchange), getValue(exchange));
+ }
+ setResult(result, exchange);
+ }
}, PUTALL {
@Override
void execute(BasicCache<Object, Object> cache, Exchange exchange) {
@@ -92,6 +113,26 @@ public class InfinispanOperation {
cache.putAll(getMap(exchange));
}
}
+ }, PUTALLASYNC {
+ @Override
+ void execute(BasicCache<Object, Object> cache, Exchange exchange) {
+ NotifyingFuture result;
+ if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) {
+ long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
+ if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME))
+ && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) {
+ long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ String maxIdleTimeUnit = (String) exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
+ result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ } else {
+ result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ }
+ } else {
+ result = cache.putAllAsync(getMap(exchange));
+ }
+ setResult(result, exchange);
+ }
}, PUTIFABSENT {
@Override
void execute(BasicCache<Object, Object> cache, Exchange exchange) {
@@ -112,6 +153,26 @@ public class InfinispanOperation {
}
setResult(result, exchange);
}
+ }, PUTIFABSENTASYNC {
+ @Override
+ void execute(BasicCache<Object, Object> cache, Exchange exchange) {
+ NotifyingFuture result;
+ if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) {
+ long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
+ if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME))
+ && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) {
+ long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
+ result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ } else {
+ result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ }
+ } else {
+ result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange));
+ }
+ setResult(result, exchange);
+ }
}, GET {
@Override
void execute(BasicCache<Object, Object> cache, Exchange exchange) {
@@ -136,6 +197,12 @@ public class InfinispanOperation {
Object result = cache.remove(getKey(exchange));
setResult(result, exchange);
}
+ }, REMOVEASYNC {
+ @Override
+ void execute(BasicCache<Object, Object> cache, Exchange exchange) {
+ NotifyingFuture result = cache.removeAsync(getKey(exchange));
+ setResult(result, exchange);
+ }
}, REPLACE {
@Override
void execute(BasicCache<Object, Object> cache, Exchange exchange) {
@@ -156,6 +223,26 @@ public class InfinispanOperation {
}
setResult(result, exchange);
}
+ }, REPLACEASYNC {
+ @Override
+ void execute(BasicCache<Object, Object> cache, Exchange exchange) {
+ NotifyingFuture result;
+ if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME)) && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT))) {
+ long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
+ if (!ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME))
+ && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) {
+ long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
+ result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ } else {
+ result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ }
+ } else {
+ result = cache.replaceAsync(getKey(exchange), getValue(exchange));
+ }
+ setResult(result, exchange);
+ }
}, SIZE {
@Override
void execute(BasicCache<Object, Object> cache, Exchange exchange) {
http://git-wip-us.apache.org/repos/asf/camel/blob/1451499b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
index 84df8e4..c806847 100644
--- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.infinispan.commons.util.concurrent.NotifyingFuture;
import org.junit.Test;
public class InfinispanProducerTest extends InfinispanTestSupport {
@@ -79,6 +80,86 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
}
@Test
+ public void publishKeyAndValueAsync() throws Exception {
+ Exchange exchange = template.send("direct:putasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE);
+ }
+ });
+
+ Thread.sleep(10000);
+ NotifyingFuture resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+ assertEquals(Boolean.TRUE, resultPutAsync.isDone());
+
+ Object value = currentCache().get(KEY_ONE);
+ assertEquals(value.toString(), VALUE_ONE);
+ }
+
+ @Test
+ public void publishKeyAndValueAsyncWithLifespan() throws Exception {
+ Exchange exchange = template.send("direct:putasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_TIME));
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+ }
+ });
+
+ Thread.sleep(1000);
+ NotifyingFuture resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+ assertEquals(Boolean.TRUE, resultPutAsync.isDone());
+
+ Object value = currentCache().get(KEY_ONE);
+ assertEquals(value.toString(), VALUE_ONE);
+
+ Thread.sleep(6000);
+ exchange = template.send("direct:get", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ }
+ });
+ String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+ assertEquals(null, resultGet);
+ }
+
+ @Test
+ public void publishKeyAndValueAsyncWithLifespanAndMaxIdle() throws Exception {
+ Exchange exchange = template.send("direct:putasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_FOR_MAX_IDLE));
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+ exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, new Long(MAX_IDLE_TIME));
+ exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString());
+ }
+ });
+
+ Thread.sleep(1000);
+ NotifyingFuture resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+ assertEquals(Boolean.TRUE, resultPutAsync.isDone());
+
+ Object value = currentCache().get(KEY_ONE);
+ assertEquals(value.toString(), VALUE_ONE);
+
+ Thread.sleep(10000);
+ exchange = template.send("direct:get", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ }
+ });
+ String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+ assertEquals(null, resultGet);
+ }
+
+ @Test
public void publishMapNormal() throws Exception {
template.send("direct:start", new Processor() {
@Override
@@ -181,6 +262,110 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
}
@Test
+ public void publishMapNormalAsync() throws Exception {
+ template.send("direct:putallasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put(KEY_ONE, VALUE_ONE);
+ map.put(KEY_TWO, VALUE_TWO);
+ exchange.getIn().setHeader(InfinispanConstants.MAP, map);
+ }
+ });
+
+ Thread.sleep(100);
+ assertEquals(currentCache().size(), 2);
+ Object value = currentCache().get(KEY_ONE);
+ assertEquals(value.toString(), VALUE_ONE);
+ value = currentCache().get(KEY_TWO);
+ assertEquals(value.toString(), VALUE_TWO);
+ }
+
+ @Test
+ public void publishMapWithLifespanAsync() throws Exception {
+ template.send("direct:putallasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put(KEY_ONE, VALUE_ONE);
+ map.put(KEY_TWO, VALUE_TWO);
+ exchange.getIn().setHeader(InfinispanConstants.MAP, map);
+ exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.PUT_ALL);
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_TIME));
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+ }
+ });
+
+ Thread.sleep(100);
+ assertEquals(currentCache().size(), 2);
+ Object value = currentCache().get(KEY_ONE);
+ assertEquals(value.toString(), VALUE_ONE);
+ value = currentCache().get(KEY_TWO);
+ assertEquals(value.toString(), VALUE_TWO);
+
+ Thread.sleep(LIFESPAN_TIME * 1000);
+
+ Exchange exchange = template.send("direct:get", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ }
+ });
+ String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+ assertEquals(null, resultGet);
+
+ exchange = template.send("direct:get", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_TWO);
+ }
+ });
+ resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+ assertEquals(null, resultGet);
+ }
+
+ @Test
+ public void publishMapWithLifespanAndMaxIdleTimeAsync() throws Exception {
+ template.send("direct:putallasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put(KEY_ONE, VALUE_ONE);
+ map.put(KEY_TWO, VALUE_TWO);
+ exchange.getIn().setHeader(InfinispanConstants.MAP, map);
+ exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.PUT_ALL);
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_FOR_MAX_IDLE));
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+ exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, new Long(MAX_IDLE_TIME));
+ exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString());
+ }
+ });
+
+ Thread.sleep(100);
+ assertEquals(currentCache().size(), 2);
+
+ Thread.sleep(10000);
+
+ Exchange exchange = template.send("direct:get", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ }
+ });
+ String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+ assertEquals(null, resultGet);
+
+ exchange = template.send("direct:get", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_TWO);
+ }
+ });
+ resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+ assertEquals(null, resultGet);
+ }
+
+ @Test
public void putIfAbsentAlreadyExists() throws Exception {
currentCache().put(KEY_ONE, VALUE_ONE);
@@ -217,6 +402,86 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
}
@Test
+ public void putIfAbsentKeyAndValueAsync() throws Exception {
+ Exchange exchange = template.send("direct:putifabsentasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE);
+ }
+ });
+
+ Thread.sleep(10000);
+ NotifyingFuture resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+ assertEquals(Boolean.TRUE, resultPutAsync.isDone());
+
+ Object value = currentCache().get(KEY_ONE);
+ assertEquals(value.toString(), VALUE_ONE);
+ }
+
+ @Test
+ public void putIfAbsentKeyAndValueAsyncWithLifespan() throws Exception {
+ Exchange exchange = template.send("direct:putifabsentasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_TIME));
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+ }
+ });
+
+ Thread.sleep(1000);
+ NotifyingFuture resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+ assertEquals(Boolean.TRUE, resultPutAsync.isDone());
+
+ Object value = currentCache().get(KEY_ONE);
+ assertEquals(value.toString(), VALUE_ONE);
+
+ Thread.sleep(6000);
+ exchange = template.send("direct:get", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ }
+ });
+ String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+ assertEquals(null, resultGet);
+ }
+
+ @Test
+ public void putIfAbsentKeyAndValueAsyncWithLifespanAndMaxIdle() throws Exception {
+ Exchange exchange = template.send("direct:putifabsentasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_FOR_MAX_IDLE));
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+ exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, new Long(MAX_IDLE_TIME));
+ exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString());
+ }
+ });
+
+ Thread.sleep(1000);
+ NotifyingFuture resultPutAsync = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+ assertEquals(Boolean.TRUE, resultPutAsync.isDone());
+
+ Object value = currentCache().get(KEY_ONE);
+ assertEquals(value.toString(), VALUE_ONE);
+
+ Thread.sleep(10000);
+ exchange = template.send("direct:get", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ }
+ });
+ String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+ assertEquals(null, resultGet);
+ }
+
+ @Test
public void notContainsKeyTest() throws Exception {
currentCache().put(KEY_ONE, VALUE_ONE);
@@ -423,6 +688,82 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
assertEquals(null, resultGet);
}
+
+ @Test
+ public void replaceAValueByKeyAsync() throws Exception {
+ currentCache().put(KEY_ONE, VALUE_ONE);
+
+ Exchange exchange = template.request("direct:replaceasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO);
+ }
+ });
+
+ assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), VALUE_ONE);
+ assertEquals(currentCache().get(KEY_ONE), VALUE_TWO);
+ }
+
+ @Test
+ public void replaceAValueByKeyWithLifespanAsync() throws Exception {
+ currentCache().put(KEY_ONE, VALUE_ONE);
+
+ Exchange exchange = template.request("direct:replaceasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO);
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_TIME));
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+ }
+ });
+
+ assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), VALUE_ONE);
+ assertEquals(currentCache().get(KEY_ONE), VALUE_TWO);
+
+ Thread.sleep(LIFESPAN_TIME * 1000);
+
+ exchange = template.send("direct:get", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ }
+ });
+ String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+ assertEquals(null, resultGet);
+ }
+
+ @Test
+ public void replaceAValueByKeyWithLifespanAndMaxIdleTimeAsync() throws Exception {
+ currentCache().put(KEY_ONE, VALUE_ONE);
+
+ Exchange exchange = template.request("direct:replaceasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO);
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_FOR_MAX_IDLE));
+ exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString());
+ exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, new Long(MAX_IDLE_TIME));
+ exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString());
+ }
+ });
+
+ assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class), VALUE_ONE);
+ assertEquals(currentCache().get(KEY_ONE), VALUE_TWO);
+
+ Thread.sleep(10000);
+
+ exchange = template.send("direct:get", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ }
+ });
+ String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class);
+ assertEquals(null, resultGet);
+ }
@Test
public void deletesExistingValueByKey() throws Exception {
@@ -441,6 +782,26 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
Object value = currentCache().get(KEY_ONE);
assertEquals(value, null);
}
+
+ @Test
+ public void deletesExistingValueByKeyAsync() throws Exception {
+ currentCache().put(KEY_ONE, VALUE_ONE);
+
+ Exchange exchange = template.request("direct:removeasync", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE);
+ exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.REMOVE_ASYNC);
+ }
+ });
+
+ Thread.sleep(100);
+ NotifyingFuture fut = exchange.getIn().getHeader(InfinispanConstants.RESULT, NotifyingFuture.class);
+ assertEquals(fut.isDone(), Boolean.TRUE);
+
+ Object value = currentCache().get(KEY_ONE);
+ assertEquals(value, null);
+ }
@Test
public void clearsAllValues() throws Exception {
@@ -527,6 +888,16 @@ public class InfinispanProducerTest extends InfinispanTestSupport {
.to("infinispan://localhost?cacheContainer=#cacheContainer&command=CONTAINSVALUE");
from("direct:size")
.to("infinispan://localhost?cacheContainer=#cacheContainer&command=SIZE");
+ from("direct:putasync")
+ .to("infinispan://localhost?cacheContainer=#cacheContainer&command=PUTASYNC");
+ from("direct:putallasync")
+ .to("infinispan://localhost?cacheContainer=#cacheContainer&command=PUTALLASYNC");
+ from("direct:putifabsentasync")
+ .to("infinispan://localhost?cacheContainer=#cacheContainer&command=PUTIFABSENTASYNC");
+ from("direct:replaceasync")
+ .to("infinispan://localhost?cacheContainer=#cacheContainer&command=REPLACEASYNC");
+ from("direct:removeasync")
+ .to("infinispan://localhost?cacheContainer=#cacheContainer&command=REMOVEASYNC");
}
};
}